Squashed 'third_party/marl/' changes from 49e4e3141..14e4d862a
14e4d862a Scheduler: Fix issues with fiber lists a timeouts.
57f41915d SwiftShader build fixes.
791187298 Add Event::any().
ecd5ab322 Implement yields with timeouts, wait_for() / wait_until()
6ba730d94 Update README.md
8348be4f0 Implement page-based functions for Fuchsia
5e512cd0c Fix condition logic of assert in TasksOnlyScheduledOnWorkerThreads.
37ae48f40 Add marl::Event.
a90725760 Update README.md (#49)
git-subtree-dir: third_party/marl
git-subtree-split: 14e4d862a959b831fd994a436e7c104c6fd19006
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3b63274..dfc57a5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -172,6 +172,7 @@
${MARL_SRC_DIR}/conditionvariable_test.cpp
${MARL_SRC_DIR}/containers_test.cpp
${MARL_SRC_DIR}/defer_test.cpp
+ ${MARL_SRC_DIR}/event_test.cpp
${MARL_SRC_DIR}/marl_test.cpp
${MARL_SRC_DIR}/marl_test.h
${MARL_SRC_DIR}/memory_test.cpp
diff --git a/README.md b/README.md
index 7c3ed94..8e847ca 100644
--- a/README.md
+++ b/README.md
@@ -10,17 +10,11 @@
Marl supports Windows, macOS, Linux, Fuchsia and Android (arm, aarch64, ppc64 (ELFv2), x86 and x64).
-Marl has no dependencies on other libraries (with exception on googletest for building the optional unit tests).
-
-Marl is in early development and will have breaking API changes.
-
-**More documentation and examples coming soon.**
-
-Note: This is not an officially supported Google product
+Marl has no dependencies on other libraries (with an exception on googletest for building the optional unit tests).
## Building
-Marl contains a number of unit tests and examples which can be built using CMake.
+Marl contains many unit tests and examples that can be built using CMake.
Unit tests require fetching the `googletest` external project, which can be done by typing the following in your terminal:
@@ -66,3 +60,7 @@
```cmake
target_include_directories($<target> PRIVATE "${MARL_DIR}/include") # replace <target> with the name of your project's target
```
+
+---
+
+Note: This is not an officially supported Google product
diff --git a/include/marl/conditionvariable.h b/include/marl/conditionvariable.h
index ecb1de4..c5b6787 100644
--- a/include/marl/conditionvariable.h
+++ b/include/marl/conditionvariable.h
@@ -15,13 +15,14 @@
#ifndef marl_condition_variable_h
#define marl_condition_variable_h
-#include "containers.h"
#include "debug.h"
+#include "defer.h"
#include "scheduler.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
+#include <unordered_set>
namespace marl {
@@ -43,11 +44,29 @@
// wait() blocks the current fiber or thread until the predicate is satisfied
// and the ConditionVariable is notified.
template <typename Predicate>
- inline void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
+ inline void wait(std::unique_lock<std::mutex>& lock, Predicate&& pred);
+
+ // wait_for() blocks the current fiber or thread until the predicate is
+ // satisfied, and the ConditionVariable is notified, or the timeout has been
+ // reached. Returns false if pred still evaluates to false after the timeout
+ // has been reached, otherwise true.
+ template <typename Rep, typename Period, typename Predicate>
+ bool wait_for(std::unique_lock<std::mutex>& lock,
+ const std::chrono::duration<Rep, Period>& duration,
+ Predicate&& pred);
+
+ // wait_until() blocks the current fiber or thread until the predicate is
+ // satisfied, and the ConditionVariable is notified, or the timeout has been
+ // reached. Returns false if pred still evaluates to false after the timeout
+ // has been reached, otherwise true.
+ template <typename Clock, typename Duration, typename Predicate>
+ bool wait_until(std::unique_lock<std::mutex>& lock,
+ const std::chrono::time_point<Clock, Duration>& timeout,
+ Predicate&& pred);
private:
std::mutex mutex;
- containers::vector<Scheduler::Fiber*, 4> waiting;
+ std::unordered_set<Scheduler::Fiber*> waiting;
std::condition_variable condition;
std::atomic<int> numWaiting = {0};
std::atomic<int> numWaitingOnCondition = {0};
@@ -58,12 +77,12 @@
return;
}
std::unique_lock<std::mutex> lock(mutex);
- if (waiting.size() > 0) {
- auto fiber = waiting.back();
- waiting.pop_back();
+ for (auto fiber : waiting) {
fiber->schedule();
}
+ waiting.clear();
lock.unlock();
+
if (numWaitingOnCondition > 0) {
condition.notify_one();
}
@@ -74,20 +93,20 @@
return;
}
std::unique_lock<std::mutex> lock(mutex);
- while (waiting.size() > 0) {
- auto fiber = waiting.back();
- waiting.pop_back();
+ for (auto fiber : waiting) {
fiber->schedule();
}
+ waiting.clear();
lock.unlock();
+
if (numWaitingOnCondition > 0) {
condition.notify_all();
}
}
template <typename Predicate>
-void ConditionVariable::wait(std::unique_lock<std::mutex>& dataLock,
- Predicate pred) {
+void ConditionVariable::wait(std::unique_lock<std::mutex>& lock,
+ Predicate&& pred) {
if (pred()) {
return;
}
@@ -97,23 +116,71 @@
// Yield to let other tasks run that can unblock this fiber.
while (!pred()) {
mutex.lock();
- waiting.push_back(fiber);
+ waiting.emplace(fiber);
mutex.unlock();
- dataLock.unlock();
+ lock.unlock();
fiber->yield();
- dataLock.lock();
+ lock.lock();
}
} else {
// Currently running outside of the scheduler.
// Delegate to the std::condition_variable.
numWaitingOnCondition++;
- condition.wait(dataLock, pred);
+ condition.wait(lock, pred);
numWaitingOnCondition--;
}
numWaiting--;
}
+template <typename Rep, typename Period, typename Predicate>
+bool ConditionVariable::wait_for(
+ std::unique_lock<std::mutex>& lock,
+ const std::chrono::duration<Rep, Period>& duration,
+ Predicate&& pred) {
+ return wait_until(lock, std::chrono::system_clock::now() + duration, pred);
+}
+
+template <typename Clock, typename Duration, typename Predicate>
+bool ConditionVariable::wait_until(
+ std::unique_lock<std::mutex>& lock,
+ const std::chrono::time_point<Clock, Duration>& timeout,
+ Predicate&& pred) {
+ if (pred()) {
+ return true;
+ }
+ numWaiting++;
+ defer(numWaiting--);
+
+ if (auto fiber = Scheduler::Fiber::current()) {
+ // Currently executing on a scheduler fiber.
+ // Yield to let other tasks run that can unblock this fiber.
+ while (!pred()) {
+ mutex.lock();
+ waiting.emplace(fiber);
+ mutex.unlock();
+
+ lock.unlock();
+ fiber->yield_until(timeout);
+ lock.lock();
+
+ if (std::chrono::system_clock::now() >= timeout) {
+ mutex.lock();
+ waiting.erase(fiber);
+ mutex.unlock();
+ return false;
+ }
+ }
+ return true;
+ } else {
+ // Currently running outside of the scheduler.
+ // Delegate to the std::condition_variable.
+ numWaitingOnCondition++;
+ defer(numWaitingOnCondition--);
+ return condition.wait_until(lock, timeout, pred);
+ }
+}
+
} // namespace marl
#endif // marl_condition_variable_h
diff --git a/include/marl/event.h b/include/marl/event.h
new file mode 100644
index 0000000..2c8078e
--- /dev/null
+++ b/include/marl/event.h
@@ -0,0 +1,245 @@
+// Copyright 2019 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_event_h
+#define marl_event_h
+
+#include "conditionvariable.h"
+#include "containers.h"
+#include "memory.h"
+
+#include <chrono>
+
+namespace marl {
+
+// Event is a synchronization primitive used to block until a signal is raised.
+class Event {
+ public:
+ enum class Mode {
+ // The event signal will be automatically reset when a call to wait()
+ // returns.
+ // A single call to signal() will only unblock a single (possibly
+ // future) call to wait().
+ Auto,
+
+ // While the event is in the signaled state, any calls to wait() will
+ // unblock without automatically reseting the signaled state.
+ // The signaled state can be reset with a call to clear().
+ Manual
+ };
+
+ inline Event(Mode mode = Mode::Auto,
+ bool initialState = false,
+ Allocator* allocator = Allocator::Default);
+
+ // signal() signals the event, possibly unblocking a call to wait().
+ inline void signal() const;
+
+ // clear() clears the signaled state.
+ inline void clear() const;
+
+ // wait() blocks until the event is signaled.
+ // If the event was constructed with the Auto Mode, then only one
+ // call to wait() will unblock before returning, upon which the signalled
+ // state will be automatically cleared.
+ inline void wait() const;
+
+ // wait_for() blocks until the event is signaled, or the timeout has been
+ // reached.
+ // If the timeout was reached, then wait_for() return false.
+ // If the event is signalled and event was constructed with the Auto Mode,
+ // then only one call to wait() will unblock before returning, upon which the
+ // signalled state will be automatically cleared.
+ template <typename Rep, typename Period>
+ inline bool wait_for(
+ const std::chrono::duration<Rep, Period>& duration) const;
+
+ // wait_until() blocks until the event is signaled, or the timeout has been
+ // reached.
+ // If the timeout was reached, then wait_for() return false.
+ // If the event is signalled and event was constructed with the Auto Mode,
+ // then only one call to wait() will unblock before returning, upon which the
+ // signalled state will be automatically cleared.
+ template <typename Clock, typename Duration>
+ inline bool wait_until(
+ const std::chrono::time_point<Clock, Duration>& timeout) const;
+
+ // test() returns true if the event is signaled, otherwise false.
+ // If the event is signalled and was constructed with the Auto Mode
+ // then the signalled state will be automatically cleared upon returning.
+ inline bool test() const;
+
+ // isSignalled() returns true if the event is signaled, otherwise false.
+ // Unlike test() the signal is not automatically cleared when the event was
+ // constructed with the Auto Mode.
+ // Note: No lock is held after bool() returns, so the event state may
+ // immediately change after returning. Use with caution.
+ inline bool isSignalled() const;
+
+ // any returns an event that is automatically signalled whenever any of the
+ // events in the list are signalled.
+ template <typename Iterator>
+ inline static Event any(Mode mode,
+ const Iterator& begin,
+ const Iterator& end);
+
+ // any returns an event that is automatically signalled whenever any of the
+ // events in the list are signalled.
+ // This overload defaults to using the Auto mode.
+ template <typename Iterator>
+ inline static Event any(const Iterator& begin, const Iterator& end);
+
+ private:
+ struct Shared {
+ inline Shared(Mode mode, bool initialState);
+ inline void signal();
+ inline void wait();
+
+ template <typename Rep, typename Period>
+ inline bool wait_for(const std::chrono::duration<Rep, Period>& duration);
+
+ template <typename Clock, typename Duration>
+ inline bool wait_until(
+ const std::chrono::time_point<Clock, Duration>& timeout);
+
+ std::mutex mutex;
+ ConditionVariable cv;
+ const Mode mode;
+ bool signalled;
+ containers::vector<std::shared_ptr<Shared>, 2> deps;
+ };
+
+ const std::shared_ptr<Shared> shared;
+};
+
+Event::Shared::Shared(Mode mode, bool initialState)
+ : mode(mode), signalled(initialState) {}
+
+void Event::Shared::signal() {
+ std::unique_lock<std::mutex> lock(mutex);
+ if (signalled) {
+ return;
+ }
+ signalled = true;
+ if (mode == Mode::Auto) {
+ cv.notify_one();
+ } else {
+ cv.notify_all();
+ }
+ for (auto dep : deps) {
+ dep->signal();
+ }
+}
+
+void Event::Shared::wait() {
+ std::unique_lock<std::mutex> lock(mutex);
+ cv.wait(lock, [&] { return signalled; });
+ if (mode == Mode::Auto) {
+ signalled = false;
+ }
+}
+
+template <typename Rep, typename Period>
+bool Event::Shared::wait_for(
+ const std::chrono::duration<Rep, Period>& duration) {
+ std::unique_lock<std::mutex> lock(mutex);
+ if (!cv.wait_for(lock, duration, [&] { return signalled; })) {
+ return false;
+ }
+ if (mode == Mode::Auto) {
+ signalled = false;
+ }
+ return true;
+}
+
+template <typename Clock, typename Duration>
+bool Event::Shared::wait_until(
+ const std::chrono::time_point<Clock, Duration>& timeout) {
+ std::unique_lock<std::mutex> lock(mutex);
+ if (!cv.wait_until(lock, timeout, [&] { return signalled; })) {
+ return false;
+ }
+ if (mode == Mode::Auto) {
+ signalled = false;
+ }
+ return true;
+}
+
+Event::Event(Mode mode /* = Mode::Auto */,
+ bool initialState /* = false */,
+ Allocator* allocator /* = Allocator::Default */)
+ : shared(allocator->make_shared<Shared>(mode, initialState)) {}
+
+void Event::signal() const {
+ shared->signal();
+}
+
+void Event::clear() const {
+ std::unique_lock<std::mutex> lock(shared->mutex);
+ shared->signalled = false;
+}
+
+void Event::wait() const {
+ shared->wait();
+}
+
+template <typename Rep, typename Period>
+bool Event::wait_for(const std::chrono::duration<Rep, Period>& duration) const {
+ return shared->wait_for(duration);
+}
+
+template <typename Clock, typename Duration>
+bool Event::wait_until(
+ const std::chrono::time_point<Clock, Duration>& timeout) const {
+ return shared->wait_until(timeout);
+}
+
+bool Event::test() const {
+ std::unique_lock<std::mutex> lock(shared->mutex);
+ if (!shared->signalled) {
+ return false;
+ }
+ if (shared->mode == Mode::Auto) {
+ shared->signalled = false;
+ }
+ return true;
+}
+
+bool Event::isSignalled() const {
+ std::unique_lock<std::mutex> lock(shared->mutex);
+ return shared->signalled;
+}
+
+template <typename Iterator>
+Event Event::any(Mode mode, const Iterator& begin, const Iterator& end) {
+ Event any(mode, false);
+ for (auto it = begin; it != end; it++) {
+ auto s = it->shared;
+ std::unique_lock<std::mutex> lock(s->mutex);
+ if (s->signalled) {
+ any.signal();
+ }
+ s->deps.push_back(any.shared);
+ }
+ return any;
+}
+
+template <typename Iterator>
+Event Event::any(const Iterator& begin, const Iterator& end) {
+ return any(Mode::Auto, begin, end);
+}
+
+} // namespace marl
+
+#endif // marl_event_h
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index 6490228..ffcddc3 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -21,12 +21,16 @@
#include <array>
#include <atomic>
+#include <chrono>
#include <condition_variable>
#include <functional>
+#include <map>
#include <mutex>
#include <queue>
+#include <set>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
namespace marl {
@@ -45,6 +49,8 @@
class Worker;
public:
+ using TimePoint = std::chrono::system_clock::time_point;
+
Scheduler(Allocator* allocator = Allocator::Default);
~Scheduler();
@@ -101,6 +107,14 @@
// yield() must only be called on the currently executing fiber.
void yield();
+ // yield_until() suspends execution of this Fiber, allowing the thread to
+ // work on other tasks. yield_until() may automatically resume sometime
+ // after timeout.
+ // yield_until() must only be called on the currently executing fiber.
+ template <typename Clock, typename Duration>
+ inline void yield_until(
+ const std::chrono::time_point<Clock, Duration>& timeout);
+
// schedule() reschedules the suspended Fiber for execution.
void schedule();
@@ -113,6 +127,8 @@
Fiber(Allocator::unique_ptr<OSFiber>&&, uint32_t id);
+ void yield_until_sc(const TimePoint& timeout);
+
// switchTo() switches execution to the given fiber.
// switchTo() must only be called on the currently executing fiber.
void switchTo(Fiber*);
@@ -143,10 +159,40 @@
// Maximum number of worker threads.
static constexpr size_t MaxWorkerThreads = 256;
+ // WaitingFibers holds all the fibers waiting on a timeout.
+ struct WaitingFibers {
+ // operator bool() returns true iff there are any wait fibers.
+ inline operator bool() const;
+
+ // take() returns the next fiber that has exceeded its timeout, or nullptr
+ // if there are no fibers that have yet exceeded their timeouts.
+ inline Fiber* take(const TimePoint& timepoint);
+
+ // next() returns the timepoint of the next fiber to timeout.
+ // next() can only be called if operator bool() returns true.
+ inline TimePoint next() const;
+
+ // add() adds another fiber and timeout to the list of waiting fibers.
+ inline void add(const TimePoint& timeout, Fiber* fiber);
+
+ // erase() removes the fiber from the waiting list.
+ inline void erase(Fiber* fiber);
+
+ private:
+ struct Timeout {
+ TimePoint timepoint;
+ Fiber* fiber;
+ inline bool operator<(const Timeout&) const;
+ };
+ std::set<Timeout> timeouts;
+ std::unordered_map<Fiber*, TimePoint> fibers;
+ };
+
// TODO: Implement a queue that recycles elements to reduce number of
// heap allocations.
using TaskQueue = std::queue<Task>;
using FiberQueue = std::queue<Fiber*>;
+ using FiberSet = std::unordered_set<Fiber*>;
// Workers executes Tasks on a single thread.
// Once a task is started, it may yield to other tasks on the same Worker.
@@ -172,7 +218,9 @@
// yield() suspends execution of the current task, and looks for other
// tasks to start or continue execution.
- void yield(Fiber* fiber);
+ // If timeout is not nullptr, yield may automatically resume the current
+ // task sometime after timeout.
+ void yield(Fiber* fiber, const TimePoint* timeout);
// enqueue(Fiber*) enqueues resuming of a suspended fiber.
void enqueue(Fiber* fiber);
@@ -236,6 +284,10 @@
// frequently putting the thread to sleep and re-waking.
void spinForWork();
+ // enqueueFiberTimeouts() enqueues all the fibers that have finished
+ // waiting.
+ _Requires_lock_held_(lock) void enqueueFiberTimeouts();
+
// numBlockedFibers() returns the number of fibers currently blocked and
// held externally.
_Requires_lock_held_(lock) inline size_t numBlockedFibers() const {
@@ -247,6 +299,7 @@
std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size()
TaskQueue tasks; // guarded by mutex
FiberQueue fibers; // guarded by mutex
+ WaitingFibers waiting; // guarded by mutex
std::condition_variable added;
std::mutex mutex;
};
@@ -274,7 +327,7 @@
Fiber* currentFiber = nullptr;
std::thread thread;
Work work;
- FiberQueue idleFibers; // Fibers that have completed which can be reused.
+ FiberSet idleFibers; // Fibers that have completed which can be reused.
std::vector<Allocator::unique_ptr<Fiber>>
workerFibers; // All fibers created by this worker.
FastRnd rng;
@@ -312,6 +365,14 @@
singleThreadedWorkers;
};
+template <typename Clock, typename Duration>
+void Scheduler::Fiber::yield_until(
+ const std::chrono::time_point<Clock, Duration>& timeout) {
+ using ToDuration = typename TimePoint::duration;
+ using ToClock = typename TimePoint::clock;
+ yield_until_sc(std::chrono::time_point_cast<ToDuration, ToClock>(timeout));
+}
+
Scheduler::Worker* Scheduler::Worker::getCurrent() {
return Worker::current;
}
diff --git a/src/event_test.cpp b/src/event_test.cpp
new file mode 100644
index 0000000..aa4fdbe
--- /dev/null
+++ b/src/event_test.cpp
@@ -0,0 +1,226 @@
+// Copyright 2019 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "marl/event.h"
+#include "marl/waitgroup.h"
+
+#include "marl_test.h"
+
+namespace std {
+namespace chrono {
+template <typename Rep, typename Period>
+std::ostream& operator<<(std::ostream& os, const duration<Rep, Period>& d) {
+ return os << chrono::duration_cast<chrono::microseconds>(d).count() << "ms";
+}
+} // namespace chrono
+} // namespace std
+
+TEST_P(WithBoundScheduler, EventIsSignalled) {
+ std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
+ marl::Event::Mode::Auto};
+ for (auto mode : modes) {
+ auto event = marl::Event(mode);
+ ASSERT_EQ(event.isSignalled(), false);
+ event.signal();
+ ASSERT_EQ(event.isSignalled(), true);
+ ASSERT_EQ(event.isSignalled(), true);
+ event.clear();
+ ASSERT_EQ(event.isSignalled(), false);
+ }
+}
+
+TEST_P(WithBoundScheduler, EventAutoTest) {
+ auto event = marl::Event(marl::Event::Mode::Auto);
+ ASSERT_EQ(event.test(), false);
+ event.signal();
+ ASSERT_EQ(event.test(), true);
+ ASSERT_EQ(event.test(), false);
+}
+
+TEST_P(WithBoundScheduler, EventManualTest) {
+ auto event = marl::Event(marl::Event::Mode::Manual);
+ ASSERT_EQ(event.test(), false);
+ event.signal();
+ ASSERT_EQ(event.test(), true);
+ ASSERT_EQ(event.test(), true);
+}
+
+TEST_P(WithBoundScheduler, EventAutoWait) {
+ std::atomic<int> counter = {0};
+ auto event = marl::Event(marl::Event::Mode::Auto);
+ auto done = marl::Event(marl::Event::Mode::Auto);
+
+ for (int i = 0; i < 3; i++) {
+ marl::schedule([=, &counter] {
+ event.wait();
+ counter++;
+ done.signal();
+ });
+ }
+
+ ASSERT_EQ(counter.load(), 0);
+ event.signal();
+ done.wait();
+ ASSERT_EQ(counter.load(), 1);
+ event.signal();
+ done.wait();
+ ASSERT_EQ(counter.load(), 2);
+ event.signal();
+ done.wait();
+ ASSERT_EQ(counter.load(), 3);
+}
+
+TEST_P(WithBoundScheduler, EventManualWait) {
+ std::atomic<int> counter = {0};
+ auto event = marl::Event(marl::Event::Mode::Manual);
+ auto wg = marl::WaitGroup(3);
+ for (int i = 0; i < 3; i++) {
+ marl::schedule([=, &counter] {
+ event.wait();
+ counter++;
+ wg.done();
+ });
+ }
+ event.signal();
+ wg.wait();
+ ASSERT_EQ(counter.load(), 3);
+}
+
+TEST_P(WithBoundScheduler, EventSequence) {
+ std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
+ marl::Event::Mode::Auto};
+ for (auto mode : modes) {
+ std::string sequence;
+ auto eventA = marl::Event(mode);
+ auto eventB = marl::Event(mode);
+ auto eventC = marl::Event(mode);
+ auto done = marl::Event(mode);
+ marl::schedule([=, &sequence] {
+ eventB.wait();
+ sequence += "B";
+ eventC.signal();
+ });
+ marl::schedule([=, &sequence] {
+ eventA.wait();
+ sequence += "A";
+ eventB.signal();
+ });
+ marl::schedule([=, &sequence] {
+ eventC.wait();
+ sequence += "C";
+ done.signal();
+ });
+ ASSERT_EQ(sequence, "");
+ eventA.signal();
+ done.wait();
+ ASSERT_EQ(sequence, "ABC");
+ }
+}
+
+TEST_P(WithBoundScheduler, EventWaitForUnblocked) {
+ auto event = marl::Event(marl::Event::Mode::Manual);
+ auto wg = marl::WaitGroup(1000);
+ for (int i = 0; i < 1000; i++) {
+ marl::schedule([=] {
+ defer(wg.done());
+ auto duration = std::chrono::seconds(10);
+ event.wait_for(duration);
+ });
+ }
+ event.signal(); // unblock
+ wg.wait();
+}
+
+TEST_P(WithBoundScheduler, EventWaitForTimeTaken) {
+ auto event = marl::Event(marl::Event::Mode::Auto);
+ auto wg = marl::WaitGroup(1000);
+ for (int i = 0; i < 1000; i++) {
+ marl::schedule([=] {
+ defer(wg.done());
+ auto duration = std::chrono::milliseconds(10);
+ auto start = std::chrono::system_clock::now();
+ auto triggered = event.wait_for(duration);
+ auto end = std::chrono::system_clock::now();
+ ASSERT_FALSE(triggered);
+ ASSERT_GE(end - start, duration);
+ });
+ }
+ wg.wait();
+}
+
+TEST_P(WithBoundScheduler, EventWaitUntilUnblocked) {
+ auto event = marl::Event(marl::Event::Mode::Manual);
+ auto wg = marl::WaitGroup(1000);
+ for (int i = 0; i < 1000; i++) {
+ marl::schedule([=] {
+ defer(wg.done());
+ auto duration = std::chrono::seconds(10);
+ auto start = std::chrono::system_clock::now();
+ event.wait_until(start + duration);
+ });
+ }
+ event.signal(); // unblock
+ wg.wait();
+}
+
+TEST_P(WithBoundScheduler, EventWaitUntilTimeTaken) {
+ auto event = marl::Event(marl::Event::Mode::Auto);
+ auto wg = marl::WaitGroup(1000);
+ for (int i = 0; i < 1000; i++) {
+ marl::schedule([=] {
+ defer(wg.done());
+ auto duration = std::chrono::milliseconds(10);
+ auto start = std::chrono::system_clock::now();
+ auto triggered = event.wait_until(start + duration);
+ auto end = std::chrono::system_clock::now();
+ ASSERT_FALSE(triggered);
+ ASSERT_GE(end - start, duration);
+ });
+ }
+ wg.wait();
+}
+
+// EventWaitStressTest spins up a whole lot of wait_fors(), unblocks them early,
+// and then let's all the workers go to idle before repeating.
+// This is testing to ensure that the scheduler handles timeouts correctly when
+// they are early-unblocked. Specifically, this is to test that fibers are
+// not double-placed into the idle or working lists.
+TEST_P(WithBoundScheduler, EventWaitStressTest) {
+ auto event = marl::Event(marl::Event::Mode::Manual);
+ for (int i = 0; i < 10; i++) {
+ auto wg = marl::WaitGroup(1000);
+ for (int j = 0; j < 1000; j++) {
+ marl::schedule([=] {
+ defer(wg.done());
+ event.wait_for(std::chrono::milliseconds(100));
+ });
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ event.signal(); // unblock
+ wg.wait();
+ }
+}
+
+TEST_P(WithBoundScheduler, EventAny) {
+ for (int i = 0; i < 3; i++) {
+ std::vector<marl::Event> events = {
+ marl::Event(marl::Event::Mode::Auto),
+ marl::Event(marl::Event::Mode::Auto),
+ marl::Event(marl::Event::Mode::Auto),
+ };
+ auto any = marl::Event::any(events.begin(), events.end());
+ events[i].signal();
+ ASSERT_TRUE(any.isSignalled());
+ }
+}
\ No newline at end of file
diff --git a/src/memory.cpp b/src/memory.cpp
index 834c849..b9f6cc1 100644
--- a/src/memory.cpp
+++ b/src/memory.cpp
@@ -50,6 +50,48 @@
MARL_ASSERT(res == 0, "Failed to protect page at %p", addr);
}
} // anonymous namespace
+#elif defined(__Fuchsia__)
+#include <unistd.h>
+#include <zircon/process.h>
+#include <zircon/syscalls.h>
+namespace {
+// This was a static in pageSize(), but due to the following TSAN false-positive
+// bug, this has been moved out to a global.
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=68338
+const size_t kPageSize = sysconf(_SC_PAGESIZE);
+inline size_t pageSize() {
+ return kPageSize;
+}
+inline void* allocatePages(size_t count) {
+ auto length = count * kPageSize;
+ zx_handle_t vmo;
+ if (zx_vmo_create(length, 0, &vmo) != ZX_OK) {
+ return nullptr;
+ }
+ zx_vaddr_t reservation;
+ zx_status_t status =
+ zx_vmar_map(zx_vmar_root_self(), ZX_VM_PERM_READ | ZX_VM_PERM_WRITE, 0,
+ vmo, 0, length, &reservation);
+ zx_handle_close(vmo);
+ (void)status;
+ MARL_ASSERT(status == ZX_OK, "Failed to allocate %d pages", int(count));
+ return reinterpret_cast<void*>(reservation);
+}
+inline void freePages(void* ptr, size_t count) {
+ auto length = count * kPageSize;
+ zx_status_t status = zx_vmar_unmap(zx_vmar_root_self(),
+ reinterpret_cast<zx_vaddr_t>(ptr), length);
+ (void)status;
+ MARL_ASSERT(status == ZX_OK, "Failed to free %d pages at %p", int(count),
+ ptr);
+}
+inline void protectPage(void* addr) {
+ zx_status_t status = zx_vmar_protect(
+ zx_vmar_root_self(), 0, reinterpret_cast<zx_vaddr_t>(addr), kPageSize);
+ (void)status;
+ MARL_ASSERT(status == ZX_OK, "Failed to protect page at %p", addr);
+}
+} // anonymous namespace
#elif defined(_WIN32)
#define WIN32_LEAN_AND_MEAN 1
#include <Windows.h>
@@ -82,7 +124,6 @@
}
} // anonymous namespace
#else
-// TODO: Fuchsia support
#error "Page based allocation not implemented for this platform"
#endif
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 7dd013f..d352bef 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -43,6 +43,14 @@
return out;
}
+template <typename T>
+inline T take(std::unordered_set<T>& set) {
+ auto it = set.begin();
+ auto out = std::move(*it);
+ set.erase(it);
+ return out;
+}
+
inline void nop() {
#if defined(_WIN32)
__nop();
@@ -216,7 +224,12 @@
void Scheduler::Fiber::yield() {
MARL_SCOPED_EVENT("YIELD");
- worker->yield(this);
+ worker->yield(this, nullptr);
+}
+
+void Scheduler::Fiber::yield_until_sc(const TimePoint& timeout) {
+ MARL_SCOPED_EVENT("YIELD_UNTIL");
+ worker->yield(this, &timeout);
}
void Scheduler::Fiber::switchTo(Fiber* to) {
@@ -241,6 +254,60 @@
}
////////////////////////////////////////////////////////////////////////////////
+// Scheduler::WaitingFibers
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::WaitingFibers::operator bool() const {
+ return fibers.size() > 0;
+}
+
+Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timepoint) {
+ if (!*this) {
+ return nullptr;
+ }
+ auto it = timeouts.begin();
+ if (timepoint < it->timepoint) {
+ return nullptr;
+ }
+ auto fiber = it->fiber;
+ timeouts.erase(it);
+ auto deleted = fibers.erase(fiber) != 0;
+ (void)deleted;
+ MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
+ return fiber;
+}
+
+Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
+ MARL_ASSERT(*this,
+ "WaitingFibers::next() called when there' no waiting fibers");
+ return timeouts.begin()->timepoint;
+}
+
+void Scheduler::WaitingFibers::add(const TimePoint& timepoint, Fiber* fiber) {
+ timeouts.emplace(Timeout{timepoint, fiber});
+ bool added = fibers.emplace(fiber, timepoint).second;
+ (void)added;
+ MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
+}
+
+void Scheduler::WaitingFibers::erase(Fiber* fiber) {
+ auto it = fibers.find(fiber);
+ if (it != fibers.end()) {
+ auto timeout = it->second;
+ auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
+ (void)erased;
+ MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
+ fibers.erase(it);
+ }
+}
+
+bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
+ if (timepoint != o.timepoint) {
+ return timepoint < o.timepoint;
+ }
+ return fiber < o.fiber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker
////////////////////////////////////////////////////////////////////////////////
thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
@@ -296,15 +363,20 @@
}
}
-void Scheduler::Worker::yield(Fiber* from) {
- (void)from; // unreferenced parameter
+void Scheduler::Worker::yield(
+ Fiber* from,
+ const std::chrono::system_clock::time_point* timeout) {
MARL_ASSERT(currentFiber == from,
"Attempting to call yield from a non-current fiber");
// Current fiber is yielding as it is blocked.
- // First wait until there's something else this worker can do.
std::unique_lock<std::mutex> lock(work.mutex);
+ if (timeout != nullptr) {
+ work.waiting.add(*timeout, from);
+ }
+
+ // First wait until there's something else this worker can do.
waitForWork(lock);
if (work.fibers.size() > 0) {
@@ -332,6 +404,7 @@
void Scheduler::Worker::enqueue(Fiber* fiber) {
std::unique_lock<std::mutex> lock(work.mutex);
auto wasIdle = work.num == 0;
+ work.waiting.erase(fiber);
work.fibers.push(std::move(fiber));
work.num++;
lock.unlock();
@@ -385,7 +458,8 @@
Fiber::current()->id);
{
std::unique_lock<std::mutex> lock(work.mutex);
- work.added.wait(lock, [this] { return work.num > 0 || shutdown; });
+ work.added.wait(
+ lock, [this] { return work.num > 0 || work.waiting || shutdown; });
while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
waitForWork(lock);
runUntilIdle(lock);
@@ -418,9 +492,25 @@
spinForWork();
lock.lock();
}
- work.added.wait(lock, [this] {
- return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
- });
+
+ if (work.waiting) {
+ work.added.wait_until(lock, work.waiting.next(), [this] {
+ return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
+ });
+ enqueueFiberTimeouts();
+ } else {
+ work.added.wait(lock, [this] {
+ return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
+ });
+ }
+}
+
+_Requires_lock_held_(lock) void Scheduler::Worker::enqueueFiberTimeouts() {
+ auto now = std::chrono::system_clock::now();
+ while (auto fiber = work.waiting.take(now)) {
+ work.fibers.push(fiber);
+ work.num++;
+ }
}
void Scheduler::Worker::spinForWork() {
@@ -467,7 +557,11 @@
work.num--;
auto fiber = take(work.fibers);
lock.unlock();
- idleFibers.push(currentFiber);
+
+ auto added = idleFibers.emplace(currentFiber).second;
+ (void)added;
+ MARL_ASSERT(added, "fiber already idle");
+
switchToFiber(fiber);
lock.lock();
}
@@ -499,6 +593,8 @@
}
void Scheduler::Worker::switchToFiber(Fiber* to) {
+ MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
+ "switching to idle fiber");
auto from = currentFiber;
currentFiber = to;
from->switchTo(to);
diff --git a/src/scheduler_test.cpp b/src/scheduler_test.cpp
index 982d9ab..bf40513 100644
--- a/src/scheduler_test.cpp
+++ b/src/scheduler_test.cpp
@@ -150,7 +150,7 @@
}
wg.wait();
- ASSERT_EQ(threads.size(), 8U);
+ ASSERT_LE(threads.size(), 8U);
ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U);
scheduler->unbind();