Yarn: Add WaitGroup
WaitGroup is a synchronization primitive that holds an internal counter that can incremented, decremented and waited on until it reaches 0.
WaitGroups can be used as a simple mechanism for waiting on a number of concurrently execute a number of tasks to complete.
Bug: b/139010488
Change-Id: I086859b81509076de3dbce8a5fde656ab4e4e347
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34816
Tested-by: Ben Clayton <bclayton@google.com>
Reviewed-by: Nicolas Capens <nicolascapens@google.com>
Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
diff --git a/src/Yarn/Scheduler_test.cpp b/src/Yarn/Scheduler_test.cpp
index 46f365c..1b48301 100644
--- a/src/Yarn/Scheduler_test.cpp
+++ b/src/Yarn/Scheduler_test.cpp
@@ -14,6 +14,8 @@
#include "Yarn_test.hpp"
+#include "Yarn/WaitGroup.hpp"
+
TEST(WithoutBoundScheduler, SchedulerConstructAndDestruct)
{
auto scheduler = new yarn::Scheduler();
@@ -44,3 +46,69 @@
yarn::schedule([] {});
}
}
+
+TEST_P(WithBoundScheduler, DestructWithPendingFibers)
+{
+ yarn::WaitGroup wg(1);
+ for (int i = 0; i < 10000; i++)
+ {
+ yarn::schedule([=] { wg.wait(); });
+ }
+ wg.done();
+
+ auto scheduler = yarn::Scheduler::get();
+ scheduler->unbind();
+ delete scheduler;
+
+ // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
+ (new yarn::Scheduler())->bind();
+}
+
+TEST_P(WithBoundScheduler, FibersResumeOnSameYarnThread)
+{
+ yarn::WaitGroup fence(1);
+ yarn::WaitGroup wg(1000);
+ for (int i = 0; i < 1000; i++)
+ {
+ yarn::schedule([=] {
+ auto threadID = std::this_thread::get_id();
+ fence.wait();
+ ASSERT_EQ(threadID, std::this_thread::get_id());
+ wg.done();
+ });
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield.
+ fence.done();
+ wg.wait();
+}
+
+TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread)
+{
+ auto scheduler = yarn::Scheduler::get();
+
+ yarn::WaitGroup fence(1);
+ yarn::WaitGroup wg(1000);
+
+ std::vector<std::thread> threads;
+ for (int i = 0; i < 1000; i++)
+ {
+ threads.push_back(std::thread([=] {
+ scheduler->bind();
+
+ auto threadID = std::this_thread::get_id();
+ fence.wait();
+ ASSERT_EQ(threadID, std::this_thread::get_id());
+ wg.done();
+
+ scheduler->unbind();
+ }));
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield.
+ fence.done();
+ wg.wait();
+
+ for (auto& thread : threads)
+ {
+ thread.join();
+ }
+}
\ No newline at end of file
diff --git a/src/Yarn/WaitGroup.hpp b/src/Yarn/WaitGroup.hpp
new file mode 100644
index 0000000..a8c6b14
--- /dev/null
+++ b/src/Yarn/WaitGroup.hpp
@@ -0,0 +1,108 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef yarn_waitgroup_hpp
+#define yarn_waitgroup_hpp
+
+#include "ConditionVariable.hpp"
+#include "Debug.hpp"
+
+#include <atomic>
+#include <mutex>
+
+namespace yarn {
+
+// WaitGroup is a synchronization primitive that holds an internal counter that
+// can incremented, decremented and waited on until it reaches 0.
+// WaitGroups can be used as a simple mechanism for waiting on a number of
+// concurrently execute a number of tasks to complete.
+//
+// Example:
+//
+// void runTasksConcurrently(int numConcurrentTasks)
+// {
+// // Construct the WaitGroup with an initial count of numConcurrentTasks.
+// yarn::WaitGroup wg(numConcurrentTasks);
+// for (int i = 0; i < numConcurrentTasks; i++)
+// {
+// // Schedule a task to be run asynchronously.
+// // These may all be run concurrently.
+// yarn::schedule([=] {
+// // Once the task has finished, decrement the waitgroup counter
+// // to signal that this has completed.
+// defer(wg.done());
+// doSomeWork();
+// });
+// }
+// // Block until all tasks have completed.
+// wg.wait();
+// }
+class WaitGroup
+{
+public:
+ // Constructs the WaitGroup with the specified initial count.
+ inline WaitGroup(unsigned int initialCount = 0);
+
+ // add() increments the internal counter by count.
+ inline void add(unsigned int count = 1) const;
+
+ // done() decrements the internal counter by one.
+ // Returns true if the internal count has reached zero.
+ inline bool done() const;
+
+ // wait() blocks until the WaitGroup counter reaches zero.
+ inline void wait() const;
+
+private:
+ struct Data
+ {
+ std::atomic<unsigned int> count = { 0 };
+ ConditionVariable condition;
+ std::mutex mutex;
+ };
+ const std::shared_ptr<Data> data = std::make_shared<Data>();
+};
+
+inline WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */)
+{
+ data->count = initialCount;
+}
+
+void WaitGroup::add(unsigned int count /* = 1 */) const
+{
+ data->count += count;
+}
+
+bool WaitGroup::done() const
+{
+ YARN_ASSERT(data->count > 0, "yarn::WaitGroup::done() called too many times");
+ auto count = --data->count;
+ if (count == 0)
+ {
+ std::unique_lock<std::mutex> lock(data->mutex);
+ data->condition.notify_all();
+ return true;
+ }
+ return false;
+}
+
+void WaitGroup::wait() const
+{
+ std::unique_lock<std::mutex> lock(data->mutex);
+ data->condition.wait(lock, [this]{ return data->count == 0; });
+}
+
+} // namespace yarn
+
+#endif // yarn_waitgroup_hpp
diff --git a/src/Yarn/WaitGroup_test.cpp b/src/Yarn/WaitGroup_test.cpp
new file mode 100644
index 0000000..d2048ff
--- /dev/null
+++ b/src/Yarn/WaitGroup_test.cpp
@@ -0,0 +1,61 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "Yarn_test.hpp"
+
+#include "Yarn/WaitGroup.hpp"
+
+TEST(WithoutBoundScheduler, WaitGroupDone)
+{
+ yarn::WaitGroup wg(2); // Should not require a scheduler.
+ wg.done();
+ wg.done();
+}
+
+#if YARN_DEBUG_ENABLED
+TEST(WithoutBoundScheduler, WaitGroupDoneTooMany)
+{
+ yarn::WaitGroup wg(2); // Should not require a scheduler.
+ wg.done();
+ wg.done();
+ EXPECT_DEATH(wg.done(), "done\\(\\) called too many times");
+}
+#endif // YARN_DEBUG_ENABLED
+
+TEST_P(WithBoundScheduler, WaitGroup_OneTask)
+{
+ yarn::WaitGroup wg(1);
+ std::atomic<int> counter = {0};
+ yarn::schedule([&counter, wg] {
+ counter++;
+ wg.done();
+ });
+ wg.wait();
+ ASSERT_EQ(counter.load(), 1);
+}
+
+TEST_P(WithBoundScheduler, WaitGroup_10Tasks)
+{
+ yarn::WaitGroup wg(10);
+ std::atomic<int> counter = {0};
+ for (int i = 0; i < 10; i++)
+ {
+ yarn::schedule([&counter, wg] {
+ counter++;
+ wg.done();
+ });
+ }
+ wg.wait();
+ ASSERT_EQ(counter.load(), 10);
+}