Yarn: Add WaitGroup WaitGroup is a synchronization primitive that holds an internal counter that can incremented, decremented and waited on until it reaches 0. WaitGroups can be used as a simple mechanism for waiting on a number of concurrently execute a number of tasks to complete. Bug: b/139010488 Change-Id: I086859b81509076de3dbce8a5fde656ab4e4e347 Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34816 Tested-by: Ben Clayton <bclayton@google.com> Reviewed-by: Nicolas Capens <nicolascapens@google.com> Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
diff --git a/src/Yarn/Scheduler_test.cpp b/src/Yarn/Scheduler_test.cpp index 46f365c..1b48301 100644 --- a/src/Yarn/Scheduler_test.cpp +++ b/src/Yarn/Scheduler_test.cpp
@@ -14,6 +14,8 @@ #include "Yarn_test.hpp" +#include "Yarn/WaitGroup.hpp" + TEST(WithoutBoundScheduler, SchedulerConstructAndDestruct) { auto scheduler = new yarn::Scheduler(); @@ -44,3 +46,69 @@ yarn::schedule([] {}); } } + +TEST_P(WithBoundScheduler, DestructWithPendingFibers) +{ + yarn::WaitGroup wg(1); + for (int i = 0; i < 10000; i++) + { + yarn::schedule([=] { wg.wait(); }); + } + wg.done(); + + auto scheduler = yarn::Scheduler::get(); + scheduler->unbind(); + delete scheduler; + + // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy. + (new yarn::Scheduler())->bind(); +} + +TEST_P(WithBoundScheduler, FibersResumeOnSameYarnThread) +{ + yarn::WaitGroup fence(1); + yarn::WaitGroup wg(1000); + for (int i = 0; i < 1000; i++) + { + yarn::schedule([=] { + auto threadID = std::this_thread::get_id(); + fence.wait(); + ASSERT_EQ(threadID, std::this_thread::get_id()); + wg.done(); + }); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield. + fence.done(); + wg.wait(); +} + +TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) +{ + auto scheduler = yarn::Scheduler::get(); + + yarn::WaitGroup fence(1); + yarn::WaitGroup wg(1000); + + std::vector<std::thread> threads; + for (int i = 0; i < 1000; i++) + { + threads.push_back(std::thread([=] { + scheduler->bind(); + + auto threadID = std::this_thread::get_id(); + fence.wait(); + ASSERT_EQ(threadID, std::this_thread::get_id()); + wg.done(); + + scheduler->unbind(); + })); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield. + fence.done(); + wg.wait(); + + for (auto& thread : threads) + { + thread.join(); + } +} \ No newline at end of file
diff --git a/src/Yarn/WaitGroup.hpp b/src/Yarn/WaitGroup.hpp new file mode 100644 index 0000000..a8c6b14 --- /dev/null +++ b/src/Yarn/WaitGroup.hpp
@@ -0,0 +1,108 @@ +// Copyright 2019 The SwiftShader Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef yarn_waitgroup_hpp +#define yarn_waitgroup_hpp + +#include "ConditionVariable.hpp" +#include "Debug.hpp" + +#include <atomic> +#include <mutex> + +namespace yarn { + +// WaitGroup is a synchronization primitive that holds an internal counter that +// can incremented, decremented and waited on until it reaches 0. +// WaitGroups can be used as a simple mechanism for waiting on a number of +// concurrently execute a number of tasks to complete. +// +// Example: +// +// void runTasksConcurrently(int numConcurrentTasks) +// { +// // Construct the WaitGroup with an initial count of numConcurrentTasks. +// yarn::WaitGroup wg(numConcurrentTasks); +// for (int i = 0; i < numConcurrentTasks; i++) +// { +// // Schedule a task to be run asynchronously. +// // These may all be run concurrently. +// yarn::schedule([=] { +// // Once the task has finished, decrement the waitgroup counter +// // to signal that this has completed. +// defer(wg.done()); +// doSomeWork(); +// }); +// } +// // Block until all tasks have completed. +// wg.wait(); +// } +class WaitGroup +{ +public: + // Constructs the WaitGroup with the specified initial count. + inline WaitGroup(unsigned int initialCount = 0); + + // add() increments the internal counter by count. + inline void add(unsigned int count = 1) const; + + // done() decrements the internal counter by one. + // Returns true if the internal count has reached zero. + inline bool done() const; + + // wait() blocks until the WaitGroup counter reaches zero. + inline void wait() const; + +private: + struct Data + { + std::atomic<unsigned int> count = { 0 }; + ConditionVariable condition; + std::mutex mutex; + }; + const std::shared_ptr<Data> data = std::make_shared<Data>(); +}; + +inline WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */) +{ + data->count = initialCount; +} + +void WaitGroup::add(unsigned int count /* = 1 */) const +{ + data->count += count; +} + +bool WaitGroup::done() const +{ + YARN_ASSERT(data->count > 0, "yarn::WaitGroup::done() called too many times"); + auto count = --data->count; + if (count == 0) + { + std::unique_lock<std::mutex> lock(data->mutex); + data->condition.notify_all(); + return true; + } + return false; +} + +void WaitGroup::wait() const +{ + std::unique_lock<std::mutex> lock(data->mutex); + data->condition.wait(lock, [this]{ return data->count == 0; }); +} + +} // namespace yarn + +#endif // yarn_waitgroup_hpp
diff --git a/src/Yarn/WaitGroup_test.cpp b/src/Yarn/WaitGroup_test.cpp new file mode 100644 index 0000000..d2048ff --- /dev/null +++ b/src/Yarn/WaitGroup_test.cpp
@@ -0,0 +1,61 @@ +// Copyright 2019 The SwiftShader Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "Yarn_test.hpp" + +#include "Yarn/WaitGroup.hpp" + +TEST(WithoutBoundScheduler, WaitGroupDone) +{ + yarn::WaitGroup wg(2); // Should not require a scheduler. + wg.done(); + wg.done(); +} + +#if YARN_DEBUG_ENABLED +TEST(WithoutBoundScheduler, WaitGroupDoneTooMany) +{ + yarn::WaitGroup wg(2); // Should not require a scheduler. + wg.done(); + wg.done(); + EXPECT_DEATH(wg.done(), "done\\(\\) called too many times"); +} +#endif // YARN_DEBUG_ENABLED + +TEST_P(WithBoundScheduler, WaitGroup_OneTask) +{ + yarn::WaitGroup wg(1); + std::atomic<int> counter = {0}; + yarn::schedule([&counter, wg] { + counter++; + wg.done(); + }); + wg.wait(); + ASSERT_EQ(counter.load(), 1); +} + +TEST_P(WithBoundScheduler, WaitGroup_10Tasks) +{ + yarn::WaitGroup wg(10); + std::atomic<int> counter = {0}; + for (int i = 0; i < 10; i++) + { + yarn::schedule([&counter, wg] { + counter++; + wg.done(); + }); + } + wg.wait(); + ASSERT_EQ(counter.load(), 10); +}