Yarn: Add ConditionVariable

Basically a partial clone of std::condition_variable, but works with fibers / the scheduler.

Bug: b/139010488
Change-Id: Ia89a4930f8c203b03197a4dda9ddd585ae5d8e40
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34815
Tested-by: Ben Clayton <bclayton@google.com>
Reviewed-by: Nicolas Capens <nicolascapens@google.com>
Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
diff --git a/src/Yarn/ConditionVariable.hpp b/src/Yarn/ConditionVariable.hpp
new file mode 100644
index 0000000..7676c42
--- /dev/null
+++ b/src/Yarn/ConditionVariable.hpp
@@ -0,0 +1,119 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef yarn_condition_variable_hpp
+#define yarn_condition_variable_hpp
+
+#include "Containers.hpp"
+#include "Debug.hpp"
+#include "Scheduler.hpp"
+
+#include <atomic>
+#include <mutex>
+
+namespace yarn {
+
+// ConditionVariable is a synchronization primitive that can be used to block
+// one or more fibers or threads, until another fiber or thread modifies a
+// shared variable (the condition) and notifies the ConditionVariable.
+//
+// If the ConditionVariable is blocked on a thread with a Scheduler bound, the
+// thread will work on other tasks until the ConditionVariable is unblocked.
+class ConditionVariable
+{
+public:
+    // Notifies and potentially unblocks one waiting fiber or thread.
+    inline void notify_one();
+
+    // Notifies and potentially unblocks all waiting fibers and/or threads.
+    inline void notify_all();
+
+    // Blocks the current fiber or thread until the predicate is satisfied
+    // and the ConditionVariable is notified.
+    template <typename Predicate>
+    inline void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
+
+private:
+    std::mutex mutex;
+    containers::vector<Scheduler::Fiber*, 4> waiting;
+    std::condition_variable condition;
+    std::atomic<int> numWaiting = { 0 };
+    std::atomic<int> numWaitingOnCondition = { 0 };
+};
+
+void ConditionVariable::notify_one()
+{
+    if (numWaiting == 0) { return; }
+    std::unique_lock<std::mutex> lock(mutex);
+    if (waiting.size() > 0)
+    {
+        auto fiber = waiting.back();
+        waiting.pop_back();
+        fiber->schedule();
+    }
+    lock.unlock();
+    if (numWaitingOnCondition > 0) { condition.notify_one(); }
+}
+
+void ConditionVariable::notify_all()
+{
+    if (numWaiting == 0) { return; }
+    std::unique_lock<std::mutex> lock(mutex);
+    while (waiting.size() > 0)
+    {
+        auto fiber = waiting.back();
+        waiting.pop_back();
+        fiber->schedule();
+    }
+    lock.unlock();
+    if (numWaitingOnCondition > 0) { condition.notify_all(); }
+}
+
+template <typename Predicate>
+void ConditionVariable::wait(std::unique_lock<std::mutex>& dataLock, Predicate pred)
+{
+    if (pred())
+    {
+        return;
+    }
+    numWaiting++;
+    if (auto fiber = Scheduler::Fiber::current())
+    {
+        // Currently executing on a scheduler fiber.
+        // Yield to let other tasks run that can unblock this fiber.
+        while (!pred())
+        {
+            mutex.lock();
+            waiting.push_back(fiber);
+            mutex.unlock();
+
+            dataLock.unlock();
+            fiber->yield();
+            dataLock.lock();
+        }
+    }
+    else
+    {
+        // Currently running outside of the scheduler.
+        // Delegate to the std::condition_variable.
+        numWaitingOnCondition++;
+        condition.wait(dataLock, pred);
+        numWaitingOnCondition--;
+    }
+    numWaiting--;
+}
+
+} // namespace yarn
+
+#endif // yarn_condition_variable_hpp
diff --git a/src/Yarn/ConditionVariable_test.cpp b/src/Yarn/ConditionVariable_test.cpp
new file mode 100644
index 0000000..a7bc327
--- /dev/null
+++ b/src/Yarn/ConditionVariable_test.cpp
@@ -0,0 +1,96 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ConditionVariable.hpp"
+
+#include "Yarn_test.hpp"
+
+TEST(WithoutBoundScheduler, ConditionVariable)
+{
+    bool trigger[3] = {false, false, false};
+    bool signal[3] = {false, false, false};
+    std::mutex mutex;
+    yarn::ConditionVariable cv;
+
+    std::thread thread([&]
+    {
+        for (int i = 0; i < 3; i++)
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            cv.wait(lock, [&] { return trigger[i]; });
+            signal[i] = true;
+            cv.notify_one();
+        }
+    });
+
+    ASSERT_FALSE(signal[0]);
+    ASSERT_FALSE(signal[1]);
+    ASSERT_FALSE(signal[2]);
+
+    for (int i = 0; i < 3; i++)
+    {
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            trigger[i] = true;
+            cv.notify_one();
+            cv.wait(lock, [&] { return signal[i]; });
+        }
+
+        ASSERT_EQ(signal[0], 0 <= i);
+        ASSERT_EQ(signal[1], 1 <= i);
+        ASSERT_EQ(signal[2], 2 <= i);
+    }
+
+    thread.join();
+}
+
+
+TEST_P(WithBoundScheduler, ConditionVariable)
+{
+    bool trigger[3] = {false, false, false};
+    bool signal[3] = {false, false, false};
+    std::mutex mutex;
+    yarn::ConditionVariable cv;
+
+    std::thread thread([&]
+    {
+        for (int i = 0; i < 3; i++)
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            cv.wait(lock, [&] { return trigger[i]; });
+            signal[i] = true;
+            cv.notify_one();
+        }
+    });
+
+    ASSERT_FALSE(signal[0]);
+    ASSERT_FALSE(signal[1]);
+    ASSERT_FALSE(signal[2]);
+
+    for (int i = 0; i < 3; i++)
+    {
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            trigger[i] = true;
+            cv.notify_one();
+            cv.wait(lock, [&] { return signal[i]; });
+        }
+
+        ASSERT_EQ(signal[0], 0 <= i);
+        ASSERT_EQ(signal[1], 1 <= i);
+        ASSERT_EQ(signal[2], 2 <= i);
+    }
+
+    thread.join();
+}