+# `marl::Scheduler`
+Table of Contents:
+- [`marl::Scheduler`](#marlscheduler)
+ - [Binding](#binding)
+ - [Fibers](#fibers)
+ - [Tasks](#tasks)
+ - [Workers](#workers)
+ - [`marl::Scheduler::Worker::run()`](#marlschedulerworkerrun)
+ - [`marl::Scheduler::Worker::runUntilIdle()`](#marlschedulerworkerrununtilidle)
+ - [`marl::Scheduler::Worker::suspend()`](#marlschedulerworkersuspend)
+ - [`marl::Scheduler::Worker::waitForWork()`](#marlschedulerworkerwaitforwork)
+ - [`marl::Scheduler::Worker::spinForWork()`](#marlschedulerworkerspinforwork)
+ - [Worker Types](#worker-types)
+ - [Single-Threaded-Workers](#single-threaded-workers)
+ - [Multi-Threaded-Workers](#multi-threaded-workers)
+The `marl::Scheduler` is the most complex part of marl and is responsible for executing tasks and keeping threads running when tasks become blocked.
+This document describes the inner workings of the scheduler. This document is not intended to describe usage.
+## Binding
+The scheduler must be bound to each thread that calls `marl::schedule()`, and unbound from all threads before the scheduler is destructed.
+Binding is made using the `marl::Scheduler::bind()` and `marl::Scheduler::unbind()` methods.
+Binding assigns a thread-local storage variable so the scheduler is associated with the given thread. This serves two purposes:
+1. It allows `marl::schedule()` and the various synchronization primitives to be called without requiring a pointer to the `marl::Scheduler`.
+2. More importantly, it provides a way to get the currently executing fiber for the current thread. This is used by `marl::ConditionVariable::wait()` to suspend the current fiber and place it into a vector so the `marl::ConditionVariable::notify_`xxx`()` methods can reschedule the blocked fibers.
+Binding also creates an internal [Single-Threaded-Worker](#single-threaded-workers) for the calling thread. This worker is used for scheduling tasks when there are no [Multi-Threaded-Workers](#multi-threaded-workers) available. Unbinding will ensure that all scheduled tasks for the [Single-Threaded-Worker](#single-threaded-workers) are completed before returning.
+## Fibers
+A [fiber](https://en.wikipedia.org/wiki/Fiber_(computer_science)) is a lightweight cooperative thread, which can be suspended and resumed at explicit yield points.
+At the time of writing, there's no standard and cross-platform library for fibers or coroutines, so marl implements the `marl::OSFiber` class for each supported platform and ABI. Most of these implementations are written in assembly and simply save and restore the callee-saved registers along with maintaining an allocation for the fiber's stack. `marl::OSFiber` is an internal implementation detail, and is not exposed in the public API.
+`marl::Scheduler::Fiber` is the public fiber interface that is tightly coupled with the `marl::Scheduler`. The `marl::Scheduler::Fiber` has a simple `std::condition_variable` like interface.
+Each `marl::Scheduler::Fiber` is permanently associated with a `marl::Scheduler::Worker`, and is guaranteed to only ever be resumed on the same thread used to suspend.
+## Tasks
+A `marl::Task` is an alias to `std::function<void()>`, a function that takes no arguments, and returns no value.
+Tasks are scheduled using `marl::schedule()`, and are typically implemented as a lambda:
+marl::schedule([] {
+ printf("Hello world!\n");
+While the `marl::Task` signature takes no parameters, it is common to capture variables as part of this lambda for task inputs and outputs.
+All the marl synchronization primitives (with exception of `marl::ConditionVariable`) hold a shared pointer to internal state, and you are encouraged to capture these **by value**. This may seem counter-intuitive, but passing by reference can lead to memory corruption if the task outlives the stack used to call `marl::schedule()`. Maintaining a shared state object clearly has allocation and performance overheads, but it was decided that the safety outweighed the costs.
+marl::WaitGroup wg(1);
+marl::schedule([=] { // capture by value, not reference!
+ printf("Hello world!\n");
+ wg.done();
+## Workers
+The scheduler holds a number of `marl::Scheduler::Worker`s. Each worker holds:
+- `work.tasks` - A queue of tasks, yet to be started.
+- `work.fibers` - A queue of suspended fibers, ready to be resumed.
+- `work.waiting` - A queue of suspended fibers, waiting to be resumed or time out.
+- `idleFibers` - A set of idle fibers, ready to be reused.
+- `work.num` - A counter that is kept in sync with `work.tasks.size() + work.fibers.size()`.
+When a task is scheduled with a call to `marl::schedule()`, a worker is picked, and the task is placed on to the worker's `work.tasks` queue. The worker is picked using the following rules:
+- If the scheduler has no dedicated worker threads (`marl::Scheduler::getWorkerThreadCount() == 0`), then the task is queued on to the [Single-Threaded-Worker](#single-threaded-workers) for the currently executing thread.
+- Otherwise one of the [Multi-Threaded-Workers](#multi-threaded-workers) is picked. If any workers have entered a [spin-for-work](#marlschedulerworkerspinforwork) state, then these will be prioritized, otherwise a [Multi-Threaded-Worker](#multi-threaded-workers) is picked in a round-robin fashion.
+### `marl::Scheduler::Worker::run()`
+`run()` is the entry point for workers to execute their tasks. The logic is slightly different based on whether the worker is a [Single-Threaded-Worker](#single-threaded-workers) or a [Multi-Threaded-Worker](#multi-threaded-workers), but both share the same basic logic:
+- Enter a loop that only exits when the worker is shutdown.
+- In this loop call [`marl::Scheduler::Worker::runUntilIdle()`](#marl::Scheduler::Worker::runUntilIdle())
+### `marl::Scheduler::Worker::runUntilIdle()`
+As the name suggests, this function executes its work until there is no more work, or all work is blocked.
+The basic logic of this method is as follows:
+1. Resume any unblocked tasks (fibers)
+ `runUntilIdle()` begins by completing all fibers that are ready to be resumed (no longer blocked).
+ This is done by taking a fiber from the `work.fibers` queue, placing the current fiber into the `idleFibers` queue (this fiber is considered idle as it is looking for work), and switching the context over to the taken fiber.
+ Executing unblocked fibers is prioritized over starting new tasks. This is because new tasks may result in yet more fibers, and each fiber consumes a certain amount of memory (typically for stack).
+2. Start executing new tasks
+ Once all resumable fibers have been completed or have become re-blocked, new tasks are taken from the `work.tasks` queue, and are executed. Once a task is completed, control returns back to `runUntilIdle()`, and the main loop starts again from 1.
+3. Once there's no more fibers or tasks to execute, `runUntilIdle()` returns.
+### `marl::Scheduler::Worker::suspend()`
+Marl allows tasks to block, while keeping threads busy.
+If a task blocks, then `Scheduler::Worker::suspend()` is called. `suspend()` begins by calling [`Scheduler::Worker::waitForWork()`](#marl::Scheduler::Worker::waitForWork()), which blocks until there's a task or fiber that can be executed. Then, one of the following occurs:
+ 1. If there's any unblocked fibers, the fiber is taken from the `work.fibers` queue and is switched to.
+ 2. If there's any idle fibers, one is taken from the `idleFibers` set and is switched to. This idle fiber when resumed, will continue the role of executing tasks.
+ 3. If none of the above occurs, then a new fiber needs to be created to continue executing tasks. This fiber is created to begin execution in [`marl::Scheduler::Worker::run()`](#marl::Scheduler::Worker::run()), and is switched to.
+In all cases, the `suspend()` call switches to another fiber. When the suspended fiber is resumed, `suspend()` returns back to the caller.
+### `marl::Scheduler::Worker::waitForWork()`
+When a worker runs out of tasks to start and fibers to resume, `waitForWork()` is called to block until there's something for the worker to do.
+If the worker is a [Multi-Threaded-Worker](#multi-threaded-workers), `waitForWork()` begins by entering [`spinForWork()`](#marlschedulerworkerspinforwork), otherwise this stage is skipped.
+`waitForWork()` then waits for any of the following to occur before returning:
+- A fiber becoming ready to be resumed, by being enqueued on the `work.fibers` queue.
+- A task becoming enqueued on the `work.tasks` queue.
+- A fiber timing out in the `work.waiting` queue.
+- The worker being shutdown.
+Any fibers that have timed out in the `work.waiting` queue are automatically moved onto the `work.fibers` queue before returning.
+### `marl::Scheduler::Worker::spinForWork()`
+`spinForWork()` has two roles:
+1. It attempts to steal work from other workers to keep worker work-loads evenly balanced.
+ Task lengths can vary significntly in duration, and over time some workers can end up with a large queue of work, while others are starved. `spinForWork()` is only called when the worker is starved, and will attempt to steal tasks from randomly picked workers. Because fibers must only be executed on the same thread, only tasks, not fibers can be stolen.
+2. It attempts to avoid yielding the thread to the OS.
+ It is common to have a single task (provider) scheduling many small sub-tasks to the scheduler, which are evenly distributed to the workers (consumers). These consumers typically outnumber the providers, and it is easy to have the provider struggle to provide enough work to keep the consumers fully occupied.
+ In this situation, the workers can enter a loop where they are given a task, complete it, and end up waiting a short duration for more work. Allowing a worker thread to yield to the OS when waiting for another task (e.g. with `std::condition_variable::wait()`) can be costly in terms of performance. Depending on the platform, it may take a millisecond or more before the thread is resumed by the OS. A stall of this length can lead to significant stalls in the entire task dependency graph.
+`spinForWork()` contains a loop that runs for a short duration. In the body of the loop, the following is performed:
+- A tight loop of `nops` is used to keep the CPU busy, while periodically checking `work.num` to see if any new work has become available. If new work is found, `spinForWork()` returns immediately.
+- If no new work was scheduled, an attempt is made to steal a task from another random worker. If the steal was successful, `spinForWork()` returns immediately.
+- If the steal was unsuccessful, `std::this_thread::yield()` is called to prevent marl from starving the OS.
+## Worker Types
+A worker is created as either a Single-Threaded-Worker or Multi-Threaded-Worker.
+Most of the logic is the same between these two modes.
+The most significant difference between the STW and MTW is the behavior of the worker's entry point function - `marl::Scheduler::Worker::run()`.
+### Single-Threaded-Workers
+A single-threaded-worker (STW) is created for each thread that is bound with a call to `marl::Scheduler::bind()`.
+If the scheduler has no dedicated worker threads (`marl::Scheduler::getWorkerThreadCount() == 0`), then scheduled tasks are queued on to the STW for the currently executing thread.
+Because in this mode there are no worker threads, the tasks queued on the STW are not automatically background executed. Instead, tasks are only executed whenever there's a call to [`marl::Scheduler::Worker::suspend()`](#marlschedulerworkersuspend).
+The logic for `suspend()` is common for STWs and MTWs, and the first call will create a fiber which calls `marl::Scheduler::Worker::run()`.
+`marl::Scheduler::Worker::run()` is implemented for STWs as a loop that calls `marl::Scheduler::Worker::runUntilIdle()`, and then switches back to the main fiber. This loop only exits once the worker is shutdown.
+void SingleThreadedWorkerExample() {
+ marl::Scheduler scheduler;
+ scheduler.setWorkerThreadCount(0); // STW mode.
+ scheduler.bind();
+ defer(scheduler.unbind());
+ // Calling marl::schedule() enqueues the task on the STW, but does not
+ // execute it until the thread is blocked.
+ marl::Event done;
+ marl::schedule([=] {
+ done.signal();
+ });
+ // This is a blocking call.
+ // marl::Event::wait() (indirectly) calls marl::Scheduler::Worker::suspend().
+ // marl::Scheduler::Worker::suspend() creates and switches to a fiber which
+ // calls marl::Scheduler::Worker::run() to run all enqueued tasks. The fiber
+ // then places itself into idleFibers set and switches back to the main fiber
+ // to continue execution.
+ done.wait();
+### Multi-Threaded-Workers
+Multi-Threaded-Workers are created when `marl::Scheduler::setWorkerThreadCount()` is called with a positive number.
+Each MTW is paired with a new `std::thread` that calls `marl::Scheduler::Worker::run()`.
+`marl::Scheduler::Worker::run()` is implemented for MTWs as a loop that calls `marl::Scheduler::Worker::waitForWork()` followed by `marl::Scheduler::Worker::runUntilIdle()`. This loop only exits once the worker is shutdown.
diff --git a/examples/hello_task.cpp b/examples/hello_task.cpp
index 139406c..6dfff3c 100644
--- a/examples/hello_task.cpp
+++ b/examples/hello_task.cpp
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Simple "hello world" example that uses marl::Event.
+// Simple "hello world" example that uses marl::Event and marl::WaitGroup.
#include "marl/defer.h"
#include "marl/event.h"
#include "marl/scheduler.h"
+#include "marl/waitgroup.h"
#include <cstdio>
@@ -28,31 +29,36 @@
defer(scheduler.unbind()); // Automatically unbind before returning.
- // Create an event that automatically resets itself.
- marl::Event sayHellow(marl::Event::Mode::Auto);
- marl::Event saidHellow(marl::Event::Mode::Auto);
+ constexpr int numTasks = 10;
+ // Create an event that is manually reset.
+ marl::Event sayHellow(marl::Event::Mode::Manual);
+ // Create a WaitGroup with an initial count of numTasks.
+ marl::WaitGroup saidHellow(numTasks);
// Schedule some tasks to run asynchronously.
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value.
- printf("Task %d waiting to say hello!\n", i);
+ // Decrement the WaitGroup counter when the task has finished.
+ defer(saidHellow.done());
+ printf("Task %d waiting to say hello...\n", i);
// Blocking in a task?
// The scheduler will find something else for this thread to do.
printf("Hello from task %d!\n", i);
- saidHellow.signal();
- // Unblock the tasks one by one.
- for (int i = 0; i < 10; i++) {
- sayHellow.signal();
- saidHellow.wait();
- }
+ sayHellow.signal(); // Unblock all the tasks.
- // All tasks are guaranteed to completed before the scheduler is destructed.
+ saidHellow.wait(); // Wait for all tasks to complete.
+ printf("All tasks said hello.\n");
+ // All tasks are guaranteed to complete before the scheduler is destructed.
diff --git a/include/marl/conditionvariable.h b/include/marl/conditionvariable.h
index daa7c59..2eb7094 100644
--- a/include/marl/conditionvariable.h
+++ b/include/marl/conditionvariable.h
@@ -34,6 +34,8 @@
// thread will work on other tasks until the ConditionVariable is unblocked.
class ConditionVariable {
+ inline ConditionVariable();
// notify_one() notifies and potentially unblocks one waiting fiber or thread.
inline void notify_one();
@@ -65,6 +67,11 @@
Predicate&& pred);
+ ConditionVariable(const ConditionVariable&) = delete;
+ ConditionVariable(ConditionVariable&&) = delete;
+ ConditionVariable& operator=(const ConditionVariable&) = delete;
+ ConditionVariable& operator=(ConditionVariable&&) = delete;
std::mutex mutex;
std::unordered_set<Scheduler::Fiber*> waiting;
std::condition_variable condition;
@@ -72,6 +79,8 @@
std::atomic<int> numWaitingOnCondition = {0};
+ConditionVariable::ConditionVariable() {}
void ConditionVariable::notify_one() {
if (numWaiting == 0) {
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index 8f5532d..03d121c 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -43,6 +43,7 @@
// A scheduler can be bound to one or more threads using the bind() method.
// Once bound to a thread, that thread can call marl::schedule() to enqueue
// work tasks to be executed asynchronously.
+// All threads must be unbound with unbind() before the scheduler is destructed.
// Scheduler are initially constructed in single-threaded mode.
// Call setWorkerThreadCount() to spawn dedicated worker threads.
class Scheduler {
@@ -53,6 +54,10 @@
using Predicate = std::function<bool()>;
Scheduler(Allocator* allocator = Allocator::Default);
+ // Destructor.
+ // Ensure that all threads are unbound before calling - failure to do so may
+ // result in leaked memory.
// get() returns the scheduler bound to the current thread.
@@ -64,6 +69,8 @@
// unbind() unbinds the scheduler currently bound to the current thread.
// There must be a existing scheduler bound to the thread prior to calling.
+ // unbind() flushes any enqueued tasks on the single-threaded worker before
+ // returning.
static void unbind();
// enqueue() queues the task for asynchronous execution.
@@ -117,6 +124,7 @@
// will be locked before wait() returns.
// pred will be always be called with the lock held.
// wait() must only be called on the currently executing fiber.
+ _Requires_lock_held_(lock)
void wait(Lock& lock, const Predicate& pred);
// wait() suspends execution of this Fiber until the Fiber is woken up with
@@ -133,6 +141,7 @@
// will be locked before wait() returns.
// pred will be always be called with the lock held.
// wait() must only be called on the currently executing fiber.
+ _Requires_lock_held_(lock)
template <typename Clock, typename Duration>
inline bool wait(Lock& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
@@ -201,6 +210,11 @@
+ Scheduler(const Scheduler&) = delete;
+ Scheduler(Scheduler&&) = delete;
+ Scheduler& operator=(const Scheduler&) = delete;
+ Scheduler& operator=(Scheduler&&) = delete;
// Stack size in bytes of a new fiber.
// TODO: Make configurable so the default size can be reduced.
static constexpr size_t FiberStackSize = 1024 * 1024;
@@ -271,6 +285,7 @@
// wait() suspends execution of the current task until the predicate pred
// returns true.
// See Fiber::wait() for more information.
+ _Requires_lock_held_(lock)
bool wait(Fiber::Lock& lock,
const TimePoint* timeout,
const Predicate& pred);
@@ -436,6 +451,7 @@
template <typename Clock, typename Duration>
bool Scheduler::Fiber::wait(
Lock& lock,
diff --git a/kokoro/macos/presubmit.sh b/kokoro/macos/presubmit.sh
index da16d77..f2009c8 100755
--- a/kokoro/macos/presubmit.sh
+++ b/kokoro/macos/presubmit.sh
@@ -13,7 +13,7 @@
mkdir build
cd build
make -j$(sysctl -n hw.logicalcpu)
diff --git a/kokoro/ubuntu/presubmit.sh b/kokoro/ubuntu/presubmit.sh
index 9e19fb0..b6bbe0a 100755
--- a/kokoro/ubuntu/presubmit.sh
+++ b/kokoro/ubuntu/presubmit.sh
@@ -14,7 +14,7 @@
cd build
build_and_run() {
make --jobs=$(nproc)
diff --git a/kokoro/windows/presubmit.bat b/kokoro/windows/presubmit.bat
index a80c0a3..1cb9caa 100644
--- a/kokoro/windows/presubmit.bat
+++ b/kokoro/windows/presubmit.bat
@@ -20,7 +20,7 @@
IF /I "%BUILD_SYSTEM%"=="cmake" (
%MSBUILD% /p:Configuration=%CONFIG% Marl.sln
diff --git a/src/conditionvariable_test.cpp b/src/conditionvariable_test.cpp
index 3cca0f8..2930e4d 100644
--- a/src/conditionvariable_test.cpp
+++ b/src/conditionvariable_test.cpp
@@ -13,9 +13,13 @@
// limitations under the License.
#include "marl/conditionvariable.h"
+#include "marl/waitgroup.h"
#include "marl_test.h"
+#include <condition_variable>
+#include <mutex>
TEST_F(WithoutBoundScheduler, ConditionVariable) {
bool trigger[3] = {false, false, false};
bool signal[3] = {false, false, false};
@@ -25,7 +29,11 @@
std::thread thread([&] {
for (int i = 0; i < 3; i++) {
std::unique_lock<std::mutex> lock(mutex);
- cv.wait(lock, [&] { return trigger[i]; });
+ cv.wait(lock, [&] {
+ EXPECT_TRUE(lock.owns_lock());
+ return trigger[i];
+ });
+ EXPECT_TRUE(lock.owns_lock());
signal[i] = true;
@@ -40,7 +48,11 @@
std::unique_lock<std::mutex> lock(mutex);
trigger[i] = true;
- cv.wait(lock, [&] { return signal[i]; });
+ cv.wait(lock, [&] {
+ EXPECT_TRUE(lock.owns_lock());
+ return signal[i];
+ });
+ EXPECT_TRUE(lock.owns_lock());
ASSERT_EQ(signal[0], 0 <= i);
@@ -60,7 +72,11 @@
std::thread thread([&] {
for (int i = 0; i < 3; i++) {
std::unique_lock<std::mutex> lock(mutex);
- cv.wait(lock, [&] { return trigger[i]; });
+ cv.wait(lock, [&] {
+ EXPECT_TRUE(lock.owns_lock());
+ return trigger[i];
+ });
+ EXPECT_TRUE(lock.owns_lock());
signal[i] = true;
@@ -75,7 +91,11 @@
std::unique_lock<std::mutex> lock(mutex);
trigger[i] = true;
- cv.wait(lock, [&] { return signal[i]; });
+ cv.wait(lock, [&] {
+ EXPECT_TRUE(lock.owns_lock());
+ return signal[i];
+ });
+ EXPECT_TRUE(lock.owns_lock());
ASSERT_EQ(signal[0], 0 <= i);
@@ -85,3 +105,39 @@
+// ConditionVariableTimeouts spins up a whole lot of wait_fors(), unblocking
+// some with timeouts and some with a notify, and then let's all the workers
+// go to idle before repeating.
+// This is testing to ensure that the scheduler handles timeouts correctly when
+// they are early-unblocked, along with expected lock state.
+TEST_P(WithBoundScheduler, ConditionVariableTimeouts) {
+ for (int i = 0; i < 10; i++) {
+ std::mutex mutex;
+ marl::ConditionVariable cv;
+ bool signaled = false; // guarded by mutex
+ auto wg = marl::WaitGroup(100);
+ for (int j = 0; j < 100; j++) {
+ marl::schedule([=, &mutex, &cv, &signaled] {
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ cv.wait_for(lock, std::chrono::milliseconds(j), [&] {
+ EXPECT_TRUE(lock.owns_lock());
+ return signaled;
+ });
+ EXPECT_TRUE(lock.owns_lock());
+ }
+ // Ensure the mutex unlock happens *before* the wg.done() call,
+ // otherwise the stack pointer may no longer be valid.
+ wg.done();
+ });
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ signaled = true;
+ cv.notify_all();
+ }
+ wg.wait();
+ }
diff --git a/src/defer_bench.cpp b/src/defer_bench.cpp
index c89dc32..d826367 100644
--- a/src/defer_bench.cpp
+++ b/src/defer_bench.cpp
@@ -16,10 +16,12 @@
#include "benchmark/benchmark.h"
+volatile int do_not_optimize_away_result = 0;
static void Defer(benchmark::State& state) {
- int i = 0;
for (auto _ : state) {
- defer(benchmark::DoNotOptimize(i++));
+ // Avoid benchmark::DoNotOptimize() as this is unfairly slower on Windows.
+ defer(do_not_optimize_away_result++);
diff --git a/src/osfiber_ucontext.h b/src/osfiber_ucontext.h
index 664c790..20c3d2b 100644
--- a/src/osfiber_ucontext.h
+++ b/src/osfiber_ucontext.h
@@ -115,6 +115,7 @@
out->target = func;
auto res = getcontext(&out->context);
+ (void)res;
MARL_ASSERT(res == 0, "getcontext() returned %d", int(res));
out->context.uc_stack.ss_sp = out->stack.ptr;
out->context.uc_stack.ss_size = stackSize;
@@ -130,6 +131,7 @@
void OSFiber::switchTo(OSFiber* fiber) {
auto res = swapcontext(&context, &fiber->context);
+ (void)res;
MARL_ASSERT(res == 0, "swapcontext() returned %d", int(res));
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index cae3a6b..197dbb9 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -133,12 +133,17 @@
Scheduler::~Scheduler() {
std::unique_lock<std::mutex> lock(singleThreadedWorkerMutex);
MARL_ASSERT(singleThreadedWorkers.size() == 0,
"Scheduler still bound on %d threads",
+ // Release all worker threads.
+ // This will wait for all in-flight tasks to complete before returning.
@@ -401,6 +406,7 @@
bool Scheduler::Worker::wait(Fiber::Lock& waitLock,
const TimePoint* timeout,
const Predicate& pred) {
@@ -422,17 +428,20 @@
// Fiber resumed. We don't need the work mutex locked any more.
+ // Re-lock to either return due to timeout, or call pred().
+ waitLock.lock();
// Check timeout.
if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
return false;
- // Spurious wake up. Re-lock, spin again.
- waitLock.lock();
+ // Spurious wake up. Spin again.
return true;
void Scheduler::Worker::suspend(
const std::chrono::system_clock::time_point* timeout) {
// Current fiber is yielding as it is blocked.