Squashed 'third_party/marl/' changes from 49e4e3141..14e4d862a

14e4d862a Scheduler: Fix issues with fiber lists and timeouts.
57f41915d SwiftShader build fixes.
791187298 Add Event::any().
ecd5ab322 Implement yields with timeouts, wait_for() / wait_until()
6ba730d94 Update README.md
8348be4f0 Implement page-based functions for Fuchsia
5e512cd0c Fix condition logic of assert in TasksOnlyScheduledOnWorkerThreads.
37ae48f40 Add marl::Event.
a90725760 Update README.md (#49)

git-subtree-dir: third_party/marl
git-subtree-split: 14e4d862a959b831fd994a436e7c104c6fd19006
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3b63274..dfc57a5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -172,6 +172,7 @@
         ${MARL_SRC_DIR}/conditionvariable_test.cpp
         ${MARL_SRC_DIR}/containers_test.cpp
         ${MARL_SRC_DIR}/defer_test.cpp
+        ${MARL_SRC_DIR}/event_test.cpp
         ${MARL_SRC_DIR}/marl_test.cpp
         ${MARL_SRC_DIR}/marl_test.h
         ${MARL_SRC_DIR}/memory_test.cpp
diff --git a/README.md b/README.md
index 7c3ed94..8e847ca 100644
--- a/README.md
+++ b/README.md
@@ -10,17 +10,11 @@
 
 Marl supports Windows, macOS, Linux, Fuchsia and Android (arm, aarch64, ppc64 (ELFv2), x86 and x64).
 
-Marl has no dependencies on other libraries (with exception on googletest for building the optional unit tests).
-
-Marl is in early development and will have breaking API changes.
-
-**More documentation and examples coming soon.**
-
-Note: This is not an officially supported Google product
+Marl has no dependencies on other libraries (with the exception of googletest for building the optional unit tests).
 
 ## Building
 
-Marl contains a number of unit tests and examples which can be built using CMake.
+Marl contains many unit tests and examples that can be built using CMake.
 
 Unit tests require fetching the `googletest` external project, which can be done by typing the following in your terminal:
 
@@ -66,3 +60,7 @@
 ```cmake
 target_include_directories($<target> PRIVATE "${MARL_DIR}/include") # replace <target> with the name of your project's target
 ```
+
+---
+
+Note: This is not an officially supported Google product
diff --git a/include/marl/conditionvariable.h b/include/marl/conditionvariable.h
index ecb1de4..c5b6787 100644
--- a/include/marl/conditionvariable.h
+++ b/include/marl/conditionvariable.h
@@ -15,13 +15,14 @@
 #ifndef marl_condition_variable_h
 #define marl_condition_variable_h
 
-#include "containers.h"
 #include "debug.h"
+#include "defer.h"
 #include "scheduler.h"
 
 #include <atomic>
 #include <condition_variable>
 #include <mutex>
+#include <unordered_set>
 
 namespace marl {
 
@@ -43,11 +44,29 @@
   // wait() blocks the current fiber or thread until the predicate is satisfied
   // and the ConditionVariable is notified.
   template <typename Predicate>
-  inline void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
+  inline void wait(std::unique_lock<std::mutex>& lock, Predicate&& pred);
+
+  // wait_for() blocks the current fiber or thread until the predicate is
+  // satisfied and the ConditionVariable is notified, or until the timeout
+  // has been reached. Returns false if pred still evaluates to false after
+  // the timeout has been reached, otherwise true.
+  template <typename Rep, typename Period, typename Predicate>
+  bool wait_for(std::unique_lock<std::mutex>& lock,
+                const std::chrono::duration<Rep, Period>& duration,
+                Predicate&& pred);
+
+  // wait_until() blocks the current fiber or thread until the predicate is
+  // satisfied and the ConditionVariable is notified, or until the timeout
+  // has been reached. Returns false if pred still evaluates to false after
+  // the timeout has been reached, otherwise true.
+  template <typename Clock, typename Duration, typename Predicate>
+  bool wait_until(std::unique_lock<std::mutex>& lock,
+                  const std::chrono::time_point<Clock, Duration>& timeout,
+                  Predicate&& pred);
 
  private:
   std::mutex mutex;
-  containers::vector<Scheduler::Fiber*, 4> waiting;
+  std::unordered_set<Scheduler::Fiber*> waiting;
   std::condition_variable condition;
   std::atomic<int> numWaiting = {0};
   std::atomic<int> numWaitingOnCondition = {0};
@@ -58,12 +77,12 @@
     return;
   }
   std::unique_lock<std::mutex> lock(mutex);
-  if (waiting.size() > 0) {
-    auto fiber = waiting.back();
-    waiting.pop_back();
+  for (auto fiber : waiting) {
     fiber->schedule();
   }
+  waiting.clear();
   lock.unlock();
+
   if (numWaitingOnCondition > 0) {
     condition.notify_one();
   }
@@ -74,20 +93,20 @@
     return;
   }
   std::unique_lock<std::mutex> lock(mutex);
-  while (waiting.size() > 0) {
-    auto fiber = waiting.back();
-    waiting.pop_back();
+  for (auto fiber : waiting) {
     fiber->schedule();
   }
+  waiting.clear();
   lock.unlock();
+
   if (numWaitingOnCondition > 0) {
     condition.notify_all();
   }
 }
 
 template <typename Predicate>
-void ConditionVariable::wait(std::unique_lock<std::mutex>& dataLock,
-                             Predicate pred) {
+void ConditionVariable::wait(std::unique_lock<std::mutex>& lock,
+                             Predicate&& pred) {
   if (pred()) {
     return;
   }
@@ -97,23 +116,71 @@
     // Yield to let other tasks run that can unblock this fiber.
     while (!pred()) {
       mutex.lock();
-      waiting.push_back(fiber);
+      waiting.emplace(fiber);
       mutex.unlock();
 
-      dataLock.unlock();
+      lock.unlock();
       fiber->yield();
-      dataLock.lock();
+      lock.lock();
     }
   } else {
     // Currently running outside of the scheduler.
     // Delegate to the std::condition_variable.
     numWaitingOnCondition++;
-    condition.wait(dataLock, pred);
+    condition.wait(lock, pred);
     numWaitingOnCondition--;
   }
   numWaiting--;
 }
 
+template <typename Rep, typename Period, typename Predicate>
+bool ConditionVariable::wait_for(
+    std::unique_lock<std::mutex>& lock,
+    const std::chrono::duration<Rep, Period>& duration,
+    Predicate&& pred) {
+  return wait_until(lock, std::chrono::system_clock::now() + duration, pred);
+}
+
+template <typename Clock, typename Duration, typename Predicate>
+bool ConditionVariable::wait_until(
+    std::unique_lock<std::mutex>& lock,
+    const std::chrono::time_point<Clock, Duration>& timeout,
+    Predicate&& pred) {
+  if (pred()) {
+    return true;
+  }
+  numWaiting++;
+  defer(numWaiting--);
+
+  if (auto fiber = Scheduler::Fiber::current()) {
+    // Currently executing on a scheduler fiber.
+    // Yield to let other tasks run that can unblock this fiber.
+    while (!pred()) {
+      mutex.lock();
+      waiting.emplace(fiber);
+      mutex.unlock();
+
+      lock.unlock();
+      fiber->yield_until(timeout);
+      lock.lock();
+
+      if (std::chrono::system_clock::now() >= timeout) {
+        mutex.lock();
+        waiting.erase(fiber);
+        mutex.unlock();
+        return false;
+      }
+    }
+    return true;
+  } else {
+    // Currently running outside of the scheduler.
+    // Delegate to the std::condition_variable.
+    numWaitingOnCondition++;
+    defer(numWaitingOnCondition--);
+    return condition.wait_until(lock, timeout, pred);
+  }
+}
+
 }  // namespace marl
 
 #endif  // marl_condition_variable_h
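
For illustration, a minimal sketch of calling the new timed wait. Only the
wait_for() signature above comes from this patch; the lock, flag, and
waitForReady() itself are assumptions made for the example:

```c++
// Sketch: wait up to one second for a flag guarded by `mu`.
// ConditionVariable::wait_for() returns false if the predicate still
// evaluates to false once the timeout elapses, otherwise true.
#include "marl/conditionvariable.h"

#include <chrono>
#include <mutex>

bool waitForReady(marl::ConditionVariable& cv, std::mutex& mu, bool& ready) {
  std::unique_lock<std::mutex> lock(mu);
  return cv.wait_for(lock, std::chrono::seconds(1), [&] { return ready; });
}
```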
diff --git a/include/marl/event.h b/include/marl/event.h
new file mode 100644
index 0000000..2c8078e
--- /dev/null
+++ b/include/marl/event.h
@@ -0,0 +1,245 @@
+// Copyright 2019 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_event_h
+#define marl_event_h
+
+#include "conditionvariable.h"
+#include "containers.h"
+#include "memory.h"
+
+#include <chrono>
+
+namespace marl {
+
+// Event is a synchronization primitive used to block until a signal is raised.
+class Event {
+ public:
+  enum class Mode {
+    // The event signal will be automatically reset when a call to wait()
+    // returns.
+    // A single call to signal() will only unblock a single (possibly
+    // future) call to wait().
+    Auto,
+
+    // While the event is in the signaled state, any calls to wait() will
+    // unblock without automatically resetting the signaled state.
+    // The signaled state can be reset with a call to clear().
+    Manual
+  };
+
+  inline Event(Mode mode = Mode::Auto,
+               bool initialState = false,
+               Allocator* allocator = Allocator::Default);
+
+  // signal() signals the event, possibly unblocking a call to wait().
+  inline void signal() const;
+
+  // clear() clears the signaled state.
+  inline void clear() const;
+
+  // wait() blocks until the event is signaled.
+  // If the event was constructed with the Auto Mode, then only one
+  // call to wait() will unblock per signal, upon which the signalled
+  // state will be automatically cleared.
+  inline void wait() const;
+
+  // wait_for() blocks until the event is signaled, or the timeout has been
+  // reached.
+  // If the timeout was reached, then wait_for() returns false.
+  // If the event is signalled and the event was constructed with the Auto
+  // Mode, then only one call to wait() will unblock before returning, upon
+  // which the signalled state will be automatically cleared.
+  template <typename Rep, typename Period>
+  inline bool wait_for(
+      const std::chrono::duration<Rep, Period>& duration) const;
+
+  // wait_until() blocks until the event is signaled, or the timeout has been
+  // reached.
+  // If the timeout was reached, then wait_until() returns false.
+  // If the event is signalled and the event was constructed with the Auto
+  // Mode, then only one call to wait() will unblock before returning, upon
+  // which the signalled state will be automatically cleared.
+  template <typename Clock, typename Duration>
+  inline bool wait_until(
+      const std::chrono::time_point<Clock, Duration>& timeout) const;
+
+  // test() returns true if the event is signaled, otherwise false.
+  // If the event is signalled and was constructed with the Auto Mode, then
+  // the signalled state will be automatically cleared upon returning.
+  inline bool test() const;
+
+  // isSignalled() returns true if the event is signaled, otherwise false.
+  // Unlike test(), the signal is not automatically cleared when the event was
+  // constructed with the Auto Mode.
+  // Note: No lock is held after isSignalled() returns, so the event state may
+  // immediately change after returning. Use with caution.
+  inline bool isSignalled() const;
+
+  // any() returns an event that is automatically signalled whenever any of the
+  // events in the list are signalled.
+  template <typename Iterator>
+  inline static Event any(Mode mode,
+                          const Iterator& begin,
+                          const Iterator& end);
+
+  // any() returns an event that is automatically signalled whenever any of the
+  // events in the list are signalled.
+  // This overload defaults to using the Auto mode.
+  template <typename Iterator>
+  inline static Event any(const Iterator& begin, const Iterator& end);
+
+ private:
+  struct Shared {
+    inline Shared(Mode mode, bool initialState);
+    inline void signal();
+    inline void wait();
+
+    template <typename Rep, typename Period>
+    inline bool wait_for(const std::chrono::duration<Rep, Period>& duration);
+
+    template <typename Clock, typename Duration>
+    inline bool wait_until(
+        const std::chrono::time_point<Clock, Duration>& timeout);
+
+    std::mutex mutex;
+    ConditionVariable cv;
+    const Mode mode;
+    bool signalled;
+    containers::vector<std::shared_ptr<Shared>, 2> deps;
+  };
+
+  const std::shared_ptr<Shared> shared;
+};
+
+Event::Shared::Shared(Mode mode, bool initialState)
+    : mode(mode), signalled(initialState) {}
+
+void Event::Shared::signal() {
+  std::unique_lock<std::mutex> lock(mutex);
+  if (signalled) {
+    return;
+  }
+  signalled = true;
+  if (mode == Mode::Auto) {
+    cv.notify_one();
+  } else {
+    cv.notify_all();
+  }
+  for (auto dep : deps) {
+    dep->signal();
+  }
+}
+
+void Event::Shared::wait() {
+  std::unique_lock<std::mutex> lock(mutex);
+  cv.wait(lock, [&] { return signalled; });
+  if (mode == Mode::Auto) {
+    signalled = false;
+  }
+}
+
+template <typename Rep, typename Period>
+bool Event::Shared::wait_for(
+    const std::chrono::duration<Rep, Period>& duration) {
+  std::unique_lock<std::mutex> lock(mutex);
+  if (!cv.wait_for(lock, duration, [&] { return signalled; })) {
+    return false;
+  }
+  if (mode == Mode::Auto) {
+    signalled = false;
+  }
+  return true;
+}
+
+template <typename Clock, typename Duration>
+bool Event::Shared::wait_until(
+    const std::chrono::time_point<Clock, Duration>& timeout) {
+  std::unique_lock<std::mutex> lock(mutex);
+  if (!cv.wait_until(lock, timeout, [&] { return signalled; })) {
+    return false;
+  }
+  if (mode == Mode::Auto) {
+    signalled = false;
+  }
+  return true;
+}
+
+Event::Event(Mode mode /* = Mode::Auto */,
+             bool initialState /* = false */,
+             Allocator* allocator /* = Allocator::Default */)
+    : shared(allocator->make_shared<Shared>(mode, initialState)) {}
+
+void Event::signal() const {
+  shared->signal();
+}
+
+void Event::clear() const {
+  std::unique_lock<std::mutex> lock(shared->mutex);
+  shared->signalled = false;
+}
+
+void Event::wait() const {
+  shared->wait();
+}
+
+template <typename Rep, typename Period>
+bool Event::wait_for(const std::chrono::duration<Rep, Period>& duration) const {
+  return shared->wait_for(duration);
+}
+
+template <typename Clock, typename Duration>
+bool Event::wait_until(
+    const std::chrono::time_point<Clock, Duration>& timeout) const {
+  return shared->wait_until(timeout);
+}
+
+bool Event::test() const {
+  std::unique_lock<std::mutex> lock(shared->mutex);
+  if (!shared->signalled) {
+    return false;
+  }
+  if (shared->mode == Mode::Auto) {
+    shared->signalled = false;
+  }
+  return true;
+}
+
+bool Event::isSignalled() const {
+  std::unique_lock<std::mutex> lock(shared->mutex);
+  return shared->signalled;
+}
+
+template <typename Iterator>
+Event Event::any(Mode mode, const Iterator& begin, const Iterator& end) {
+  Event any(mode, false);
+  for (auto it = begin; it != end; it++) {
+    auto s = it->shared;
+    std::unique_lock<std::mutex> lock(s->mutex);
+    if (s->signalled) {
+      any.signal();
+    }
+    s->deps.push_back(any.shared);
+  }
+  return any;
+}
+
+template <typename Iterator>
+Event Event::any(const Iterator& begin, const Iterator& end) {
+  return any(Mode::Auto, begin, end);
+}
+
+}  // namespace marl
+
+#endif  // marl_event_h
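
For illustration, a minimal sketch of the new primitive in use. A scheduler
bound to the current thread and marl::schedule() are assumed (as in the tests
below); only the Event API comes from this file:

```c++
// Sketch: one task signals, the caller blocks until the signal arrives.
// In Auto mode, signal() unblocks exactly one (possibly future) wait(),
// and the signalled state is cleared when that wait() returns.
#include "marl/event.h"
#include "marl/scheduler.h"

void produceThenConsume() {
  marl::Event ready(marl::Event::Mode::Auto);
  marl::schedule([=] {
    // ... produce something ...
    ready.signal();
  });
  ready.wait();  // parks this fiber (or thread) until signal()
}
```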
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index 6490228..ffcddc3 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -21,12 +21,16 @@
 
 #include <array>
 #include <atomic>
+#include <chrono>
 #include <condition_variable>
 #include <functional>
+#include <map>
 #include <mutex>
 #include <queue>
+#include <set>
 #include <thread>
 #include <unordered_map>
+#include <unordered_set>
 
 namespace marl {
 
@@ -45,6 +49,8 @@
   class Worker;
 
  public:
+  using TimePoint = std::chrono::system_clock::time_point;
+
   Scheduler(Allocator* allocator = Allocator::Default);
   ~Scheduler();
 
@@ -101,6 +107,14 @@
     // yield() must only be called on the currently executing fiber.
     void yield();
 
+    // yield_until() suspends execution of this Fiber, allowing the thread to
+    // work on other tasks. yield_until() may automatically resume sometime
+    // after timeout.
+    // yield_until() must only be called on the currently executing fiber.
+    template <typename Clock, typename Duration>
+    inline void yield_until(
+        const std::chrono::time_point<Clock, Duration>& timeout);
+
     // schedule() reschedules the suspended Fiber for execution.
     void schedule();
 
@@ -113,6 +127,8 @@
 
     Fiber(Allocator::unique_ptr<OSFiber>&&, uint32_t id);
 
+    void yield_until_sc(const TimePoint& timeout);
+
     // switchTo() switches execution to the given fiber.
     // switchTo() must only be called on the currently executing fiber.
     void switchTo(Fiber*);
@@ -143,10 +159,40 @@
   // Maximum number of worker threads.
   static constexpr size_t MaxWorkerThreads = 256;
 
+  // WaitingFibers holds all the fibers waiting on a timeout.
+  struct WaitingFibers {
+    // operator bool() returns true iff there are any wait fibers.
+    inline operator bool() const;
+
+    // take() returns the next fiber that has exceeded its timeout, or nullptr
+    // if there are no fibers that have yet exceeded their timeouts.
+    inline Fiber* take(const TimePoint& timepoint);
+
+    // next() returns the timepoint of the next fiber to timeout.
+    // next() can only be called if operator bool() returns true.
+    inline TimePoint next() const;
+
+    // add() adds another fiber and timeout to the list of waiting fibers.
+    inline void add(const TimePoint& timeout, Fiber* fiber);
+
+    // erase() removes the fiber from the waiting list.
+    inline void erase(Fiber* fiber);
+
+   private:
+    struct Timeout {
+      TimePoint timepoint;
+      Fiber* fiber;
+      inline bool operator<(const Timeout&) const;
+    };
+    std::set<Timeout> timeouts;
+    std::unordered_map<Fiber*, TimePoint> fibers;
+  };
+
   // TODO: Implement a queue that recycles elements to reduce number of
   // heap allocations.
   using TaskQueue = std::queue<Task>;
   using FiberQueue = std::queue<Fiber*>;
+  using FiberSet = std::unordered_set<Fiber*>;
 
  // Workers execute Tasks on a single thread.
   // Once a task is started, it may yield to other tasks on the same Worker.
@@ -172,7 +218,9 @@
 
     // yield() suspends execution of the current task, and looks for other
     // tasks to start or continue execution.
-    void yield(Fiber* fiber);
+    // If timeout is not nullptr, yield may automatically resume the current
+    // task sometime after timeout.
+    void yield(Fiber* fiber, const TimePoint* timeout);
 
     // enqueue(Fiber*) enqueues resuming of a suspended fiber.
     void enqueue(Fiber* fiber);
@@ -236,6 +284,10 @@
     // frequently putting the thread to sleep and re-waking.
     void spinForWork();
 
+    // enqueueFiberTimeouts() enqueues all the fibers that have finished
+    // waiting.
+    _Requires_lock_held_(lock) void enqueueFiberTimeouts();
+
     // numBlockedFibers() returns the number of fibers currently blocked and
     // held externally.
     _Requires_lock_held_(lock) inline size_t numBlockedFibers() const {
@@ -247,6 +299,7 @@
       std::atomic<uint64_t> num = {0};  // tasks.size() + fibers.size()
       TaskQueue tasks;                  // guarded by mutex
       FiberQueue fibers;                // guarded by mutex
+      WaitingFibers waiting;            // guarded by mutex
       std::condition_variable added;
       std::mutex mutex;
     };
@@ -274,7 +327,7 @@
     Fiber* currentFiber = nullptr;
     std::thread thread;
     Work work;
-    FiberQueue idleFibers;  // Fibers that have completed which can be reused.
+    FiberSet idleFibers;  // Fibers that have completed which can be reused.
     std::vector<Allocator::unique_ptr<Fiber>>
         workerFibers;  // All fibers created by this worker.
     FastRnd rng;
@@ -312,6 +365,14 @@
       singleThreadedWorkers;
 };
 
+template <typename Clock, typename Duration>
+void Scheduler::Fiber::yield_until(
+    const std::chrono::time_point<Clock, Duration>& timeout) {
+  using ToDuration = typename TimePoint::duration;
+  using ToClock = typename TimePoint::clock;
+  yield_until_sc(std::chrono::time_point_cast<ToDuration, ToClock>(timeout));
+}
+
 Scheduler::Worker* Scheduler::Worker::getCurrent() {
   return Worker::current;
 }
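
For context, the timed-wait pattern this API enables looks roughly like the
following, condensed from ConditionVariable::wait_until() in this change.
blockUntil() is an illustrative name, it assumes the caller is on a scheduler
fiber, and the real code also guards its waiter bookkeeping with a mutex:

```c++
// Condensed sketch: park the current fiber with a deadline, and rely on
// either schedule() or the scheduler's timeout path to resume it.
#include "marl/scheduler.h"

#include <chrono>

template <typename Clock, typename Duration, typename Predicate>
bool blockUntil(const std::chrono::time_point<Clock, Duration>& timeout,
                Predicate&& pred) {
  auto fiber = marl::Scheduler::Fiber::current();
  while (!pred()) {
    fiber->yield_until(timeout);  // may resume sometime after `timeout`
    if (std::chrono::system_clock::now() >= timeout) {
      return pred();  // deadline reached: report the predicate's final state
    }
  }
  return true;
}
```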
diff --git a/src/event_test.cpp b/src/event_test.cpp
new file mode 100644
index 0000000..aa4fdbe
--- /dev/null
+++ b/src/event_test.cpp
@@ -0,0 +1,226 @@
+// Copyright 2019 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "marl/event.h"
+#include "marl/waitgroup.h"
+
+#include "marl_test.h"
+
+namespace std {
+namespace chrono {
+template <typename Rep, typename Period>
+std::ostream& operator<<(std::ostream& os, const duration<Rep, Period>& d) {
+  return os << chrono::duration_cast<chrono::microseconds>(d).count() << "us";
+}
+}  // namespace chrono
+}  // namespace std
+
+TEST_P(WithBoundScheduler, EventIsSignalled) {
+  std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
+                                          marl::Event::Mode::Auto};
+  for (auto mode : modes) {
+    auto event = marl::Event(mode);
+    ASSERT_EQ(event.isSignalled(), false);
+    event.signal();
+    ASSERT_EQ(event.isSignalled(), true);
+    ASSERT_EQ(event.isSignalled(), true);
+    event.clear();
+    ASSERT_EQ(event.isSignalled(), false);
+  }
+}
+
+TEST_P(WithBoundScheduler, EventAutoTest) {
+  auto event = marl::Event(marl::Event::Mode::Auto);
+  ASSERT_EQ(event.test(), false);
+  event.signal();
+  ASSERT_EQ(event.test(), true);
+  ASSERT_EQ(event.test(), false);
+}
+
+TEST_P(WithBoundScheduler, EventManualTest) {
+  auto event = marl::Event(marl::Event::Mode::Manual);
+  ASSERT_EQ(event.test(), false);
+  event.signal();
+  ASSERT_EQ(event.test(), true);
+  ASSERT_EQ(event.test(), true);
+}
+
+TEST_P(WithBoundScheduler, EventAutoWait) {
+  std::atomic<int> counter = {0};
+  auto event = marl::Event(marl::Event::Mode::Auto);
+  auto done = marl::Event(marl::Event::Mode::Auto);
+
+  for (int i = 0; i < 3; i++) {
+    marl::schedule([=, &counter] {
+      event.wait();
+      counter++;
+      done.signal();
+    });
+  }
+
+  ASSERT_EQ(counter.load(), 0);
+  event.signal();
+  done.wait();
+  ASSERT_EQ(counter.load(), 1);
+  event.signal();
+  done.wait();
+  ASSERT_EQ(counter.load(), 2);
+  event.signal();
+  done.wait();
+  ASSERT_EQ(counter.load(), 3);
+}
+
+TEST_P(WithBoundScheduler, EventManualWait) {
+  std::atomic<int> counter = {0};
+  auto event = marl::Event(marl::Event::Mode::Manual);
+  auto wg = marl::WaitGroup(3);
+  for (int i = 0; i < 3; i++) {
+    marl::schedule([=, &counter] {
+      event.wait();
+      counter++;
+      wg.done();
+    });
+  }
+  event.signal();
+  wg.wait();
+  ASSERT_EQ(counter.load(), 3);
+}
+
+TEST_P(WithBoundScheduler, EventSequence) {
+  std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
+                                          marl::Event::Mode::Auto};
+  for (auto mode : modes) {
+    std::string sequence;
+    auto eventA = marl::Event(mode);
+    auto eventB = marl::Event(mode);
+    auto eventC = marl::Event(mode);
+    auto done = marl::Event(mode);
+    marl::schedule([=, &sequence] {
+      eventB.wait();
+      sequence += "B";
+      eventC.signal();
+    });
+    marl::schedule([=, &sequence] {
+      eventA.wait();
+      sequence += "A";
+      eventB.signal();
+    });
+    marl::schedule([=, &sequence] {
+      eventC.wait();
+      sequence += "C";
+      done.signal();
+    });
+    ASSERT_EQ(sequence, "");
+    eventA.signal();
+    done.wait();
+    ASSERT_EQ(sequence, "ABC");
+  }
+}
+
+TEST_P(WithBoundScheduler, EventWaitForUnblocked) {
+  auto event = marl::Event(marl::Event::Mode::Manual);
+  auto wg = marl::WaitGroup(1000);
+  for (int i = 0; i < 1000; i++) {
+    marl::schedule([=] {
+      defer(wg.done());
+      auto duration = std::chrono::seconds(10);
+      event.wait_for(duration);
+    });
+  }
+  event.signal();  // unblock
+  wg.wait();
+}
+
+TEST_P(WithBoundScheduler, EventWaitForTimeTaken) {
+  auto event = marl::Event(marl::Event::Mode::Auto);
+  auto wg = marl::WaitGroup(1000);
+  for (int i = 0; i < 1000; i++) {
+    marl::schedule([=] {
+      defer(wg.done());
+      auto duration = std::chrono::milliseconds(10);
+      auto start = std::chrono::system_clock::now();
+      auto triggered = event.wait_for(duration);
+      auto end = std::chrono::system_clock::now();
+      ASSERT_FALSE(triggered);
+      ASSERT_GE(end - start, duration);
+    });
+  }
+  wg.wait();
+}
+
+TEST_P(WithBoundScheduler, EventWaitUntilUnblocked) {
+  auto event = marl::Event(marl::Event::Mode::Manual);
+  auto wg = marl::WaitGroup(1000);
+  for (int i = 0; i < 1000; i++) {
+    marl::schedule([=] {
+      defer(wg.done());
+      auto duration = std::chrono::seconds(10);
+      auto start = std::chrono::system_clock::now();
+      event.wait_until(start + duration);
+    });
+  }
+  event.signal();  // unblock
+  wg.wait();
+}
+
+TEST_P(WithBoundScheduler, EventWaitUntilTimeTaken) {
+  auto event = marl::Event(marl::Event::Mode::Auto);
+  auto wg = marl::WaitGroup(1000);
+  for (int i = 0; i < 1000; i++) {
+    marl::schedule([=] {
+      defer(wg.done());
+      auto duration = std::chrono::milliseconds(10);
+      auto start = std::chrono::system_clock::now();
+      auto triggered = event.wait_until(start + duration);
+      auto end = std::chrono::system_clock::now();
+      ASSERT_FALSE(triggered);
+      ASSERT_GE(end - start, duration);
+    });
+  }
+  wg.wait();
+}
+
+// EventWaitStressTest spins up a large number of wait_for() calls, unblocks
+// them early, and then lets all the workers go idle before repeating.
+// This is testing to ensure that the scheduler handles timeouts correctly when
+// they are early-unblocked. Specifically, this is to test that fibers are
+// not double-placed into the idle or working lists.
+TEST_P(WithBoundScheduler, EventWaitStressTest) {
+  auto event = marl::Event(marl::Event::Mode::Manual);
+  for (int i = 0; i < 10; i++) {
+    auto wg = marl::WaitGroup(1000);
+    for (int j = 0; j < 1000; j++) {
+      marl::schedule([=] {
+        defer(wg.done());
+        event.wait_for(std::chrono::milliseconds(100));
+      });
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    event.signal();  // unblock
+    wg.wait();
+  }
+}
+
+TEST_P(WithBoundScheduler, EventAny) {
+  for (int i = 0; i < 3; i++) {
+    std::vector<marl::Event> events = {
+        marl::Event(marl::Event::Mode::Auto),
+        marl::Event(marl::Event::Mode::Auto),
+        marl::Event(marl::Event::Mode::Auto),
+    };
+    auto any = marl::Event::any(events.begin(), events.end());
+    events[i].signal();
+    ASSERT_TRUE(any.isSignalled());
+  }
+}
\ No newline at end of file
diff --git a/src/memory.cpp b/src/memory.cpp
index 834c849..b9f6cc1 100644
--- a/src/memory.cpp
+++ b/src/memory.cpp
@@ -50,6 +50,48 @@
   MARL_ASSERT(res == 0, "Failed to protect page at %p", addr);
 }
 }  // anonymous namespace
+#elif defined(__Fuchsia__)
+#include <unistd.h>
+#include <zircon/process.h>
+#include <zircon/syscalls.h>
+namespace {
+// This was a static in pageSize(), but due to the following TSAN false-positive
+// bug, this has been moved out to a global.
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=68338
+const size_t kPageSize = sysconf(_SC_PAGESIZE);
+inline size_t pageSize() {
+  return kPageSize;
+}
+inline void* allocatePages(size_t count) {
+  auto length = count * kPageSize;
+  zx_handle_t vmo;
+  if (zx_vmo_create(length, 0, &vmo) != ZX_OK) {
+    return nullptr;
+  }
+  zx_vaddr_t reservation;
+  zx_status_t status =
+      zx_vmar_map(zx_vmar_root_self(), ZX_VM_PERM_READ | ZX_VM_PERM_WRITE, 0,
+                  vmo, 0, length, &reservation);
+  zx_handle_close(vmo);
+  (void)status;
+  MARL_ASSERT(status == ZX_OK, "Failed to allocate %d pages", int(count));
+  return reinterpret_cast<void*>(reservation);
+}
+inline void freePages(void* ptr, size_t count) {
+  auto length = count * kPageSize;
+  zx_status_t status = zx_vmar_unmap(zx_vmar_root_self(),
+                                     reinterpret_cast<zx_vaddr_t>(ptr), length);
+  (void)status;
+  MARL_ASSERT(status == ZX_OK, "Failed to free %d pages at %p", int(count),
+              ptr);
+}
+inline void protectPage(void* addr) {
+  zx_status_t status = zx_vmar_protect(
+      zx_vmar_root_self(), 0, reinterpret_cast<zx_vaddr_t>(addr), kPageSize);
+  (void)status;
+  MARL_ASSERT(status == ZX_OK, "Failed to protect page at %p", addr);
+}
+}  // anonymous namespace
 #elif defined(_WIN32)
 #define WIN32_LEAN_AND_MEAN 1
 #include <Windows.h>
@@ -82,7 +124,6 @@
 }
 }  // anonymous namespace
 #else
-// TODO: Fuchsia support
 #error "Page based allocation not implemented for this platform"
 #endif
 
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 7dd013f..d352bef 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -43,6 +43,14 @@
   return out;
 }
 
+template <typename T>
+inline T take(std::unordered_set<T>& set) {
+  auto it = set.begin();
+  auto out = std::move(*it);
+  set.erase(it);
+  return out;
+}
+
 inline void nop() {
 #if defined(_WIN32)
   __nop();
@@ -216,7 +224,12 @@
 
 void Scheduler::Fiber::yield() {
   MARL_SCOPED_EVENT("YIELD");
-  worker->yield(this);
+  worker->yield(this, nullptr);
+}
+
+void Scheduler::Fiber::yield_until_sc(const TimePoint& timeout) {
+  MARL_SCOPED_EVENT("YIELD_UNTIL");
+  worker->yield(this, &timeout);
 }
 
 void Scheduler::Fiber::switchTo(Fiber* to) {
@@ -241,6 +254,60 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+// Scheduler::WaitingFibers
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::WaitingFibers::operator bool() const {
+  return fibers.size() > 0;
+}
+
+Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timepoint) {
+  if (!*this) {
+    return nullptr;
+  }
+  auto it = timeouts.begin();
+  if (timepoint < it->timepoint) {
+    return nullptr;
+  }
+  auto fiber = it->fiber;
+  timeouts.erase(it);
+  auto deleted = fibers.erase(fiber) != 0;
+  (void)deleted;
+  MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
+  return fiber;
+}
+
+Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
+  MARL_ASSERT(*this,
+              "WaitingFibers::next() called when there' no waiting fibers");
+  return timeouts.begin()->timepoint;
+}
+
+void Scheduler::WaitingFibers::add(const TimePoint& timepoint, Fiber* fiber) {
+  timeouts.emplace(Timeout{timepoint, fiber});
+  bool added = fibers.emplace(fiber, timepoint).second;
+  (void)added;
+  MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
+}
+
+void Scheduler::WaitingFibers::erase(Fiber* fiber) {
+  auto it = fibers.find(fiber);
+  if (it != fibers.end()) {
+    auto timeout = it->second;
+    auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
+    (void)erased;
+    MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
+    fibers.erase(it);
+  }
+}
+
+bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
+  if (timepoint != o.timepoint) {
+    return timepoint < o.timepoint;
+  }
+  return fiber < o.fiber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 // Scheduler::Worker
 ////////////////////////////////////////////////////////////////////////////////
 thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
@@ -296,15 +363,20 @@
   }
 }
 
-void Scheduler::Worker::yield(Fiber* from) {
-  (void)from;  // unreferenced parameter
+void Scheduler::Worker::yield(
+    Fiber* from,
+    const std::chrono::system_clock::time_point* timeout) {
   MARL_ASSERT(currentFiber == from,
               "Attempting to call yield from a non-current fiber");
 
   // Current fiber is yielding as it is blocked.
 
-  // First wait until there's something else this worker can do.
   std::unique_lock<std::mutex> lock(work.mutex);
+  if (timeout != nullptr) {
+    work.waiting.add(*timeout, from);
+  }
+
+  // First wait until there's something else this worker can do.
   waitForWork(lock);
 
   if (work.fibers.size() > 0) {
@@ -332,6 +404,7 @@
 void Scheduler::Worker::enqueue(Fiber* fiber) {
   std::unique_lock<std::mutex> lock(work.mutex);
   auto wasIdle = work.num == 0;
+  work.waiting.erase(fiber);
   work.fibers.push(std::move(fiber));
   work.num++;
   lock.unlock();
@@ -385,7 +458,8 @@
                        Fiber::current()->id);
       {
         std::unique_lock<std::mutex> lock(work.mutex);
-        work.added.wait(lock, [this] { return work.num > 0 || shutdown; });
+        work.added.wait(
+            lock, [this] { return work.num > 0 || work.waiting || shutdown; });
         while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
           waitForWork(lock);
           runUntilIdle(lock);
@@ -418,9 +492,25 @@
     spinForWork();
     lock.lock();
   }
-  work.added.wait(lock, [this] {
-    return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
-  });
+
+  if (work.waiting) {
+    work.added.wait_until(lock, work.waiting.next(), [this] {
+      return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
+    });
+    enqueueFiberTimeouts();
+  } else {
+    work.added.wait(lock, [this] {
+      return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
+    });
+  }
+}
+
+_Requires_lock_held_(lock) void Scheduler::Worker::enqueueFiberTimeouts() {
+  auto now = std::chrono::system_clock::now();
+  while (auto fiber = work.waiting.take(now)) {
+    work.fibers.push(fiber);
+    work.num++;
+  }
 }
 
 void Scheduler::Worker::spinForWork() {
@@ -467,7 +557,11 @@
       work.num--;
       auto fiber = take(work.fibers);
       lock.unlock();
-      idleFibers.push(currentFiber);
+
+      auto added = idleFibers.emplace(currentFiber).second;
+      (void)added;
+      MARL_ASSERT(added, "fiber already idle");
+
       switchToFiber(fiber);
       lock.lock();
     }
@@ -499,6 +593,8 @@
 }
 
 void Scheduler::Worker::switchToFiber(Fiber* to) {
+  MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
+              "switching to idle fiber");
   auto from = currentFiber;
   currentFiber = to;
   from->switchTo(to);
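
The Timeout comparator above orders entries primarily by timepoint, with the
fiber pointer as a tie-breaker so that distinct fibers sharing a deadline can
coexist in the std::set. A standalone sketch of the same bookkeeping, with
illustrative names (int keys stand in for Fiber pointers):

```c++
// Sketch: a set ordered by (deadline, key) paired with a key-to-deadline
// map, so take() pops the earliest expired entry and erase-by-key stays
// O(log n). Mirrors WaitingFibers above; not marl's actual code.
#include <chrono>
#include <set>
#include <unordered_map>

using TimePoint = std::chrono::system_clock::time_point;

struct Timers {
  struct Entry {
    TimePoint deadline;
    int id;
    bool operator<(const Entry& o) const {
      if (deadline != o.deadline) {
        return deadline < o.deadline;
      }
      return id < o.id;  // tie-break equal deadlines
    }
  };

  void add(TimePoint t, int id) {
    byDeadline.insert({t, id});
    byId.emplace(id, t);
  }

  // Returns an expired id, or -1 if nothing has expired by `now`.
  int take(TimePoint now) {
    auto it = byDeadline.begin();
    if (it == byDeadline.end() || now < it->deadline) {
      return -1;
    }
    int id = it->id;
    byDeadline.erase(it);
    byId.erase(id);
    return id;
  }

  std::set<Entry> byDeadline;
  std::unordered_map<int, TimePoint> byId;
};
```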
diff --git a/src/scheduler_test.cpp b/src/scheduler_test.cpp
index 982d9ab..bf40513 100644
--- a/src/scheduler_test.cpp
+++ b/src/scheduler_test.cpp
@@ -150,7 +150,7 @@
   }
   wg.wait();
 
-  ASSERT_EQ(threads.size(), 8U);
+  ASSERT_LE(threads.size(), 8U);
   ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U);
 
   scheduler->unbind();