Squashed 'third_party/marl/' changes from 16e1dc37c..539094011

539094011 CMake: Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED
c7f70ba7a CMake: Bump min version + include(CheckCXXSourceCompiles)
658a204fc Replace SAL annotations with clang's TSA annotations
9630bec2f Fix CMake warning: "Policy CMP0023 is not set"
9f369ad5d Update yarn:: to marl:: in an example
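
An illustrative sketch of the annotation change (a hypothetical class, not code
taken from this patch): the SAL-style macros previously stubbed in
include/marl/sal.h are replaced by the clang TSA macros now defined in
include/marl/tsa.h, and std::mutex usage moves to the annotated wrappers in
include/marl/mutex.h.

    #include "marl/mutex.h"  // marl::mutex, marl::lock
    #include "marl/tsa.h"    // REQUIRES, GUARDED_BY, ...

    // Hypothetical example: maps the old SAL annotations to the new TSA macros.
    class Counter {
     public:
      void add(int n) {
        marl::lock l(mu);  // SCOPED_CAPABILITY: mu is held for the scope of l.
        value += n;
      }
      int get() REQUIRES(mu) { return value; }  // was: _Requires_lock_held_(mu)

     private:
      marl::mutex mu;
      GUARDED_BY(mu) int value = 0;             // was: _Guarded_by_(mu)
    };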

git-subtree-dir: third_party/marl
git-subtree-split: 53909401165022553ed9d1f0c572178559dc25ec
diff --git a/.clang-format b/.clang-format
index b0823b4..08178fe 100644
--- a/.clang-format
+++ b/.clang-format
@@ -3,8 +3,3 @@
 
 ---
 Language:        Cpp
-StatementMacros:
- - _Acquires_lock_
- - _Releases_lock_
- - _Requires_lock_held_
- - _When_
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2a8a695..005b855 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,12 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-cmake_minimum_required(VERSION 2.8)
+cmake_minimum_required(VERSION 3.0)
 
-set (CMAKE_CXX_STANDARD 11)
+set(CMAKE_CXX_STANDARD 11)
 
 project(Marl C CXX ASM)
 
+include(CheckCXXSourceCompiles)
+
 ###########################################################
 # Options
 ###########################################################
@@ -76,6 +78,32 @@
 endif(MARL_BUILD_BENCHMARKS)
 
 ###########################################################
+# Compiler feature tests
+###########################################################
+# Check that the Clang Thread Safety Analysis' try_acquire_capability behaves
+# correctly. This is broken on some earlier versions of clang.
+# See: https://bugs.llvm.org/show_bug.cgi?id=32954
+set(SAVE_CMAKE_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})
+set(CMAKE_REQUIRED_FLAGS "-Wthread-safety -Werror")
+check_cxx_source_compiles(
+    "int main() {
+      struct __attribute__((capability(\"mutex\"))) Mutex {
+        void Unlock() __attribute__((release_capability)) {};
+        bool TryLock() __attribute__((try_acquire_capability(true))) { return true; };
+      };
+      Mutex m;
+      if (m.TryLock()) {
+        m.Unlock();  // Should not warn.
+      }
+      return 0;
+    }"
+    MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
+set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS})
+
+# Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED as this may be useful to parent projects
+set(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED ${MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED} PARENT_SCOPE)
+
+###########################################################
 # File lists
 ###########################################################
 set(MARL_LIST
@@ -111,6 +139,10 @@
 # Functions
 ###########################################################
 function(marl_set_target_options target)
+    if (MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
+        target_compile_options(${target} PRIVATE "-Wthread-safety")
+    endif()
+
     # Enable all warnings
     if(MSVC)
         target_compile_options(${target} PRIVATE
@@ -137,13 +169,13 @@
 
     if(MARL_ASAN)
         target_compile_options(${target} PUBLIC "-fsanitize=address")
-        target_link_libraries(${target} "-fsanitize=address")
+        target_link_libraries(${target} PUBLIC "-fsanitize=address")
     elseif(MARL_MSAN)
         target_compile_options(${target} PUBLIC "-fsanitize=memory")
-        target_link_libraries(${target} "-fsanitize=memory")
+        target_link_libraries(${target} PUBLIC "-fsanitize=memory")
     elseif(MARL_TSAN)
         target_compile_options(${target} PUBLIC "-fsanitize=thread")
-        target_link_libraries(${target} "-fsanitize=thread")
+        target_link_libraries(${target} PUBLIC "-fsanitize=thread")
     endif()
 
     target_include_directories(${target} PUBLIC $<BUILD_INTERFACE:${MARL_INCLUDE_DIR}>)
@@ -232,7 +264,7 @@
 
     marl_set_target_options(marl-unittests)
 
-    target_link_libraries(marl-unittests marl)
+    target_link_libraries(marl-unittests PRIVATE marl)
 endif(MARL_BUILD_TESTS)
 
 # benchmarks
@@ -252,7 +284,7 @@
 
     marl_set_target_options(marl-benchmarks)
 
-    target_link_libraries(marl-benchmarks benchmark::benchmark marl)
+    target_link_libraries(marl-benchmarks PRIVATE benchmark::benchmark marl)
 endif(MARL_BUILD_BENCHMARKS)
 
 # examples
@@ -263,7 +295,7 @@
             FOLDER "Examples"
         )
         marl_set_target_options(${target})
-        target_link_libraries(${target} marl)
+        target_link_libraries(${target} PRIVATE marl)
     endfunction(build_example)
 
     build_example(fractal)
diff --git a/include/marl/blockingcall.h b/include/marl/blockingcall.h
index 983ee82..1ef0277 100644
--- a/include/marl/blockingcall.h
+++ b/include/marl/blockingcall.h
@@ -85,10 +85,10 @@
 //  void runABlockingFunctionOnATask()
 //  {
 //      // Schedule a task that calls a blocking, non-yielding function.
-//      yarn::schedule([=] {
+//      marl::schedule([=] {
 //          // call_blocking_function() may block indefinitely.
 //          // Ensure this call does not block other tasks from running.
-//          auto result = yarn::blocking_call(call_blocking_function);
+//          auto result = marl::blocking_call(call_blocking_function);
 //          // call_blocking_function() has now returned.
 //          // result holds the return value of the blocking function call.
 //      });
diff --git a/include/marl/conditionvariable.h b/include/marl/conditionvariable.h
index 8dfb9c7..5a57db2 100644
--- a/include/marl/conditionvariable.h
+++ b/include/marl/conditionvariable.h
@@ -19,11 +19,12 @@
 #include "debug.h"
 #include "defer.h"
 #include "memory.h"
+#include "mutex.h"
 #include "scheduler.h"
+#include "tsa.h"
 
 #include <atomic>
 #include <condition_variable>
-#include <mutex>
 
 namespace marl {
 
@@ -47,14 +48,14 @@
   // wait() blocks the current fiber or thread until the predicate is satisfied
   // and the ConditionVariable is notified.
   template <typename Predicate>
-  inline void wait(std::unique_lock<std::mutex>& lock, Predicate&& pred);
+  inline void wait(marl::lock& lock, Predicate&& pred);
 
   // wait_for() blocks the current fiber or thread until the predicate is
   // satisfied, and the ConditionVariable is notified, or the timeout has been
   // reached. Returns false if pred still evaluates to false after the timeout
   // has been reached, otherwise true.
   template <typename Rep, typename Period, typename Predicate>
-  bool wait_for(std::unique_lock<std::mutex>& lock,
+  bool wait_for(marl::lock& lock,
                 const std::chrono::duration<Rep, Period>& duration,
                 Predicate&& pred);
 
@@ -63,7 +64,7 @@
   // reached. Returns false if pred still evaluates to false after the timeout
   // has been reached, otherwise true.
   template <typename Clock, typename Duration, typename Predicate>
-  bool wait_until(std::unique_lock<std::mutex>& lock,
+  bool wait_until(marl::lock& lock,
                   const std::chrono::time_point<Clock, Duration>& timeout,
                   Predicate&& pred);
 
@@ -73,7 +74,7 @@
   ConditionVariable& operator=(const ConditionVariable&) = delete;
   ConditionVariable& operator=(ConditionVariable&&) = delete;
 
-  std::mutex mutex;
+  marl::mutex mutex;
   containers::list<Scheduler::Fiber*> waiting;
   std::condition_variable condition;
   std::atomic<int> numWaiting = {0};
@@ -89,7 +90,7 @@
     return;
   }
   {
-    std::unique_lock<std::mutex> lock(mutex);
+    marl::lock lock(mutex);
     for (auto fiber : waiting) {
       fiber->notify();
     }
@@ -104,7 +105,7 @@
     return;
   }
   {
-    std::unique_lock<std::mutex> lock(mutex);
+    marl::lock lock(mutex);
     for (auto fiber : waiting) {
       fiber->notify();
     }
@@ -115,8 +116,7 @@
 }
 
 template <typename Predicate>
-void ConditionVariable::wait(std::unique_lock<std::mutex>& lock,
-                             Predicate&& pred) {
+void ConditionVariable::wait(marl::lock& lock, Predicate&& pred) {
   if (pred()) {
     return;
   }
@@ -137,7 +137,7 @@
     // Currently running outside of the scheduler.
     // Delegate to the std::condition_variable.
     numWaitingOnCondition++;
-    condition.wait(lock, pred);
+    lock.wait(condition, pred);
     numWaitingOnCondition--;
   }
   numWaiting--;
@@ -145,7 +145,7 @@
 
 template <typename Rep, typename Period, typename Predicate>
 bool ConditionVariable::wait_for(
-    std::unique_lock<std::mutex>& lock,
+    marl::lock& lock,
     const std::chrono::duration<Rep, Period>& duration,
     Predicate&& pred) {
   return wait_until(lock, std::chrono::system_clock::now() + duration, pred);
@@ -153,7 +153,7 @@
 
 template <typename Clock, typename Duration, typename Predicate>
 bool ConditionVariable::wait_until(
-    std::unique_lock<std::mutex>& lock,
+    marl::lock& lock,
     const std::chrono::time_point<Clock, Duration>& timeout,
     Predicate&& pred) {
   if (pred()) {
@@ -181,7 +181,7 @@
     // Delegate to the std::condition_variable.
     numWaitingOnCondition++;
     defer(numWaitingOnCondition--);
-    return condition.wait_until(lock, timeout, pred);
+    return lock.wait_until(condition, timeout, pred);
   }
 }
 
diff --git a/include/marl/event.h b/include/marl/event.h
index f7b3023..bac6078 100644
--- a/include/marl/event.h
+++ b/include/marl/event.h
@@ -113,7 +113,7 @@
     inline bool wait_until(
         const std::chrono::time_point<Clock, Duration>& timeout);
 
-    std::mutex mutex;
+    marl::mutex mutex;
     ConditionVariable cv;
     const Mode mode;
     bool signalled;
@@ -127,7 +127,7 @@
     : cv(allocator), mode(mode), signalled(initialState) {}
 
 void Event::Shared::signal() {
-  std::unique_lock<std::mutex> lock(mutex);
+  marl::lock lock(mutex);
   if (signalled) {
     return;
   }
@@ -143,7 +143,7 @@
 }
 
 void Event::Shared::wait() {
-  std::unique_lock<std::mutex> lock(mutex);
+  marl::lock lock(mutex);
   cv.wait(lock, [&] { return signalled; });
   if (mode == Mode::Auto) {
     signalled = false;
@@ -153,7 +153,7 @@
 template <typename Rep, typename Period>
 bool Event::Shared::wait_for(
     const std::chrono::duration<Rep, Period>& duration) {
-  std::unique_lock<std::mutex> lock(mutex);
+  marl::lock lock(mutex);
   if (!cv.wait_for(lock, duration, [&] { return signalled; })) {
     return false;
   }
@@ -166,7 +166,7 @@
 template <typename Clock, typename Duration>
 bool Event::Shared::wait_until(
     const std::chrono::time_point<Clock, Duration>& timeout) {
-  std::unique_lock<std::mutex> lock(mutex);
+  marl::lock lock(mutex);
   if (!cv.wait_until(lock, timeout, [&] { return signalled; })) {
     return false;
   }
@@ -186,7 +186,7 @@
 }
 
 void Event::clear() const {
-  std::unique_lock<std::mutex> lock(shared->mutex);
+  marl::lock lock(shared->mutex);
   shared->signalled = false;
 }
 
@@ -206,7 +206,7 @@
 }
 
 bool Event::test() const {
-  std::unique_lock<std::mutex> lock(shared->mutex);
+  marl::lock lock(shared->mutex);
   if (!shared->signalled) {
     return false;
   }
@@ -217,7 +217,7 @@
 }
 
 bool Event::isSignalled() const {
-  std::unique_lock<std::mutex> lock(shared->mutex);
+  marl::lock lock(shared->mutex);
   return shared->signalled;
 }
 
@@ -226,7 +226,7 @@
   Event any(mode, false);
   for (auto it = begin; it != end; it++) {
     auto s = it->shared;
-    std::unique_lock<std::mutex> lock(s->mutex);
+    marl::lock lock(s->mutex);
     if (s->signalled) {
       any.signal();
     }
diff --git a/include/marl/mutex.h b/include/marl/mutex.h
new file mode 100644
index 0000000..72ecaf7
--- /dev/null
+++ b/include/marl/mutex.h
@@ -0,0 +1,106 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Wrappers around std::mutex and std::unique_lock that provide clang's
+// Thread Safety Analysis annotations.
+// See: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
+
+#ifndef marl_mutex_h
+#define marl_mutex_h
+
+#include "tsa.h"
+
+#include <condition_variable>
+#include <mutex>
+
+namespace marl {
+
+// mutex is a wrapper around std::mutex that offers Thread Safety Analysis
+// annotations.
+// mutex also holds methods for performing std::condition_variable::wait()
+// calls, as these require a std::unique_lock<>, which is unsupported by TSA.
+class CAPABILITY("mutex") mutex {
+ public:
+  inline void lock() ACQUIRE() { _.lock(); }
+
+  inline void unlock() RELEASE() { _.unlock(); }
+
+  inline bool try_lock() TRY_ACQUIRE(true) { return _.try_lock(); }
+
+  // wait_locked calls cv.wait() on this already locked mutex.
+  template <typename Predicate>
+  inline void wait_locked(std::condition_variable& cv, Predicate&& p)
+      REQUIRES(this) {
+    std::unique_lock<std::mutex> lock(_, std::adopt_lock);
+    cv.wait(lock, std::forward<Predicate>(p));
+    lock.release();  // Keep lock held.
+  }
+
+  // wait_until_locked calls cv.wait_until() on this already locked mutex.
+  template <typename Predicate, typename Time>
+  inline bool wait_until_locked(std::condition_variable& cv,
+                                Time&& time,
+                                Predicate&& p) REQUIRES(this) {
+    std::unique_lock<std::mutex> lock(_, std::adopt_lock);
+    auto res = cv.wait_until(lock, std::forward<Time>(time),
+                             std::forward<Predicate>(p));
+    lock.release();  // Keep lock held.
+    return res;
+  }
+
+ private:
+  friend class lock;
+  std::mutex _;
+};
+
+// lock is a RAII lock helper that offers Thread Safety Analysis annotations.
+// lock also holds methods for performing std::condition_variable::wait()
+// calls, as these require a std::unique_lock<>, which is unsupported by TSA.
+class SCOPED_CAPABILITY lock {
+ public:
+  inline lock(mutex& m) ACQUIRE(m) : _(m._) {}
+  inline ~lock() RELEASE() {}
+
+  // wait calls cv.wait() on this lock.
+  template <typename Predicate>
+  inline void wait(std::condition_variable& cv, Predicate&& p) {
+    cv.wait(_, std::forward<Predicate>(p));
+  }
+
+  // wait_until calls cv.wait_until() on this lock.
+  template <typename Predicate, typename Time>
+  inline bool wait_until(std::condition_variable& cv,
+                         Time&& time,
+                         Predicate&& p) {
+    return cv.wait_until(_, std::forward<Time>(time),
+                         std::forward<Predicate>(p));
+  }
+
+  inline bool owns_lock() const { return _.owns_lock(); }
+
+  // lock_no_tsa locks the mutex outside of the visibility of the thread
+  // safety analysis. Use with caution.
+  inline void lock_no_tsa() { _.lock(); }
+
+  // unlock_no_tsa unlocks the mutex outside of the visibility of the thread
+  // safety analysis. Use with caution.
+  inline void unlock_no_tsa() { _.unlock(); }
+
+ private:
+  std::unique_lock<std::mutex> _;
+};
+
+}  // namespace marl
+
+#endif  // marl_mutex_h
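
A minimal usage sketch of the new wrappers (illustrative only; it mirrors the
pattern now used by Event::Shared and the condition variable tests rather than
reproducing them):

    #include "marl/conditionvariable.h"
    #include "marl/mutex.h"

    // Hypothetical type for illustration.
    struct Gate {
      marl::mutex mu;
      marl::ConditionVariable cv;
      bool open = false;  // written and read with mu held

      void release() {
        {
          marl::lock l(mu);  // acquires mu; released at end of scope
          open = true;
        }
        cv.notify_all();
      }

      void sync() {
        marl::lock l(mu);
        cv.wait(l, [&] { return open; });  // predicate runs with mu held
      }
    };
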
diff --git a/include/marl/pool.h b/include/marl/pool.h
index 934fa40..393b457 100644
--- a/include/marl/pool.h
+++ b/include/marl/pool.h
@@ -17,9 +17,9 @@
 
 #include "conditionvariable.h"
 #include "memory.h"
+#include "mutex.h"
 
 #include <atomic>
-#include <mutex>
 
 namespace marl {
 
@@ -237,7 +237,7 @@
     inline void return_(Item*) override;
 
     Item items[N];
-    std::mutex mutex;
+    marl::mutex mutex;
     ConditionVariable returned;
     Item* free = nullptr;
   };
@@ -281,7 +281,7 @@
 template <typename T, int N, PoolPolicy POLICY>
 template <typename F>
 void BoundedPool<T, N, POLICY>::borrow(size_t n, const F& f) const {
-  std::unique_lock<std::mutex> lock(storage->mutex);
+  marl::lock lock(storage->mutex);
   for (size_t i = 0; i < n; i++) {
     storage->returned.wait(lock, [&] { return storage->free != nullptr; });
     auto item = storage->free;
@@ -296,14 +296,16 @@
 template <typename T, int N, PoolPolicy POLICY>
 std::pair<typename BoundedPool<T, N, POLICY>::Loan, bool>
 BoundedPool<T, N, POLICY>::tryBorrow() const {
-  std::unique_lock<std::mutex> lock(storage->mutex);
-  if (storage->free == nullptr) {
-    return std::make_pair(Loan(), false);
+  Item* item = nullptr;
+  {
+    marl::lock lock(storage->mutex);
+    if (storage->free == nullptr) {
+      return std::make_pair(Loan(), false);
+    }
+    item = storage->free;
+    storage->free = storage->free->next;
+    item->pool = this;
   }
-  auto item = storage->free;
-  storage->free = storage->free->next;
-  item->pool = this;
-  lock.unlock();
   if (POLICY == PoolPolicy::Reconstruct) {
     item->construct();
   }
@@ -315,10 +317,11 @@
   if (POLICY == PoolPolicy::Reconstruct) {
     item->destruct();
   }
-  std::unique_lock<std::mutex> lock(mutex);
-  item->next = free;
-  free = item;
-  lock.unlock();
+  {
+    marl::lock lock(mutex);
+    item->next = free;
+    free = item;
+  }
   returned.notify_one();
 }
 
@@ -359,7 +362,7 @@
     inline void return_(Item*) override;
 
     Allocator* allocator;
-    std::mutex mutex;
+    marl::mutex mutex;
     std::vector<Item*> items;
     Item* free = nullptr;
   };
@@ -398,7 +401,7 @@
 template <typename T, PoolPolicy POLICY>
 template <typename F>
 inline void UnboundedPool<T, POLICY>::borrow(size_t n, const F& f) const {
-  std::unique_lock<std::mutex> lock(storage->mutex);
+  marl::lock lock(storage->mutex);
   for (size_t i = 0; i < n; i++) {
     if (storage->free == nullptr) {
       auto count = std::max<size_t>(storage->items.size(), 32);
@@ -427,10 +430,9 @@
   if (POLICY == PoolPolicy::Reconstruct) {
     item->destruct();
   }
-  std::unique_lock<std::mutex> lock(mutex);
+  marl::lock lock(mutex);
   item->next = free;
   free = item;
-  lock.unlock();
 }
 
 }  // namespace marl
diff --git a/include/marl/sal.h b/include/marl/sal.h
deleted file mode 100644
index de47a49..0000000
--- a/include/marl/sal.h
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2019 The Marl Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Stubs SAL annotation macros for platforms that do not support them.
-// See
-// https://docs.microsoft.com/en-us/visualstudio/code-quality/annotating-locking-behavior?view=vs-2019
-
-#ifndef marl_sal_h
-#define marl_sal_h
-
-#ifndef _Acquires_lock_
-#define _Acquires_lock_(...)
-#endif
-
-#ifndef _Guarded_by_
-#define _Guarded_by_(...)
-#endif
-
-#ifndef _Releases_lock_
-#define _Releases_lock_(...)
-#endif
-
-#ifndef _Requires_lock_held_
-#define _Requires_lock_held_(...)
-#endif
-
-#ifndef _When_
-#define _When_(...)
-#endif
-
-#endif  // marl_sal_h
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index b96e147..4db4b4d 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -17,7 +17,7 @@
 
 #include "debug.h"
 #include "memory.h"
-#include "sal.h"
+#include "mutex.h"
 #include "task.h"
 #include "thread.h"
 
@@ -28,7 +28,6 @@
 #include <deque>
 #include <functional>
 #include <map>
-#include <mutex>
 #include <set>
 #include <thread>
 #include <unordered_map>
@@ -104,8 +103,6 @@
   // thread that previously executed it.
   class Fiber {
    public:
-    using Lock = std::unique_lock<std::mutex>;
-
     // current() returns the currently executing fiber, or nullptr if called
     // without a bound scheduler.
     static Fiber* current();
@@ -122,8 +119,7 @@
     // will be locked before wait() returns.
     // pred will always be called with the lock held.
     // wait() must only be called on the currently executing fiber.
-    _Requires_lock_held_(lock)
-    void wait(Lock& lock, const Predicate& pred);
+    void wait(marl::lock& lock, const Predicate& pred);
 
     // wait() suspends execution of this Fiber until the Fiber is woken up with
     // a call to notify() and the predicate pred returns true, or sometime after
@@ -139,9 +135,8 @@
     // will be locked before wait() returns.
     // pred will always be called with the lock held.
     // wait() must only be called on the currently executing fiber.
-    _Requires_lock_held_(lock)
     template <typename Clock, typename Duration>
-    inline bool wait(Lock& lock,
+    inline bool wait(marl::lock& lock,
                      const std::chrono::time_point<Clock, Duration>& timeout,
                      const Predicate& pred);
 
@@ -307,56 +302,51 @@
     Worker(Scheduler* scheduler, Mode mode, uint32_t id);
 
     // start() begins execution of the worker.
-    void start();
+    void start() EXCLUDES(work.mutex);
 
     // stop() ceases execution of the worker, blocking until all pending
     // tasks have fully finished.
-    void stop();
+    void stop() EXCLUDES(work.mutex);
 
     // wait() suspends execution of the current task until the predicate pred
     // returns true or the optional timeout is reached.
     // See Fiber::wait() for more information.
-    _Requires_lock_held_(lock)
-    bool wait(Fiber::Lock& lock,
-              const TimePoint* timeout,
-              const Predicate& pred);
+    bool wait(marl::lock& lock, const TimePoint* timeout, const Predicate& pred)
+        EXCLUDES(work.mutex);
 
     // wait() suspends execution of the current task until the fiber is
     // notified, or the optional timeout is reached.
     // See Fiber::wait() for more information.
-    bool wait(const TimePoint* timeout);
+    bool wait(const TimePoint* timeout) EXCLUDES(work.mutex);
 
     // suspend() suspends the currently executing Fiber until the fiber is
     // woken with a call to enqueue(Fiber*), or automatically sometime after the
     // optional timeout.
-    _Requires_lock_held_(work.mutex)
-    void suspend(const TimePoint* timeout);
+    void suspend(const TimePoint* timeout) REQUIRES(work.mutex);
 
     // enqueue(Fiber*) enqueues resuming of a suspended fiber.
-    void enqueue(Fiber* fiber);
+    void enqueue(Fiber* fiber) EXCLUDES(work.mutex);
 
     // enqueue(Task&&) enqueues a new, unstarted task.
-    void enqueue(Task&& task);
+    void enqueue(Task&& task) EXCLUDES(work.mutex);
 
     // tryLock() attempts to lock the worker for task enqueing.
     // If the lock was successful then true is returned, and the caller must
     // call enqueueAndUnlock().
-    _When_(return == true, _Acquires_lock_(work.mutex))
-    bool tryLock();
+    bool tryLock() EXCLUDES(work.mutex) TRY_ACQUIRE(true, work.mutex);
 
     // enqueueAndUnlock() enqueues the task and unlocks the worker.
     // Must only be called after a call to tryLock() which returned true.
-    _Requires_lock_held_(work.mutex)
-    _Releases_lock_(work.mutex)
-    void enqueueAndUnlock(Task&& task);
+    // _Releases_lock_(work.mutex)
+    void enqueueAndUnlock(Task&& task) REQUIRES(work.mutex) RELEASE(work.mutex);
 
     // runUntilShutdown() processes all tasks and fibers until there are no more
     // and shutdown is true, at which point runUntilShutdown() returns.
-    void runUntilShutdown();
+    void runUntilShutdown() REQUIRES(work.mutex);
 
     // steal() attempts to steal a Task from the worker for another worker.
     // Returns true if a task was taken and assigned to out, otherwise false.
-    bool steal(Task& out);
+    bool steal(Task& out) EXCLUDES(work.mutex);
 
     // getCurrent() returns the Worker currently bound to the current
     // thread.
@@ -371,27 +361,22 @@
    private:
     // run() is the task processing function for the worker.
     // run() processes tasks until stop() is called.
-    _Requires_lock_held_(work.mutex)
-    void run();
+    void run() REQUIRES(work.mutex);
 
     // createWorkerFiber() creates a new fiber that when executed calls
     // run().
-    _Requires_lock_held_(work.mutex)
-    Fiber* createWorkerFiber();
+    Fiber* createWorkerFiber() REQUIRES(work.mutex);
 
     // switchToFiber() switches execution to the given fiber. The fiber
     // must belong to this worker.
-    _Requires_lock_held_(work.mutex)
-    void switchToFiber(Fiber*);
+    void switchToFiber(Fiber*) REQUIRES(work.mutex);
 
     // runUntilIdle() executes all pending tasks and then returns.
-    _Requires_lock_held_(work.mutex)
-    void runUntilIdle();
+    void runUntilIdle() REQUIRES(work.mutex);
 
     // waitForWork() blocks until new work is available, potentially calling
     // spinForWork().
-    _Requires_lock_held_(work.mutex)
-    void waitForWork();
+    void waitForWork() REQUIRES(work.mutex);
 
     // spinForWork() attempts to steal work from another Worker, and keeps
     // the thread awake for a short duration. This reduces overheads of
@@ -400,31 +385,28 @@
 
     // enqueueFiberTimeouts() enqueues all the fibers that have finished
     // waiting.
-    _Requires_lock_held_(work.mutex)
-    void enqueueFiberTimeouts();
+    void enqueueFiberTimeouts() REQUIRES(work.mutex);
 
-    _Requires_lock_held_(work.mutex)
     inline void changeFiberState(Fiber* fiber,
                                  Fiber::State from,
-                                 Fiber::State to) const;
+                                 Fiber::State to) const REQUIRES(work.mutex);
 
-    _Requires_lock_held_(work.mutex)
-    inline void setFiberState(Fiber* fiber, Fiber::State to) const;
+    inline void setFiberState(Fiber* fiber, Fiber::State to) const
+        REQUIRES(work.mutex);
 
     // Work holds tasks and fibers that are enqueued on the Worker.
     struct Work {
       std::atomic<uint64_t> num = {0};  // tasks.size() + fibers.size()
-      _Guarded_by_(mutex) uint64_t numBlockedFibers = 0;
-      _Guarded_by_(mutex) TaskQueue tasks;
-      _Guarded_by_(mutex) FiberQueue fibers;
-      _Guarded_by_(mutex) WaitingFibers waiting;
-      _Guarded_by_(mutex) bool notifyAdded = true;
+      GUARDED_BY(mutex) uint64_t numBlockedFibers = 0;
+      GUARDED_BY(mutex) TaskQueue tasks;
+      GUARDED_BY(mutex) FiberQueue fibers;
+      GUARDED_BY(mutex) WaitingFibers waiting;
+      GUARDED_BY(mutex) bool notifyAdded = true;
       std::condition_variable added;
-      std::mutex mutex;
+      marl::mutex mutex;
 
-      _Requires_lock_held_(mutex)
       template <typename F>
-      inline void wait(F&&);
+      inline void wait(F&&) REQUIRES(mutex);
     };
 
     // https://en.wikipedia.org/wiki/Xorshift
@@ -472,7 +454,7 @@
   Allocator* const allocator;
 
   std::function<void()> threadInitFunc;
-  std::mutex threadInitFuncMutex;
+  mutex threadInitFuncMutex;
 
   std::array<std::atomic<int>, 8> spinningWorkers;
   std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000};
@@ -484,17 +466,18 @@
   std::array<Worker*, MaxWorkerThreads> workerThreads;
 
   struct SingleThreadedWorkers {
-    std::mutex mutex;
-    std::condition_variable unbind;
-    std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>> byTid;
+    using WorkerByTid =
+        std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>>;
+    marl::mutex mutex;
+    GUARDED_BY(mutex) std::condition_variable unbind;
+    GUARDED_BY(mutex) WorkerByTid byTid;
   };
   SingleThreadedWorkers singleThreadedWorkers;
 };
 
-_Requires_lock_held_(lock)
 template <typename Clock, typename Duration>
 bool Scheduler::Fiber::wait(
-    Lock& lock,
+    marl::lock& lock,
     const std::chrono::time_point<Clock, Duration>& timeout,
     const Predicate& pred) {
   using ToDuration = typename TimePoint::duration;
diff --git a/include/marl/ticket.h b/include/marl/ticket.h
index 59a38fc..6aa21ed 100644
--- a/include/marl/ticket.h
+++ b/include/marl/ticket.h
@@ -105,14 +105,14 @@
     inline ~Record();
 
     inline void done();
-    inline void callAndUnlock(std::unique_lock<std::mutex>& lock);
+    inline void callAndUnlock(marl::lock& lock);
+    inline void unlink();  // guarded by shared->mutex
 
     ConditionVariable isCalledCondVar;
 
     std::shared_ptr<Shared> shared;
     Record* next = nullptr;  // guarded by shared->mutex
     Record* prev = nullptr;  // guarded by shared->mutex
-    inline void unlink();    // guarded by shared->mutex
     OnCall onCall;           // guarded by shared->mutex
     bool isCalled = false;   // guarded by shared->mutex
     std::atomic<bool> isDone = {false};
@@ -120,7 +120,7 @@
 
   // Data shared between all tickets and the queue.
   struct Shared {
-    std::mutex mutex;
+    marl::mutex mutex;
     Record tail;
   };
 
@@ -136,7 +136,7 @@
 Ticket::Ticket(Loan<Record>&& record) : record(std::move(record)) {}
 
 void Ticket::wait() const {
-  std::unique_lock<std::mutex> lock(record->shared->mutex);
+  marl::lock lock(record->shared->mutex);
   record->isCalledCondVar.wait(lock, [this] { return record->isCalled; });
 }
 
@@ -146,7 +146,7 @@
 
 template <typename Function>
 void Ticket::onCall(Function&& f) const {
-  std::unique_lock<std::mutex> lock(record->shared->mutex);
+  marl::lock lock(record->shared->mutex);
   if (record->isCalled) {
     marl::schedule(std::move(f));
     return;
@@ -192,7 +192,7 @@
     f(std::move(Ticket(std::move(rec))));
   });
   last->next = &shared->tail;
-  std::unique_lock<std::mutex> lock(shared->mutex);
+  marl::lock lock(shared->mutex);
   first->prev = shared->tail.prev;
   shared->tail.prev = last.get();
   if (first->prev == nullptr) {
@@ -216,7 +216,7 @@
   if (isDone.exchange(true)) {
     return;
   }
-  std::unique_lock<std::mutex> lock(shared->mutex);
+  marl::lock lock(shared->mutex);
   auto callNext = (prev == nullptr && next != nullptr) ? next : nullptr;
   unlink();
   if (callNext != nullptr) {
@@ -225,7 +225,7 @@
   }
 }
 
-void Ticket::Record::callAndUnlock(std::unique_lock<std::mutex>& lock) {
+void Ticket::Record::callAndUnlock(marl::lock& lock) {
   if (isCalled) {
     return;
   }
@@ -233,7 +233,7 @@
   OnCall callback;
   std::swap(callback, onCall);
   isCalledCondVar.notify_all();
-  lock.unlock();
+  lock.unlock_no_tsa();
 
   if (callback) {
     marl::schedule(std::move(callback));
diff --git a/include/marl/tsa.h b/include/marl/tsa.h
new file mode 100644
index 0000000..84d0623
--- /dev/null
+++ b/include/marl/tsa.h
@@ -0,0 +1,80 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Stubs Thread Safety Analysis annotation macros for platforms that do not
+// support them.
+// See https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
+
+#ifndef marl_tsa_h
+#define marl_tsa_h
+
+// Enable thread safety attributes only with clang.
+// The attributes can be safely erased when compiling with other compilers.
+#if defined(__clang__) && (!defined(SWIG))
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
+#else
+#define THREAD_ANNOTATION_ATTRIBUTE__(x)  // no-op
+#endif
+
+#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
+
+#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)
+
+#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))
+
+#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))
+
+#define ACQUIRED_BEFORE(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))
+
+#define ACQUIRED_AFTER(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))
+
+#define REQUIRES(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))
+
+#define REQUIRES_SHARED(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))
+
+#define ACQUIRE(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))
+
+#define ACQUIRE_SHARED(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
+
+#define RELEASE(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))
+
+#define RELEASE_SHARED(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE_SHARED(...) \
+  THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))
+
+#define EXCLUDES(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))
+
+#define ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))
+
+#define ASSERT_SHARED_CAPABILITY(x) \
+  THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))
+
+#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))
+
+#define NO_THREAD_SAFETY_ANALYSIS \
+  THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)
+
+#endif  // marl_tsa_h
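
For reference, a sketch of how these macros are typically applied (hypothetical
class; the try_lock pattern is the one probed by the new CMake feature test):

    #include "marl/mutex.h"  // marl::mutex already carries the lock annotations
    #include "marl/tsa.h"

    class Registry {  // hypothetical example
     public:
      bool tryBump() EXCLUDES(mu) {
        if (!mu.try_lock()) {  // TRY_ACQUIRE(true): on success, TSA treats mu as held
          return false;
        }
        size++;
        mu.unlock();
        return true;
      }

      int count() REQUIRES(mu) { return size; }  // caller must hold mu

     private:
      marl::mutex mu;
      GUARDED_BY(mu) int size = 0;
    };
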
diff --git a/include/marl/waitgroup.h b/include/marl/waitgroup.h
index 7adbb19..a53a446 100644
--- a/include/marl/waitgroup.h
+++ b/include/marl/waitgroup.h
@@ -70,7 +70,7 @@
 
     std::atomic<unsigned int> count = {0};
     ConditionVariable cv;
-    std::mutex mutex;
+    marl::mutex mutex;
   };
   const std::shared_ptr<Data> data;
 };
@@ -91,7 +91,7 @@
   MARL_ASSERT(data->count > 0, "marl::WaitGroup::done() called too many times");
   auto count = --data->count;
   if (count == 0) {
-    std::unique_lock<std::mutex> lock(data->mutex);
+    marl::lock lock(data->mutex);
     data->cv.notify_all();
     return true;
   }
@@ -99,7 +99,7 @@
 }
 
 void WaitGroup::wait() const {
-  std::unique_lock<std::mutex> lock(data->mutex);
+  marl::lock lock(data->mutex);
   data->cv.wait(lock, [this] { return data->count == 0; });
 }
 
diff --git a/src/conditionvariable_test.cpp b/src/conditionvariable_test.cpp
index 2930e4d..18ece7c 100644
--- a/src/conditionvariable_test.cpp
+++ b/src/conditionvariable_test.cpp
@@ -18,17 +18,16 @@
 #include "marl_test.h"
 
 #include <condition_variable>
-#include <mutex>
 
 TEST_F(WithoutBoundScheduler, ConditionVariable) {
   bool trigger[3] = {false, false, false};
   bool signal[3] = {false, false, false};
-  std::mutex mutex;
+  marl::mutex mutex;
   marl::ConditionVariable cv;
 
   std::thread thread([&] {
     for (int i = 0; i < 3; i++) {
-      std::unique_lock<std::mutex> lock(mutex);
+      marl::lock lock(mutex);
       cv.wait(lock, [&] {
         EXPECT_TRUE(lock.owns_lock());
         return trigger[i];
@@ -45,7 +44,7 @@
 
   for (int i = 0; i < 3; i++) {
     {
-      std::unique_lock<std::mutex> lock(mutex);
+      marl::lock lock(mutex);
       trigger[i] = true;
       cv.notify_one();
       cv.wait(lock, [&] {
@@ -66,12 +65,12 @@
 TEST_P(WithBoundScheduler, ConditionVariable) {
   bool trigger[3] = {false, false, false};
   bool signal[3] = {false, false, false};
-  std::mutex mutex;
+  marl::mutex mutex;
   marl::ConditionVariable cv;
 
   std::thread thread([&] {
     for (int i = 0; i < 3; i++) {
-      std::unique_lock<std::mutex> lock(mutex);
+      marl::lock lock(mutex);
       cv.wait(lock, [&] {
         EXPECT_TRUE(lock.owns_lock());
         return trigger[i];
@@ -88,7 +87,7 @@
 
   for (int i = 0; i < 3; i++) {
     {
-      std::unique_lock<std::mutex> lock(mutex);
+      marl::lock lock(mutex);
       trigger[i] = true;
       cv.notify_one();
       cv.wait(lock, [&] {
@@ -113,14 +112,14 @@
 // they are early-unblocked, along with expected lock state.
 TEST_P(WithBoundScheduler, ConditionVariableTimeouts) {
   for (int i = 0; i < 10; i++) {
-    std::mutex mutex;
+    marl::mutex mutex;
     marl::ConditionVariable cv;
     bool signaled = false;  // guarded by mutex
     auto wg = marl::WaitGroup(100);
     for (int j = 0; j < 100; j++) {
       marl::schedule([=, &mutex, &cv, &signaled] {
         {
-          std::unique_lock<std::mutex> lock(mutex);
+          marl::lock lock(mutex);
           cv.wait_for(lock, std::chrono::milliseconds(j), [&] {
             EXPECT_TRUE(lock.owns_lock());
             return signaled;
@@ -134,7 +133,7 @@
     }
     std::this_thread::sleep_for(std::chrono::milliseconds(50));
     {
-      std::unique_lock<std::mutex> lock(mutex);
+      marl::lock lock(mutex);
       signaled = true;
       cv.notify_all();
     }
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index c97d9ec..533892a 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -99,7 +99,7 @@
   MARL_ASSERT(bound == nullptr, "Scheduler already bound");
   bound = this;
   {
-    std::unique_lock<std::mutex> lock(singleThreadedWorkers.mutex);
+    marl::lock lock(singleThreadedWorkers.mutex);
     auto worker =
         allocator->make_unique<Worker>(this, Worker::Mode::SingleThreaded, -1);
     worker->start();
@@ -113,7 +113,7 @@
   auto worker = Scheduler::Worker::getCurrent();
   worker->stop();
   {
-    std::unique_lock<std::mutex> lock(bound->singleThreadedWorkers.mutex);
+    marl::lock lock(bound->singleThreadedWorkers.mutex);
     auto tid = std::this_thread::get_id();
     auto it = bound->singleThreadedWorkers.byTid.find(tid);
     MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(),
@@ -137,9 +137,11 @@
 Scheduler::~Scheduler() {
   {
     // Wait until all the single threaded workers have been unbound.
-    std::unique_lock<std::mutex> lock(singleThreadedWorkers.mutex);
-    singleThreadedWorkers.unbind.wait(
-        lock, [this] { return singleThreadedWorkers.byTid.size() == 0; });
+    marl::lock lock(singleThreadedWorkers.mutex);
+    lock.wait(singleThreadedWorkers.unbind,
+              [this]() REQUIRES(singleThreadedWorkers.mutex) {
+                return singleThreadedWorkers.byTid.size() == 0;
+              });
   }
 
   // Release all worker threads.
@@ -148,12 +150,12 @@
 }
 
 void Scheduler::setThreadInitializer(const std::function<void()>& func) {
-  std::unique_lock<std::mutex> lock(threadInitFuncMutex);
+  marl::lock lock(threadInitFuncMutex);
   threadInitFunc = func;
 }
 
 const std::function<void()>& Scheduler::getThreadInitializer() {
-  std::unique_lock<std::mutex> lock(threadInitFuncMutex);
+  marl::lock lock(threadInitFuncMutex);
   return threadInitFunc;
 }
 
@@ -251,7 +253,7 @@
   worker->enqueue(this);
 }
 
-void Scheduler::Fiber::wait(Lock& lock, const Predicate& pred) {
+void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) {
   worker->wait(lock, nullptr, pred);
 }
 
@@ -374,7 +376,7 @@
         mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0);
         currentFiber = mainFiber.get();
         {
-          std::unique_lock<std::mutex> lock(work.mutex);
+          marl::lock lock(work.mutex);
           run();
         }
         mainFiber.reset();
@@ -401,7 +403,7 @@
       break;
     }
     case Mode::SingleThreaded: {
-      std::unique_lock<std::mutex> lock(work.mutex);
+      marl::lock lock(work.mutex);
       shutdown = true;
       runUntilShutdown();
       Worker::current = nullptr;
@@ -415,14 +417,13 @@
 bool Scheduler::Worker::wait(const TimePoint* timeout) {
   DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
   {
-    std::unique_lock<std::mutex> lock(work.mutex);
+    marl::lock lock(work.mutex);
     suspend(timeout);
   }
   return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
 }
 
-_Requires_lock_held_(waitLock)
-bool Scheduler::Worker::wait(Fiber::Lock& waitLock,
+bool Scheduler::Worker::wait(lock& waitLock,
                              const TimePoint* timeout,
                              const Predicate& pred) {
   DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
@@ -435,7 +436,7 @@
     // enqueued (via Fiber::notify()) between the waitLock.unlock() and fiber
     // switch, otherwise the Fiber::notify() call may be ignored and the fiber
     // is never woken.
-    waitLock.unlock();
+    waitLock.unlock_no_tsa();
 
     // suspend the fiber.
     suspend(timeout);
@@ -444,7 +445,7 @@
     work.mutex.unlock();
 
     // Re-lock to either return due to timeout, or call pred().
-    waitLock.lock();
+    waitLock.lock_no_tsa();
 
     // Check timeout.
     if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
@@ -456,7 +457,6 @@
   return true;
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::suspend(
     const std::chrono::system_clock::time_point* timeout) {
   // Current fiber is yielding as it is blocked.
@@ -496,33 +496,34 @@
   setFiberState(currentFiber, Fiber::State::Running);
 }
 
-_When_(return == true, _Acquires_lock_(work.mutex))
 bool Scheduler::Worker::tryLock() {
   return work.mutex.try_lock();
 }
 
 void Scheduler::Worker::enqueue(Fiber* fiber) {
-  std::unique_lock<std::mutex> lock(work.mutex);
-  DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
-          Fiber::toString(fiber->state));
-  switch (fiber->state) {
-    case Fiber::State::Running:
-    case Fiber::State::Queued:
-      return;  // Nothing to do here - task is already queued or running.
-    case Fiber::State::Waiting:
-      work.waiting.erase(fiber);
-      break;
-    case Fiber::State::Idle:
-    case Fiber::State::Yielded:
-      break;
+  bool notify = false;
+  {
+    marl::lock lock(work.mutex);
+    DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
+            Fiber::toString(fiber->state));
+    switch (fiber->state) {
+      case Fiber::State::Running:
+      case Fiber::State::Queued:
+        return;  // Nothing to do here - task is already queued or running.
+      case Fiber::State::Waiting:
+        work.waiting.erase(fiber);
+        break;
+      case Fiber::State::Idle:
+      case Fiber::State::Yielded:
+        break;
+    }
+    notify = work.notifyAdded;
+    work.fibers.push_back(std::move(fiber));
+    MARL_ASSERT(!work.waiting.contains(fiber),
+                "fiber is unexpectedly in the waiting list");
+    setFiberState(fiber, Fiber::State::Queued);
+    work.num++;
   }
-  bool notify = work.notifyAdded;
-  work.fibers.push_back(std::move(fiber));
-  MARL_ASSERT(!work.waiting.contains(fiber),
-              "fiber is unexpectedly in the waiting list");
-  setFiberState(fiber, Fiber::State::Queued);
-  work.num++;
-  lock.unlock();
 
   if (notify) {
     work.added.notify_one();
@@ -534,8 +535,6 @@
   enqueueAndUnlock(std::move(task));
 }
 
-_Requires_lock_held_(work.mutex)
-_Releases_lock_(work.mutex)
 void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
   auto notify = work.notifyAdded;
   work.tasks.push_back(std::move(task));
@@ -564,21 +563,21 @@
   return true;
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::run() {
   if (mode == Mode::MultiThreaded) {
     MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
     // This is the entry point for a multi-threaded worker.
     // Start with a regular condition-variable wait for work. This avoids
     // starting the thread with a spinForWork().
-    work.wait([this] { return work.num > 0 || work.waiting || shutdown; });
+    work.wait([this]() REQUIRES(work.mutex) {
+      return work.num > 0 || work.waiting || shutdown;
+    });
   }
   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
   runUntilShutdown();
   switchToFiber(mainFiber.get());
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::runUntilShutdown() {
   while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
     waitForWork();
@@ -586,7 +585,6 @@
   }
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::waitForWork() {
   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
               "work.num out of sync");
@@ -601,7 +599,7 @@
     work.mutex.lock();
   }
 
-  work.wait([this] {
+  work.wait([this]() REQUIRES(work.mutex) {
     return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
   });
   if (work.waiting) {
@@ -609,7 +607,6 @@
   }
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::enqueueFiberTimeouts() {
   auto now = std::chrono::system_clock::now();
   while (auto fiber = work.waiting.take(now)) {
@@ -620,7 +617,6 @@
   }
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::changeFiberState(Fiber* fiber,
                                          Fiber::State from,
                                          Fiber::State to) const {
@@ -631,7 +627,6 @@
   fiber->state = to;
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const {
   DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
           Fiber::toString(fiber->state), Fiber::toString(to));
@@ -659,7 +654,7 @@
     }
 
     if (scheduler->stealWork(this, rng(), stolen)) {
-      std::unique_lock<std::mutex> lock(work.mutex);
+      marl::lock lock(work.mutex);
       work.tasks.emplace_back(std::move(stolen));
       work.num++;
       return;
@@ -669,7 +664,6 @@
   }
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::runUntilIdle() {
   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
@@ -713,18 +707,16 @@
   }
 }
 
-_Requires_lock_held_(work.mutex)
 Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
   auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
   DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
   auto fiber = Fiber::create(scheduler->allocator, fiberId, FiberStackSize,
-                             [&] { run(); });
+                             [&]() REQUIRES(work.mutex) { run(); });
   auto ptr = fiber.get();
   workerFibers.push_back(std::move(fiber));
   return ptr;
 }
 
-_Requires_lock_held_(work.mutex)
 void Scheduler::Worker::switchToFiber(Fiber* to) {
   DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
   MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
@@ -737,18 +729,15 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::Worker::Work
 ////////////////////////////////////////////////////////////////////////////////
-_Requires_lock_held_(mutex)
 template <typename F>
 void Scheduler::Worker::Work::wait(F&& f) {
-  std::unique_lock<std::mutex> lock(mutex, std::adopt_lock);
   notifyAdded = true;
   if (waiting) {
-    added.wait_until(lock, waiting.next(), f);
+    mutex.wait_until_locked(added, waiting.next(), f);
   } else {
-    added.wait(lock, f);
+    mutex.wait_locked(added, f);
   }
   notifyAdded = false;
-  lock.release();  // Keep the lock held.
 }
 
 }  // namespace marl
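
One pattern visible in the scheduler changes above: the analysis does not carry
lock state into lambda bodies, so predicates that touch GUARDED_BY state are
annotated explicitly. A minimal sketch of the idea (hypothetical names):

    #include "marl/mutex.h"
    #include "marl/tsa.h"

    struct Work {  // hypothetical stand-in for the scheduler's Work struct
      marl::mutex mutex;
      GUARDED_BY(mutex) int num = 0;

      template <typename F>
      void whileLocked(F&& f) REQUIRES(mutex) { f(); }
    };

    void example(Work& work) {
      marl::lock l(work.mutex);
      // Without REQUIRES(work.mutex) on the lambda, -Wthread-safety would warn
      // that num is accessed without holding its guarding mutex.
      work.whileLocked([&]() REQUIRES(work.mutex) { work.num++; });
    }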