Yarn: Add WaitGroup

WaitGroup is a synchronization primitive that holds an internal counter that can incremented, decremented and waited on until it reaches 0.
WaitGroups can be used as a simple mechanism for waiting on a number of concurrently execute a number of tasks to complete.

Bug: b/139010488
Change-Id: I086859b81509076de3dbce8a5fde656ab4e4e347
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34816
Tested-by: Ben Clayton <bclayton@google.com>
Reviewed-by: Nicolas Capens <nicolascapens@google.com>
Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
diff --git a/src/Yarn/Scheduler_test.cpp b/src/Yarn/Scheduler_test.cpp
index 46f365c..1b48301 100644
--- a/src/Yarn/Scheduler_test.cpp
+++ b/src/Yarn/Scheduler_test.cpp
@@ -14,6 +14,8 @@
 
 #include "Yarn_test.hpp"
 
+#include "Yarn/WaitGroup.hpp"
+
 TEST(WithoutBoundScheduler, SchedulerConstructAndDestruct)
 {
     auto scheduler = new yarn::Scheduler();
@@ -44,3 +46,69 @@
         yarn::schedule([] {});
     }
 }
+
+TEST_P(WithBoundScheduler, DestructWithPendingFibers)
+{
+    yarn::WaitGroup wg(1);
+    for (int i = 0; i < 10000; i++)
+    {
+        yarn::schedule([=] { wg.wait(); });
+    }
+    wg.done();
+
+    auto scheduler = yarn::Scheduler::get();
+    scheduler->unbind();
+    delete scheduler;
+
+    // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
+    (new yarn::Scheduler())->bind();
+}
+
+TEST_P(WithBoundScheduler, FibersResumeOnSameYarnThread)
+{
+    yarn::WaitGroup fence(1);
+    yarn::WaitGroup wg(1000);
+    for (int i = 0; i < 1000; i++)
+    {
+        yarn::schedule([=] {
+            auto threadID = std::this_thread::get_id();
+            fence.wait();
+            ASSERT_EQ(threadID, std::this_thread::get_id());
+            wg.done();
+        });
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield.
+    fence.done();
+    wg.wait();
+}
+
+TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread)
+{
+    auto scheduler = yarn::Scheduler::get();
+
+    yarn::WaitGroup fence(1);
+    yarn::WaitGroup wg(1000);
+
+    std::vector<std::thread> threads;
+    for (int i = 0; i < 1000; i++)
+    {
+        threads.push_back(std::thread([=] {
+            scheduler->bind();
+
+            auto threadID = std::this_thread::get_id();
+            fence.wait();
+            ASSERT_EQ(threadID, std::this_thread::get_id());
+            wg.done();
+
+            scheduler->unbind();
+        }));
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield.
+    fence.done();
+    wg.wait();
+
+    for (auto& thread : threads)
+    {
+        thread.join();
+    }
+}
\ No newline at end of file
diff --git a/src/Yarn/WaitGroup.hpp b/src/Yarn/WaitGroup.hpp
new file mode 100644
index 0000000..a8c6b14
--- /dev/null
+++ b/src/Yarn/WaitGroup.hpp
@@ -0,0 +1,108 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef yarn_waitgroup_hpp
+#define yarn_waitgroup_hpp
+
+#include "ConditionVariable.hpp"
+#include "Debug.hpp"
+
+#include <atomic>
+#include <mutex>
+
+namespace yarn {
+
+// WaitGroup is a synchronization primitive that holds an internal counter that
+// can incremented, decremented and waited on until it reaches 0.
+// WaitGroups can be used as a simple mechanism for waiting on a number of
+// concurrently execute a number of tasks to complete.
+//
+// Example:
+//
+//  void runTasksConcurrently(int numConcurrentTasks)
+//  {
+//      // Construct the WaitGroup with an initial count of numConcurrentTasks.
+//      yarn::WaitGroup wg(numConcurrentTasks);
+//      for (int i = 0; i < numConcurrentTasks; i++)
+//      {
+//          // Schedule a task to be run asynchronously.
+//          // These may all be run concurrently.
+//          yarn::schedule([=] {
+//              // Once the task has finished, decrement the waitgroup counter
+//              // to signal that this has completed.
+//              defer(wg.done());
+//              doSomeWork();
+//          });
+//      }
+//      // Block until all tasks have completed.
+//      wg.wait();
+//  }
+class WaitGroup
+{
+public:
+    // Constructs the WaitGroup with the specified initial count.
+    inline WaitGroup(unsigned int initialCount = 0);
+
+    // add() increments the internal counter by count.
+    inline void add(unsigned int count = 1) const;
+
+    // done() decrements the internal counter by one.
+    // Returns true if the internal count has reached zero.
+    inline bool done() const;
+
+    // wait() blocks until the WaitGroup counter reaches zero.
+    inline void wait() const;
+
+private:
+    struct Data
+    {
+        std::atomic<unsigned int> count = { 0 };
+        ConditionVariable condition;
+        std::mutex mutex;
+    };
+    const std::shared_ptr<Data> data = std::make_shared<Data>();
+};
+
+inline WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */)
+{
+    data->count = initialCount;
+}
+
+void WaitGroup::add(unsigned int count /* = 1 */) const
+{
+    data->count += count;
+}
+
+bool WaitGroup::done() const
+{
+    YARN_ASSERT(data->count > 0, "yarn::WaitGroup::done() called too many times");
+    auto count = --data->count;
+    if (count == 0)
+    {
+        std::unique_lock<std::mutex> lock(data->mutex);
+        data->condition.notify_all();
+        return true;
+    }
+    return false;
+}
+
+void WaitGroup::wait() const
+{
+    std::unique_lock<std::mutex> lock(data->mutex);
+    data->condition.wait(lock, [this]{ return data->count == 0; });
+}
+
+} // namespace yarn
+
+#endif  // yarn_waitgroup_hpp
diff --git a/src/Yarn/WaitGroup_test.cpp b/src/Yarn/WaitGroup_test.cpp
new file mode 100644
index 0000000..d2048ff
--- /dev/null
+++ b/src/Yarn/WaitGroup_test.cpp
@@ -0,0 +1,61 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "Yarn_test.hpp"
+
+#include "Yarn/WaitGroup.hpp"
+
+TEST(WithoutBoundScheduler, WaitGroupDone)
+{
+    yarn::WaitGroup wg(2); // Should not require a scheduler.
+    wg.done();
+    wg.done();
+}
+
+#if YARN_DEBUG_ENABLED
+TEST(WithoutBoundScheduler, WaitGroupDoneTooMany)
+{
+    yarn::WaitGroup wg(2); // Should not require a scheduler.
+    wg.done();
+    wg.done();
+    EXPECT_DEATH(wg.done(), "done\\(\\) called too many times");
+}
+#endif // YARN_DEBUG_ENABLED
+
+TEST_P(WithBoundScheduler, WaitGroup_OneTask)
+{
+    yarn::WaitGroup wg(1);
+    std::atomic<int> counter = {0};
+    yarn::schedule([&counter, wg] {
+        counter++;
+        wg.done();
+    });
+    wg.wait();
+    ASSERT_EQ(counter.load(), 1);
+}
+
+TEST_P(WithBoundScheduler, WaitGroup_10Tasks)
+{
+    yarn::WaitGroup wg(10);
+    std::atomic<int> counter = {0};
+    for (int i = 0; i < 10; i++)
+    {
+        yarn::schedule([&counter, wg] {
+            counter++;
+            wg.done();
+        });
+    }
+    wg.wait();
+    ASSERT_EQ(counter.load(), 10);
+}