Update Marl to ca8408f68

Contains a number of optimizations that improve Subzero coroutine performance by up to 10x.

Changes:
    ca8408f68 Scheduler: Reduce the number of mutex locks / unlock.
    575b61e76 Fix compilation of marl::Ticket::onCall()
    e9f312688 waitForWork(): Early out if there work.num > 0
    3196a0539 Scheduler: Use std::deque instead of std::queue
    08a820171 Add flags to marl::Task
    cb3c481d0 Scheduler: Use a separate flag to indicate whether to call notify()
    598c993ec marl::ConditionVariable - use containers::list
    d0c501a9c Add marl::containers::list
    aa1de9091 Benchmarks: Add EventBaton

Commands:
    git subtree pull --prefix third_party/marl https://github.com/google/marl master --squash

Bug: b/140546382
Change-Id: I2b7adc3c624a1f3aef686de7e0e88c52a5666e3a
diff --git a/third_party/marl/include/marl/conditionvariable.h b/third_party/marl/include/marl/conditionvariable.h
index 2eb7094..8dfb9c7 100644
--- a/third_party/marl/include/marl/conditionvariable.h
+++ b/third_party/marl/include/marl/conditionvariable.h
@@ -15,14 +15,15 @@
 #ifndef marl_condition_variable_h
 #define marl_condition_variable_h
 
+#include "containers.h"
 #include "debug.h"
 #include "defer.h"
+#include "memory.h"
 #include "scheduler.h"
 
 #include <atomic>
 #include <condition_variable>
 #include <mutex>
-#include <unordered_set>
 
 namespace marl {
 
@@ -34,7 +35,7 @@
 // thread will work on other tasks until the ConditionVariable is unblocked.
 class ConditionVariable {
  public:
-  inline ConditionVariable();
+  inline ConditionVariable(Allocator* allocator = Allocator::Default);
 
   // notify_one() notifies and potentially unblocks one waiting fiber or thread.
   inline void notify_one();
@@ -73,13 +74,15 @@
   ConditionVariable& operator=(ConditionVariable&&) = delete;
 
   std::mutex mutex;
-  std::unordered_set<Scheduler::Fiber*> waiting;
+  containers::list<Scheduler::Fiber*> waiting;
   std::condition_variable condition;
   std::atomic<int> numWaiting = {0};
   std::atomic<int> numWaitingOnCondition = {0};
 };
 
-ConditionVariable::ConditionVariable() {}
+ConditionVariable::ConditionVariable(
+    Allocator* allocator /* = Allocator::Default */)
+    : waiting(allocator) {}
 
 void ConditionVariable::notify_one() {
   if (numWaiting == 0) {
@@ -122,13 +125,13 @@
     // Currently executing on a scheduler fiber.
     // Yield to let other tasks run that can unblock this fiber.
     mutex.lock();
-    waiting.emplace(fiber);
+    auto it = waiting.emplace_front(fiber);
     mutex.unlock();
 
     fiber->wait(lock, pred);
 
     mutex.lock();
-    waiting.erase(fiber);
+    waiting.erase(it);
     mutex.unlock();
   } else {
     // Currently running outside of the scheduler.
@@ -163,13 +166,13 @@
     // Currently executing on a scheduler fiber.
     // Yield to let other tasks run that can unblock this fiber.
     mutex.lock();
-    waiting.emplace(fiber);
+    auto it = waiting.emplace_front(fiber);
     mutex.unlock();
 
     auto res = fiber->wait(lock, timeout, pred);
 
     mutex.lock();
-    waiting.erase(fiber);
+    waiting.erase(it);
     mutex.unlock();
 
     return res;
diff --git a/third_party/marl/include/marl/containers.h b/third_party/marl/include/marl/containers.h
index ef8c5c0..fcac46e 100644
--- a/third_party/marl/include/marl/containers.h
+++ b/third_party/marl/include/marl/containers.h
@@ -19,10 +19,9 @@
 #include "memory.h"
 
 #include <algorithm>  // std::max
+#include <cstddef>    // size_t
 #include <utility>    // std::move
 
-#include <cstddef>  // size_t
-
 namespace marl {
 namespace containers {
 
@@ -243,6 +242,207 @@
   }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// list<T>
+////////////////////////////////////////////////////////////////////////////////
+
+// list is a minimal std::list-like container that supports constant-time
+// insertion and removal of elements.
+// list keeps hold of allocations (it only releases allocations on destruction),
+// to avoid repeated heap allocations and frees when frequently inserting and
+// removing elements.
+template <typename T>
+class list {
+  struct Entry {
+    T data;
+    Entry* next;
+    Entry* prev;
+  };
+
+ public:
+  class iterator {
+   public:
+    inline iterator(Entry*);
+    inline T* operator->();
+    inline T& operator*();
+    inline iterator& operator++();
+    inline bool operator==(const iterator&) const;
+    inline bool operator!=(const iterator&) const;
+
+   private:
+    friend list;
+    Entry* entry;
+  };
+
+  inline list(Allocator* allocator = Allocator::Default);
+  inline ~list();
+
+  inline iterator begin();
+  inline iterator end();
+  inline size_t size() const;
+
+  template <typename... Args>
+  iterator emplace_front(Args&&... args);
+  inline void erase(iterator);
+
+ private:
+  // copy / move is currently unsupported.
+  list(const list&) = delete;
+  list(list&&) = delete;
+  list& operator=(const list&) = delete;
+  list& operator=(list&&) = delete;
+
+  void grow(size_t count);
+
+  static void unlink(Entry* entry, Entry*& list);
+  static void link(Entry* entry, Entry*& list);
+
+  Allocator* const allocator;
+  size_t size_ = 0;
+  size_t capacity = 0;
+  vector<Allocation, 8> allocations;
+  Entry* free = nullptr;
+  Entry* head = nullptr;
+};
+
+template <typename T>
+list<T>::iterator::iterator(Entry* entry) : entry(entry) {}
+
+template <typename T>
+T* list<T>::iterator::operator->() {
+  return &entry->data;
+}
+
+template <typename T>
+T& list<T>::iterator::operator*() {
+  return entry->data;
+}
+
+template <typename T>
+typename list<T>::iterator& list<T>::iterator::operator++() {
+  entry = entry->next;
+  return *this;
+}
+
+template <typename T>
+bool list<T>::iterator::operator==(const iterator& rhs) const {
+  return entry == rhs.entry;
+}
+
+template <typename T>
+bool list<T>::iterator::operator!=(const iterator& rhs) const {
+  return entry != rhs.entry;
+}
+
+template <typename T>
+list<T>::list(Allocator* allocator /* = Allocator::Default */)
+    : allocator(allocator), allocations(allocator) {
+  grow(8);
+}
+
+template <typename T>
+list<T>::~list() {
+  for (auto el = head; el != nullptr; el = el->next) {
+    el->data.~T();
+  }
+  for (auto alloc : allocations) {
+    allocator->free(alloc);
+  }
+}
+
+template <typename T>
+typename list<T>::iterator list<T>::begin() {
+  return {head};
+}
+
+template <typename T>
+typename list<T>::iterator list<T>::end() {
+  return {nullptr};
+}
+
+template <typename T>
+size_t list<T>::size() const {
+  return size_;
+}
+
+template <typename T>
+template <typename... Args>
+typename list<T>::iterator list<T>::emplace_front(Args&&... args) {
+  // Constructs a T from args at the front of the list; returns its iterator.
+  if (free == nullptr) {
+    grow(capacity);  // Free list exhausted: double the number of entries.
+  }
+  auto entry = free;
+  unlink(entry, free);
+  link(entry, head);
+
+  // Forward each argument as its own deduced type (Args), not as T, so T is
+  // constructed directly from args without converting them to T first.
+  new (&entry->data) T(std::forward<Args>(args)...);
+  size_++;
+  return entry;
+}
+
+template <typename T>
+void list<T>::erase(iterator it) {
+  auto entry = it.entry;
+  unlink(entry, head);
+  link(entry, free);
+
+  entry->data.~T();
+  size_--;
+}
+
+template <typename T>
+void list<T>::grow(size_t count) {
+  Allocation::Request request;
+  request.size = sizeof(Entry) * count;
+  request.alignment = alignof(Entry);
+  request.usage = Allocation::Usage::List;
+  auto alloc = allocator->allocate(request);
+
+  auto entries = reinterpret_cast<Entry*>(alloc.ptr);
+  for (size_t i = 0; i < count; i++) {
+    auto entry = &entries[i];
+    entry->prev = nullptr;
+    entry->next = free;
+    if (free) {
+      free->prev = entry;
+    }
+    free = entry;
+  }
+
+  allocations.emplace_back(std::move(alloc));
+
+  capacity += count;
+}
+
+template <typename T>
+void list<T>::unlink(Entry* entry, Entry*& list) {
+  if (list == entry) {
+    list = list->next;
+  }
+  if (entry->prev) {
+    entry->prev->next = entry->next;
+  }
+  if (entry->next) {
+    entry->next->prev = entry->prev;
+  }
+  entry->prev = nullptr;
+  entry->next = nullptr;
+}
+
+template <typename T>
+void list<T>::link(Entry* entry, Entry*& list) {
+  MARL_ASSERT(entry->next == nullptr, "link() called on entry already linked");
+  MARL_ASSERT(entry->prev == nullptr, "link() called on entry already linked");
+  if (list) {
+    entry->next = list;
+    list->prev = entry;
+  }
+  list = entry;
+}
+
 }  // namespace containers
 }  // namespace marl
 
diff --git a/third_party/marl/include/marl/event.h b/third_party/marl/include/marl/event.h
index 2c8078e..f7b3023 100644
--- a/third_party/marl/include/marl/event.h
+++ b/third_party/marl/include/marl/event.h
@@ -102,7 +102,7 @@
 
  private:
   struct Shared {
-    inline Shared(Mode mode, bool initialState);
+    inline Shared(Allocator* allocator, Mode mode, bool initialState);
     inline void signal();
     inline void wait();
 
@@ -123,8 +123,8 @@
   const std::shared_ptr<Shared> shared;
 };
 
-Event::Shared::Shared(Mode mode, bool initialState)
-    : mode(mode), signalled(initialState) {}
+Event::Shared::Shared(Allocator* allocator, Mode mode, bool initialState)
+    : cv(allocator), mode(mode), signalled(initialState) {}
 
 void Event::Shared::signal() {
   std::unique_lock<std::mutex> lock(mutex);
@@ -179,7 +179,7 @@
 Event::Event(Mode mode /* = Mode::Auto */,
              bool initialState /* = false */,
              Allocator* allocator /* = Allocator::Default */)
-    : shared(allocator->make_shared<Shared>(mode, initialState)) {}
+    : shared(allocator->make_shared<Shared>(allocator, mode, initialState)) {}
 
 void Event::signal() const {
   shared->signal();
diff --git a/third_party/marl/include/marl/memory.h b/third_party/marl/include/marl/memory.h
index d150fdf..8c35e01 100644
--- a/third_party/marl/include/marl/memory.h
+++ b/third_party/marl/include/marl/memory.h
@@ -38,7 +38,8 @@
     Undefined = 0,
     Stack,   // Fiber stack
     Create,  // Allocator::create(), make_unique(), make_shared()
-    Vector,  // marl::vector<T>
+    Vector,  // marl::containers::vector<T>
+    List,    // marl::containers::list<T>
     Count,   // Not intended to be used as a usage type - used for upper bound.
   };
 
diff --git a/third_party/marl/include/marl/pool.h b/third_party/marl/include/marl/pool.h
index 693ef3f..934fa40 100644
--- a/third_party/marl/include/marl/pool.h
+++ b/third_party/marl/include/marl/pool.h
@@ -232,7 +232,7 @@
  private:
   class Storage : public Pool<T>::Storage {
    public:
-    inline Storage();
+    inline Storage(Allocator* allocator);
     inline ~Storage();
     inline void return_(Item*) override;
 
@@ -245,7 +245,8 @@
 };
 
 template <typename T, int N, PoolPolicy POLICY>
-BoundedPool<T, N, POLICY>::Storage::Storage() {
+BoundedPool<T, N, POLICY>::Storage::Storage(Allocator* allocator)
+    : returned(allocator) {
   for (int i = 0; i < N; i++) {
     if (POLICY == PoolPolicy::Preserve) {
       items[i].construct();
@@ -267,7 +268,7 @@
 template <typename T, int N, PoolPolicy POLICY>
 BoundedPool<T, N, POLICY>::BoundedPool(
     Allocator* allocator /* = Allocator::Default */)
-    : storage(allocator->make_shared<Storage>()) {}
+    : storage(allocator->make_shared<Storage>(allocator)) {}
 
 template <typename T, int N, PoolPolicy POLICY>
 typename BoundedPool<T, N, POLICY>::Loan BoundedPool<T, N, POLICY>::borrow()
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index e94d035..92acbde 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -18,28 +18,27 @@
 #include "debug.h"
 #include "memory.h"
 #include "sal.h"
+#include "task.h"
 #include "thread.h"
 
 #include <array>
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
+#include <deque>
 #include <functional>
 #include <map>
 #include <mutex>
-#include <queue>
 #include <set>
 #include <thread>
 #include <unordered_map>
 #include <unordered_set>
+#include <vector>
 
 namespace marl {
 
 class OSFiber;
 
-// Task is a unit of work for the scheduler.
-using Task = std::function<void()>;
-
 // Scheduler asynchronously processes Tasks.
 // A scheduler can be bound to one or more threads using the bind() method.
 // Once bound to a thread, that thread can call marl::schedule() to enqueue
@@ -257,8 +256,8 @@
 
   // TODO: Implement a queue that recycles elements to reduce number of
   // heap allocations.
-  using TaskQueue = std::queue<Task>;
-  using FiberQueue = std::queue<Fiber*>;
+  using TaskQueue = std::deque<Task>;
+  using FiberQueue = std::deque<Fiber*>;
   using FiberSet = std::unordered_set<Fiber*>;
 
   // Workers executes Tasks on a single thread.
@@ -318,9 +317,9 @@
     // flush() processes all pending tasks before returning.
     void flush();
 
-    // dequeue() attempts to take a Task from the worker. Returns true if
-    // a task was taken and assigned to out, otherwise false.
-    bool dequeue(Task& out);
+    // steal() attempts to steal a Task from the worker for another worker.
+    // Returns true if a task was taken and assigned to out, otherwise false.
+    bool steal(Task& out);
 
     // getCurrent() returns the Worker currently bound to the current
     // thread.
@@ -338,14 +337,17 @@
     // continue to process tasks until stop() is called.
     // If the worker was constructed in Mode::SingleThreaded, run() call
     // flush() and return.
+    _Requires_lock_held_(work.mutex)
     void run();
 
     // createWorkerFiber() creates a new fiber that when executed calls
     // run().
+    _Requires_lock_held_(work.mutex)
     Fiber* createWorkerFiber();
 
     // switchToFiber() switches execution to the given fiber. The fiber
     // must belong to this worker.
+    _Requires_lock_held_(work.mutex)
     void switchToFiber(Fiber*);
 
     // runUntilIdle() executes all pending tasks and then returns.
@@ -387,8 +389,13 @@
       _Guarded_by_(mutex) TaskQueue tasks;
       _Guarded_by_(mutex) FiberQueue fibers;
       _Guarded_by_(mutex) WaitingFibers waiting;
+      _Guarded_by_(mutex) bool notifyAdded = true;
       std::condition_variable added;
       std::mutex mutex;
+
+      _Requires_lock_held_(mutex)
+      template <typename F>
+      inline void wait(F&&);
     };
 
     // https://en.wikipedia.org/wiki/Xorshift
@@ -418,7 +425,7 @@
     std::vector<Allocator::unique_ptr<Fiber>>
         workerFibers;  // All fibers created by this worker.
     FastRnd rng;
-    std::atomic<bool> shutdown = {false};
+    bool shutdown = false;
   };
 
   // stealWork() attempts to steal a task from the worker with the given id.
@@ -472,6 +479,14 @@
   return currentFiber;
 }
 
+// schedule() schedules the task T to be asynchronously called using the
+// currently bound scheduler.
+inline void schedule(Task&& t) {
+  MARL_ASSERT_HAS_BOUND_SCHEDULER("marl::schedule");
+  auto scheduler = Scheduler::get();
+  scheduler->enqueue(std::move(t));
+}
+
 // schedule() schedules the function f to be asynchronously called with the
 // given arguments using the currently bound scheduler.
 template <typename Function, typename... Args>
@@ -488,7 +503,7 @@
 inline void schedule(Function&& f) {
   MARL_ASSERT_HAS_BOUND_SCHEDULER("marl::schedule");
   auto scheduler = Scheduler::get();
-  scheduler->enqueue(std::forward<Function>(f));
+  scheduler->enqueue(Task(std::forward<Function>(f)));
 }
 
 }  // namespace marl
diff --git a/third_party/marl/include/marl/task.h b/third_party/marl/include/marl/task.h
new file mode 100644
index 0000000..440615b
--- /dev/null
+++ b/third_party/marl/include/marl/task.h
@@ -0,0 +1,104 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_task_h
+#define marl_task_h
+
+#include <functional>
+
+namespace marl {
+
+// Task is a unit of work for the scheduler.
+class Task {
+ public:
+  using Function = std::function<void()>;
+
+  enum class Flags {
+    None = 0,
+
+    // SameThread ensures the task will be run on the same thread that scheduled
+    // the task. This can offer performance improvements if the current thread
+    // is immediately going to block on the newly scheduled task, by reducing
+    // overheads of waking another thread.
+    SameThread = 1,
+  };
+
+  inline Task();
+  inline Task(const Task&);
+  inline Task(Task&&);
+  inline Task(const Function& function, Flags flags = Flags::None);
+  inline Task(Function&& function, Flags flags = Flags::None);
+  inline Task& operator=(const Task&);
+  inline Task& operator=(Task&&);
+  inline Task& operator=(const Function&);
+  inline Task& operator=(Function&&);
+
+  // operator bool() returns true if the Task has a valid function.
+  inline operator bool() const;
+
+  // operator()() runs the task.
+  inline void operator()() const;
+
+  // is() returns true if the Task was created with the given flag.
+  inline bool is(Flags flag) const;
+
+ private:
+  Function function;
+  Flags flags = Flags::None;
+};
+
+Task::Task() {}
+Task::Task(const Task& o) : function(o.function), flags(o.flags) {}
+Task::Task(Task&& o) : function(std::move(o.function)), flags(o.flags) {}
+Task::Task(const Function& function, Flags flags /* = Flags::None */)
+    : function(function), flags(flags) {}
+Task::Task(Function&& function, Flags flags /* = Flags::None */)
+    : function(std::move(function)), flags(flags) {}
+Task& Task::operator=(const Task& o) {
+  function = o.function;
+  flags = o.flags;
+  return *this;
+}
+Task& Task::operator=(Task&& o) {
+  function = std::move(o.function);
+  flags = o.flags;
+  return *this;
+}
+
+Task& Task::operator=(const Function& f) {
+  function = f;
+  flags = Flags::None;
+  return *this;
+}
+Task& Task::operator=(Function&& f) {
+  function = std::move(f);
+  flags = Flags::None;
+  return *this;
+}
+Task::operator bool() const {
+  return function.operator bool();
+}
+
+void Task::operator()() const {
+  function();
+}
+
+bool Task::is(Flags flag) const {
+  const auto mask = static_cast<int>(flag);  // Bits that must all be set.
+  return (static_cast<int>(flags) & mask) == mask;
+}
+
+}  // namespace marl
+
+#endif  // marl_task_h
diff --git a/third_party/marl/include/marl/ticket.h b/third_party/marl/include/marl/ticket.h
index 10c2b77..59a38fc 100644
--- a/third_party/marl/include/marl/ticket.h
+++ b/third_party/marl/include/marl/ticket.h
@@ -62,6 +62,8 @@
   struct Record;
 
  public:
+  using OnCall = std::function<void()>;
+
   // Queue hands out Tickets.
   class Queue {
    public:
@@ -93,7 +95,7 @@
   // onCall() registers the function f to be invoked when this ticket is
   // called. If the ticket is already called prior to calling onCall(), then
   // f() will be executed immediately.
-  // F must be a function of the signature: void F()
+  // F must be a function of the OnCall signature.
   template <typename F>
   inline void onCall(F&& f) const;
 
@@ -111,7 +113,7 @@
     Record* next = nullptr;  // guarded by shared->mutex
     Record* prev = nullptr;  // guarded by shared->mutex
     inline void unlink();    // guarded by shared->mutex
-    Task onCall;             // guarded by shared->mutex
+    OnCall onCall;           // guarded by shared->mutex
     bool isCalled = false;   // guarded by shared->mutex
     std::atomic<bool> isDone = {false};
   };
@@ -155,7 +157,7 @@
         a();
         b();
       }
-      Task a, b;
+      OnCall a, b;
     };
     record->onCall = std::move(Joined{std::move(record->onCall), std::move(f)});
   } else {
@@ -228,13 +230,13 @@
     return;
   }
   isCalled = true;
-  Task task;
-  std::swap(task, onCall);
+  OnCall callback;
+  std::swap(callback, onCall);
   isCalledCondVar.notify_all();
   lock.unlock();
 
-  if (task) {
-    marl::schedule(std::move(task));
+  if (callback) {
+    marl::schedule(std::move(callback));
   }
 }
 
diff --git a/third_party/marl/include/marl/waitgroup.h b/third_party/marl/include/marl/waitgroup.h
index df7c349..7adbb19 100644
--- a/third_party/marl/include/marl/waitgroup.h
+++ b/third_party/marl/include/marl/waitgroup.h
@@ -51,7 +51,8 @@
 class WaitGroup {
  public:
   // Constructs the WaitGroup with the specified initial count.
-  inline WaitGroup(unsigned int initialCount = 0);
+  inline WaitGroup(unsigned int initialCount = 0,
+                   Allocator* allocator = Allocator::Default);
 
   // add() increments the internal counter by count.
   inline void add(unsigned int count = 1) const;
@@ -65,14 +66,20 @@
 
  private:
   struct Data {
+    inline Data(Allocator* allocator);
+
     std::atomic<unsigned int> count = {0};
-    ConditionVariable condition;
+    ConditionVariable cv;
     std::mutex mutex;
   };
-  const std::shared_ptr<Data> data = std::make_shared<Data>();
+  const std::shared_ptr<Data> data;
 };
 
-inline WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */) {
+WaitGroup::Data::Data(Allocator* allocator) : cv(allocator) {}
+
+WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */,
+                     Allocator* allocator /* = Allocator::Default */)
+    : data(allocator->make_shared<Data>(allocator)) {  // Allocate via allocator.
   data->count = initialCount;
 }
 
@@ -85,7 +92,7 @@
   auto count = --data->count;
   if (count == 0) {
     std::unique_lock<std::mutex> lock(data->mutex);
-    data->condition.notify_all();
+    data->cv.notify_all();
     return true;
   }
   return false;
@@ -93,7 +100,7 @@
 
 void WaitGroup::wait() const {
   std::unique_lock<std::mutex> lock(data->mutex);
-  data->condition.wait(lock, [this] { return data->count == 0; });
+  data->cv.wait(lock, [this] { return data->count == 0; });
 }
 
 }  // namespace marl
diff --git a/third_party/marl/src/containers_test.cpp b/third_party/marl/src/containers_test.cpp
index bb5c9b0..2bc981c 100644
--- a/third_party/marl/src/containers_test.cpp
+++ b/third_party/marl/src/containers_test.cpp
@@ -193,3 +193,93 @@
   ASSERT_EQ(vectorB[1], "B");
   ASSERT_EQ(vectorB[2], "C");
 }
+
+class ContainersListTest : public WithoutBoundScheduler {};
+
+TEST_F(ContainersListTest, Empty) {
+  marl::containers::list<std::string> list(allocator);
+  ASSERT_EQ(list.size(), size_t(0));
+}
+
+TEST_F(ContainersListTest, EmplaceOne) {
+  marl::containers::list<std::string> list(allocator);
+  auto itEntry = list.emplace_front("hello world");
+  ASSERT_EQ(*itEntry, "hello world");
+  ASSERT_EQ(list.size(), size_t(1));
+  auto it = list.begin();
+  ASSERT_EQ(it, itEntry);
+  ++it;
+  ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EmplaceThree) {
+  marl::containers::list<std::string> list(allocator);
+  auto itA = list.emplace_front("a");
+  auto itB = list.emplace_front("b");
+  auto itC = list.emplace_front("c");
+  ASSERT_EQ(*itA, "a");
+  ASSERT_EQ(*itB, "b");
+  ASSERT_EQ(*itC, "c");
+  ASSERT_EQ(list.size(), size_t(3));
+  auto it = list.begin();
+  ASSERT_EQ(it, itC);
+  ++it;
+  ASSERT_EQ(it, itB);
+  ++it;
+  ASSERT_EQ(it, itA);
+  ++it;
+  ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EraseFront) {
+  marl::containers::list<std::string> list(allocator);
+  auto itA = list.emplace_front("a");
+  auto itB = list.emplace_front("b");
+  auto itC = list.emplace_front("c");
+  list.erase(itC);
+  ASSERT_EQ(list.size(), size_t(2));
+  auto it = list.begin();
+  ASSERT_EQ(it, itB);
+  ++it;
+  ASSERT_EQ(it, itA);
+  ++it;
+  ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EraseBack) {
+  marl::containers::list<std::string> list(allocator);
+  auto itA = list.emplace_front("a");
+  auto itB = list.emplace_front("b");
+  auto itC = list.emplace_front("c");
+  list.erase(itA);
+  ASSERT_EQ(list.size(), size_t(2));
+  auto it = list.begin();
+  ASSERT_EQ(it, itC);
+  ++it;
+  ASSERT_EQ(it, itB);
+  ++it;
+  ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EraseMid) {
+  marl::containers::list<std::string> list(allocator);
+  auto itA = list.emplace_front("a");
+  auto itB = list.emplace_front("b");
+  auto itC = list.emplace_front("c");
+  list.erase(itB);
+  ASSERT_EQ(list.size(), size_t(2));
+  auto it = list.begin();
+  ASSERT_EQ(it, itC);
+  ++it;
+  ASSERT_EQ(it, itA);
+  ++it;
+  ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, Grow) {
+  marl::containers::list<std::string> list(allocator);
+  for (int i = 0; i < 256; i++) {
+    list.emplace_front(std::to_string(i));
+  }
+  ASSERT_EQ(list.size(), size_t(256));
+}
diff --git a/third_party/marl/src/event_bench.cpp b/third_party/marl/src/event_bench.cpp
index 985c718..dc8f569 100644
--- a/third_party/marl/src/event_bench.cpp
+++ b/third_party/marl/src/event_bench.cpp
@@ -36,3 +36,37 @@
   });
 }
 BENCHMARK_REGISTER_F(Schedule, Event)->Apply(Schedule::args<512>);
+
+// EventBaton benchmarks alternating execution of two tasks.
+BENCHMARK_DEFINE_F(Schedule, EventBaton)(benchmark::State& state) {
+  run(state, [&](int numPasses) {
+    for (auto _ : state) {
+      marl::Event passToA(marl::Event::Mode::Auto);
+      marl::Event passToB(marl::Event::Mode::Auto);
+      marl::Event done(marl::Event::Mode::Auto);
+
+      marl::schedule(marl::Task(
+          [=] {
+            for (int i = 0; i < numPasses; i++) {
+              passToA.wait();
+              passToB.signal();
+            }
+          },
+          marl::Task::Flags::SameThread));
+
+      marl::schedule(marl::Task(
+          [=] {
+            for (int i = 0; i < numPasses; i++) {
+              passToB.wait();
+              passToA.signal();
+            }
+            done.signal();
+          },
+          marl::Task::Flags::SameThread));
+
+      passToA.signal();
+      done.wait();
+    }
+  });
+}
+BENCHMARK_REGISTER_F(Schedule, EventBaton)->Apply(Schedule::args<1000000>);
diff --git a/third_party/marl/src/marl_bench.h b/third_party/marl/src/marl_bench.h
index 4932b40..e4c2cee 100644
--- a/third_party/marl/src/marl_bench.h
+++ b/third_party/marl/src/marl_bench.h
@@ -48,13 +48,13 @@
     }
   }
 
-  // numThreads return the number of threads in the benchmark run from the
+  // numThreads() returns the number of threads in the benchmark run from the
   // state.
   static int numThreads(const ::benchmark::State& state) {
     return static_cast<int>(state.range(1));
   }
 
-  // numTasks return the number of tasks in the benchmark run from the state.
+  // numTasks() returns the number of tasks in the benchmark run from the state.
   static int numTasks(const ::benchmark::State& state) {
     return static_cast<int>(state.range(0));
   }
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index d8de740..1f10f3a 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -60,9 +60,9 @@
 #endif
 
 template <typename T>
-inline T take(std::queue<T>& queue) {
+inline T take(std::deque<T>& queue) {
   auto out = std::move(queue.front());
-  queue.pop();
+  queue.pop_front();
   return out;
 }
 
@@ -189,6 +189,10 @@
 }
 
 void Scheduler::enqueue(Task&& task) {
+  if (task.is(Task::Flags::SameThread)) {
+    Scheduler::Worker::getCurrent()->enqueue(std::move(task));
+    return;
+  }
   if (numWorkerThreads > 0) {
     while (true) {
       // Prioritize workers that have recently started spinning.
@@ -220,7 +224,7 @@
   if (numWorkerThreads > 0) {
     auto thread = workerThreads[from % numWorkerThreads];
     if (thread != thief) {
-      if (thread->dequeue(out)) {
+      if (thread->steal(out)) {
         return true;
       }
     }
@@ -372,7 +376,10 @@
         Worker::current = this;
         mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0);
         currentFiber = mainFiber.get();
-        run();
+        {
+          std::unique_lock<std::mutex> lock(work.mutex);
+          run();
+        }
         mainFiber.reset();
         Worker::current = nullptr;
       });
@@ -392,8 +399,7 @@
 void Scheduler::Worker::stop() {
   switch (mode) {
     case Mode::MultiThreaded:
-      shutdown = true;
-      enqueue([] {});  // Ensure the worker is woken up to notice the shutdown.
+      enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
       thread.join();
       break;
 
@@ -462,22 +468,16 @@
     work.num--;
     auto to = take(work.fibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
-    work.mutex.unlock();
     switchToFiber(to);
-    work.mutex.lock();
   } else if (idleFibers.size() > 0) {
     // There's an old fiber we can reuse, resume that.
     auto to = take(idleFibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
-    work.mutex.unlock();
     switchToFiber(to);
-    work.mutex.lock();
   } else {
     // Tasks to process and no existing fibers to resume.
     // Spawn a new fiber.
-    work.mutex.unlock();
     switchToFiber(createWorkerFiber());
-    work.mutex.lock();
   }
 
   setFiberState(currentFiber, Fiber::State::Running);
@@ -503,15 +503,15 @@
     case Fiber::State::Yielded:
       break;
   }
-  bool wasIdle = work.num == 0;
-  work.fibers.push(std::move(fiber));
+  bool notify = work.notifyAdded;
+  work.fibers.push_back(std::move(fiber));
   MARL_ASSERT(!work.waiting.contains(fiber),
               "fiber is unexpectedly in the waiting list");
   setFiberState(fiber, Fiber::State::Queued);
   work.num++;
   lock.unlock();
 
-  if (wasIdle) {
+  if (notify) {
     work.added.notify_one();
   }
 }
@@ -524,23 +524,24 @@
 _Requires_lock_held_(work.mutex)
 _Releases_lock_(work.mutex)
 void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
-  auto wasIdle = work.num == 0;
-  work.tasks.push(std::move(task));
+  auto notify = work.notifyAdded;
+  work.tasks.push_back(std::move(task));
   work.num++;
   work.mutex.unlock();
-  if (wasIdle) {
+  if (notify) {
     work.added.notify_one();
   }
 }
 
-bool Scheduler::Worker::dequeue(Task& out) {
+bool Scheduler::Worker::steal(Task& out) {
   if (work.num.load() == 0) {
     return false;
   }
   if (!work.mutex.try_lock()) {
     return false;
   }
-  if (work.tasks.size() == 0) {
+  if (work.tasks.size() == 0 ||
+      work.tasks.front().is(Task::Flags::SameThread)) {
     work.mutex.unlock();
     return false;
   }
@@ -550,6 +551,7 @@
   return true;
 }
 
+_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::flush() {
   MARL_ASSERT(mode == Mode::SingleThreaded,
               "flush() can only be used on a single-threaded worker");
@@ -557,33 +559,30 @@
   runUntilIdle();
 }
 
+_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::run() {
   switch (mode) {
     case Mode::MultiThreaded: {
       MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id),
                        Fiber::current()->id);
-      {
-        std::unique_lock<std::mutex> lock(work.mutex);
-        work.added.wait(
-            lock, [this] { return work.num > 0 || work.waiting || shutdown; });
-        while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
-          waitForWork();
-          runUntilIdle();
-        }
-        Worker::current = nullptr;
+      work.wait([this] { return work.num > 0 || work.waiting || shutdown; });
+      while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
+        waitForWork();
+        runUntilIdle();
       }
+      Worker::current = nullptr;
       switchToFiber(mainFiber.get());
       break;
     }
-    case Mode::SingleThreaded:
+    case Mode::SingleThreaded: {
       ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
       while (!shutdown) {
-        flush();
+        runUntilIdle();
         idleFibers.emplace(currentFiber);
         switchToFiber(mainFiber.get());
       }
       break;
-
+    }
     default:
       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
   }
@@ -593,26 +592,22 @@
 void Scheduler::Worker::waitForWork() {
   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
               "work.num out of sync");
-  if (work.num == 0 && mode == Mode::MultiThreaded) {
+  if (work.num > 0) {
+    return;
+  }
+
+  if (mode == Mode::MultiThreaded) {
     scheduler->onBeginSpinning(id);
     work.mutex.unlock();
     spinForWork();
     work.mutex.lock();
   }
 
+  work.wait([this] {
+    return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
+  });
   if (work.waiting) {
-    std::unique_lock<std::mutex> lock(work.mutex, std::adopt_lock);
-    work.added.wait_until(lock, work.waiting.next(), [this] {
-      return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
-    });
-    lock.release();  // Keep the lock held.
     enqueueFiberTimeouts();
-  } else {
-    std::unique_lock<std::mutex> lock(work.mutex, std::adopt_lock);
-    work.added.wait(lock, [this] {
-      return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
-    });
-    lock.release();  // Keep the lock held.
   }
 }
 
@@ -622,7 +617,7 @@
   while (auto fiber = work.waiting.take(now)) {
     changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued);
     DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id);
-    work.fibers.push(fiber);
+    work.fibers.push_back(fiber);
     work.num++;
   }
 }
@@ -667,7 +662,7 @@
 
     if (scheduler->stealWork(this, rng(), stolen)) {
       std::unique_lock<std::mutex> lock(work.mutex);
-      work.tasks.emplace(std::move(stolen));
+      work.tasks.emplace_back(std::move(stolen));
       work.num++;
       return;
     }
@@ -695,15 +690,11 @@
       ASSERT_FIBER_STATE(fiber, Fiber::State::Queued);
 
       changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle);
-      work.mutex.unlock();
-      {  // unlocked
-        auto added = idleFibers.emplace(currentFiber).second;
-        (void)added;
-        MARL_ASSERT(added, "fiber already idle");
+      auto added = idleFibers.emplace(currentFiber).second;
+      (void)added;
+      MARL_ASSERT(added, "fiber already idle");
 
-        switchToFiber(fiber);
-      }
-      work.mutex.lock();
+      switchToFiber(fiber);
       changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
     }
 
@@ -724,6 +715,7 @@
   }
 }
 
+_Requires_lock_held_(work.mutex)
 Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
   auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
   DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
@@ -734,6 +726,7 @@
   return ptr;
 }
 
+_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::switchToFiber(Fiber* to) {
   DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
   MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
@@ -743,4 +736,21 @@
   from->switchTo(to);
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Worker::Work
+////////////////////////////////////////////////////////////////////////////////
+_Requires_lock_held_(mutex)
+template <typename F>
+void Scheduler::Worker::Work::wait(F&& f) {
+  std::unique_lock<std::mutex> lock(mutex, std::adopt_lock);  // Already held.
+  notifyAdded = true;  // Enqueues made while blocked must call notify_one().
+  if (waiting) {
+    added.wait_until(lock, waiting.next(), f);
+  } else {
+    added.wait(lock, f);
+  }
+  notifyAdded = false;  // Awake again: enqueues can skip notify_one().
+  lock.release();  // Keep the lock held.
+}
+
 }  // namespace marl