Yarn: Add ConditionVariable
Basically a partial clone of std::condition_variable, but works with fibers / the scheduler.
Bug: b/139010488
Change-Id: Ia89a4930f8c203b03197a4dda9ddd585ae5d8e40
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34815
Tested-by: Ben Clayton <bclayton@google.com>
Reviewed-by: Nicolas Capens <nicolascapens@google.com>
Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
diff --git a/src/Yarn/ConditionVariable.hpp b/src/Yarn/ConditionVariable.hpp
new file mode 100644
index 0000000..7676c42
--- /dev/null
+++ b/src/Yarn/ConditionVariable.hpp
@@ -0,0 +1,119 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef yarn_condition_variable_hpp
+#define yarn_condition_variable_hpp
+
+#include "Containers.hpp"
+#include "Debug.hpp"
+#include "Scheduler.hpp"
+
+#include <atomic>
+#include <mutex>
+
+namespace yarn {
+
+// ConditionVariable is a synchronization primitive that can be used to block
+// one or more fibers or threads, until another fiber or thread modifies a
+// shared variable (the condition) and notifies the ConditionVariable.
+//
+// If the ConditionVariable is blocked on a thread with a Scheduler bound, the
+// thread will work on other tasks until the ConditionVariable is unblocked.
+class ConditionVariable
+{
+public:
+ // Notifies and potentially unblocks one waiting fiber or thread.
+ inline void notify_one();
+
+ // Notifies and potentially unblocks all waiting fibers and/or threads.
+ inline void notify_all();
+
+ // Blocks the current fiber or thread until the predicate is satisfied
+ // and the ConditionVariable is notified.
+ template <typename Predicate>
+ inline void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
+
+private:
+ std::mutex mutex;
+ containers::vector<Scheduler::Fiber*, 4> waiting;
+ std::condition_variable condition;
+ std::atomic<int> numWaiting = { 0 };
+ std::atomic<int> numWaitingOnCondition = { 0 };
+};
+
+void ConditionVariable::notify_one()
+{
+ if (numWaiting == 0) { return; }
+ std::unique_lock<std::mutex> lock(mutex);
+ if (waiting.size() > 0)
+ {
+ auto fiber = waiting.back();
+ waiting.pop_back();
+ fiber->schedule();
+ }
+ lock.unlock();
+ if (numWaitingOnCondition > 0) { condition.notify_one(); }
+}
+
+void ConditionVariable::notify_all()
+{
+ if (numWaiting == 0) { return; }
+ std::unique_lock<std::mutex> lock(mutex);
+ while (waiting.size() > 0)
+ {
+ auto fiber = waiting.back();
+ waiting.pop_back();
+ fiber->schedule();
+ }
+ lock.unlock();
+ if (numWaitingOnCondition > 0) { condition.notify_all(); }
+}
+
+template <typename Predicate>
+void ConditionVariable::wait(std::unique_lock<std::mutex>& dataLock, Predicate pred)
+{
+ if (pred())
+ {
+ return;
+ }
+ numWaiting++;
+ if (auto fiber = Scheduler::Fiber::current())
+ {
+ // Currently executing on a scheduler fiber.
+ // Yield to let other tasks run that can unblock this fiber.
+ while (!pred())
+ {
+ mutex.lock();
+ waiting.push_back(fiber);
+ mutex.unlock();
+
+ dataLock.unlock();
+ fiber->yield();
+ dataLock.lock();
+ }
+ }
+ else
+ {
+ // Currently running outside of the scheduler.
+ // Delegate to the std::condition_variable.
+ numWaitingOnCondition++;
+ condition.wait(dataLock, pred);
+ numWaitingOnCondition--;
+ }
+ numWaiting--;
+}
+
+} // namespace yarn
+
+#endif // yarn_condition_variable_hpp
diff --git a/src/Yarn/ConditionVariable_test.cpp b/src/Yarn/ConditionVariable_test.cpp
new file mode 100644
index 0000000..a7bc327
--- /dev/null
+++ b/src/Yarn/ConditionVariable_test.cpp
@@ -0,0 +1,96 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ConditionVariable.hpp"
+
+#include "Yarn_test.hpp"
+
+TEST(WithoutBoundScheduler, ConditionVariable)
+{
+ bool trigger[3] = {false, false, false};
+ bool signal[3] = {false, false, false};
+ std::mutex mutex;
+ yarn::ConditionVariable cv;
+
+ std::thread thread([&]
+ {
+ for (int i = 0; i < 3; i++)
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ cv.wait(lock, [&] { return trigger[i]; });
+ signal[i] = true;
+ cv.notify_one();
+ }
+ });
+
+ ASSERT_FALSE(signal[0]);
+ ASSERT_FALSE(signal[1]);
+ ASSERT_FALSE(signal[2]);
+
+ for (int i = 0; i < 3; i++)
+ {
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ trigger[i] = true;
+ cv.notify_one();
+ cv.wait(lock, [&] { return signal[i]; });
+ }
+
+ ASSERT_EQ(signal[0], 0 <= i);
+ ASSERT_EQ(signal[1], 1 <= i);
+ ASSERT_EQ(signal[2], 2 <= i);
+ }
+
+ thread.join();
+}
+
+
+TEST_P(WithBoundScheduler, ConditionVariable)
+{
+ bool trigger[3] = {false, false, false};
+ bool signal[3] = {false, false, false};
+ std::mutex mutex;
+ yarn::ConditionVariable cv;
+
+ std::thread thread([&]
+ {
+ for (int i = 0; i < 3; i++)
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ cv.wait(lock, [&] { return trigger[i]; });
+ signal[i] = true;
+ cv.notify_one();
+ }
+ });
+
+ ASSERT_FALSE(signal[0]);
+ ASSERT_FALSE(signal[1]);
+ ASSERT_FALSE(signal[2]);
+
+ for (int i = 0; i < 3; i++)
+ {
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ trigger[i] = true;
+ cv.notify_one();
+ cv.wait(lock, [&] { return signal[i]; });
+ }
+
+ ASSERT_EQ(signal[0], 0 <= i);
+ ASSERT_EQ(signal[1], 1 <= i);
+ ASSERT_EQ(signal[2], 2 <= i);
+ }
+
+ thread.join();
+}