Squashed 'third_party/marl/' changes from 16e1dc37c..539094011
539094011 CMake: Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED
c7f70ba7a CMake: Bump min version + include(CheckCXXSourceCompiles)
658a204fc Replace SAL annotations with clang's TSA annotations
9630bec2f Fix CMake warning: "Policy CMP0023 is not set"
9f369ad5d Update yarn:: to marl:: in an example
git-subtree-dir: third_party/marl
git-subtree-split: 53909401165022553ed9d1f0c572178559dc25ec
diff --git a/.clang-format b/.clang-format
index b0823b4..08178fe 100644
--- a/.clang-format
+++ b/.clang-format
@@ -3,8 +3,3 @@
---
Language: Cpp
-StatementMacros:
- - _Acquires_lock_
- - _Releases_lock_
- - _Requires_lock_held_
- - _When_
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2a8a695..005b855 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmake_minimum_required(VERSION 2.8)
+cmake_minimum_required(VERSION 3.0)
-set (CMAKE_CXX_STANDARD 11)
+set(CMAKE_CXX_STANDARD 11)
project(Marl C CXX ASM)
+include(CheckCXXSourceCompiles)
+
###########################################################
# Options
###########################################################
@@ -76,6 +78,32 @@
endif(MARL_BUILD_BENCHMARKS)
###########################################################
+# Compiler feature tests
+###########################################################
+# Check that the Clang Thread Safety Analysis' try_acquire_capability behaves
+# correctly. This is broken on some earlier versions of clang.
+# See: https://bugs.llvm.org/show_bug.cgi?id=32954
+set(SAVE_CMAKE_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})
+set(CMAKE_REQUIRED_FLAGS "-Wthread-safety -Werror")
+check_cxx_source_compiles(
+ "int main() {
+ struct __attribute__((capability(\"mutex\"))) Mutex {
+ void Unlock() __attribute__((release_capability)) {};
+ bool TryLock() __attribute__((try_acquire_capability(true))) { return true; };
+ };
+ Mutex m;
+ if (m.TryLock()) {
+ m.Unlock(); // Should not warn.
+ }
+ return 0;
+ }"
+ MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
+set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS})
+
+# Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED as this may be useful to parent projects
+set(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED PARENT_SCOPE ${MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED})
+
+###########################################################
# File lists
###########################################################
set(MARL_LIST
@@ -111,6 +139,10 @@
# Functions
###########################################################
function(marl_set_target_options target)
+ if (MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
+ target_compile_options(${target} PRIVATE "-Wthread-safety")
+ endif()
+
# Enable all warnings
if(MSVC)
target_compile_options(${target} PRIVATE
@@ -137,13 +169,13 @@
if(MARL_ASAN)
target_compile_options(${target} PUBLIC "-fsanitize=address")
- target_link_libraries(${target} "-fsanitize=address")
+ target_link_libraries(${target} PUBLIC "-fsanitize=address")
elseif(MARL_MSAN)
target_compile_options(${target} PUBLIC "-fsanitize=memory")
- target_link_libraries(${target} "-fsanitize=memory")
+ target_link_libraries(${target} PUBLIC "-fsanitize=memory")
elseif(MARL_TSAN)
target_compile_options(${target} PUBLIC "-fsanitize=thread")
- target_link_libraries(${target} "-fsanitize=thread")
+ target_link_libraries(${target} PUBLIC "-fsanitize=thread")
endif()
target_include_directories(${target} PUBLIC $<BUILD_INTERFACE:${MARL_INCLUDE_DIR}>)
@@ -232,7 +264,7 @@
marl_set_target_options(marl-unittests)
- target_link_libraries(marl-unittests marl)
+ target_link_libraries(marl-unittests PRIVATE marl)
endif(MARL_BUILD_TESTS)
# benchmarks
@@ -252,7 +284,7 @@
marl_set_target_options(marl-benchmarks)
- target_link_libraries(marl-benchmarks benchmark::benchmark marl)
+ target_link_libraries(marl-benchmarks PRIVATE benchmark::benchmark marl)
endif(MARL_BUILD_BENCHMARKS)
# examples
@@ -263,7 +295,7 @@
FOLDER "Examples"
)
marl_set_target_options(${target})
- target_link_libraries(${target} marl)
+ target_link_libraries(${target} PRIVATE marl)
endfunction(build_example)
build_example(fractal)
diff --git a/include/marl/blockingcall.h b/include/marl/blockingcall.h
index 983ee82..1ef0277 100644
--- a/include/marl/blockingcall.h
+++ b/include/marl/blockingcall.h
@@ -85,10 +85,10 @@
// void runABlockingFunctionOnATask()
// {
// // Schedule a task that calls a blocking, non-yielding function.
-// yarn::schedule([=] {
+// marl::schedule([=] {
// // call_blocking_function() may block indefinitely.
// // Ensure this call does not block other tasks from running.
-// auto result = yarn::blocking_call(call_blocking_function);
+// auto result = marl::blocking_call(call_blocking_function);
// // call_blocking_function() has now returned.
// // result holds the return value of the blocking function call.
// });
diff --git a/include/marl/conditionvariable.h b/include/marl/conditionvariable.h
index 8dfb9c7..5a57db2 100644
--- a/include/marl/conditionvariable.h
+++ b/include/marl/conditionvariable.h
@@ -19,11 +19,12 @@
#include "debug.h"
#include "defer.h"
#include "memory.h"
+#include "mutex.h"
#include "scheduler.h"
+#include "tsa.h"
#include <atomic>
#include <condition_variable>
-#include <mutex>
namespace marl {
@@ -47,14 +48,14 @@
// wait() blocks the current fiber or thread until the predicate is satisfied
// and the ConditionVariable is notified.
template <typename Predicate>
- inline void wait(std::unique_lock<std::mutex>& lock, Predicate&& pred);
+ inline void wait(marl::lock& lock, Predicate&& pred);
// wait_for() blocks the current fiber or thread until the predicate is
// satisfied, and the ConditionVariable is notified, or the timeout has been
// reached. Returns false if pred still evaluates to false after the timeout
// has been reached, otherwise true.
template <typename Rep, typename Period, typename Predicate>
- bool wait_for(std::unique_lock<std::mutex>& lock,
+ bool wait_for(marl::lock& lock,
const std::chrono::duration<Rep, Period>& duration,
Predicate&& pred);
@@ -63,7 +64,7 @@
// reached. Returns false if pred still evaluates to false after the timeout
// has been reached, otherwise true.
template <typename Clock, typename Duration, typename Predicate>
- bool wait_until(std::unique_lock<std::mutex>& lock,
+ bool wait_until(marl::lock& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
Predicate&& pred);
@@ -73,7 +74,7 @@
ConditionVariable& operator=(const ConditionVariable&) = delete;
ConditionVariable& operator=(ConditionVariable&&) = delete;
- std::mutex mutex;
+ marl::mutex mutex;
containers::list<Scheduler::Fiber*> waiting;
std::condition_variable condition;
std::atomic<int> numWaiting = {0};
@@ -89,7 +90,7 @@
return;
}
{
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
for (auto fiber : waiting) {
fiber->notify();
}
@@ -104,7 +105,7 @@
return;
}
{
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
for (auto fiber : waiting) {
fiber->notify();
}
@@ -115,8 +116,7 @@
}
template <typename Predicate>
-void ConditionVariable::wait(std::unique_lock<std::mutex>& lock,
- Predicate&& pred) {
+void ConditionVariable::wait(marl::lock& lock, Predicate&& pred) {
if (pred()) {
return;
}
@@ -137,7 +137,7 @@
// Currently running outside of the scheduler.
// Delegate to the std::condition_variable.
numWaitingOnCondition++;
- condition.wait(lock, pred);
+ lock.wait(condition, pred);
numWaitingOnCondition--;
}
numWaiting--;
@@ -145,7 +145,7 @@
template <typename Rep, typename Period, typename Predicate>
bool ConditionVariable::wait_for(
- std::unique_lock<std::mutex>& lock,
+ marl::lock& lock,
const std::chrono::duration<Rep, Period>& duration,
Predicate&& pred) {
return wait_until(lock, std::chrono::system_clock::now() + duration, pred);
@@ -153,7 +153,7 @@
template <typename Clock, typename Duration, typename Predicate>
bool ConditionVariable::wait_until(
- std::unique_lock<std::mutex>& lock,
+ marl::lock& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
Predicate&& pred) {
if (pred()) {
@@ -181,7 +181,7 @@
// Delegate to the std::condition_variable.
numWaitingOnCondition++;
defer(numWaitingOnCondition--);
- return condition.wait_until(lock, timeout, pred);
+ return lock.wait_until(condition, timeout, pred);
}
}
diff --git a/include/marl/event.h b/include/marl/event.h
index f7b3023..bac6078 100644
--- a/include/marl/event.h
+++ b/include/marl/event.h
@@ -113,7 +113,7 @@
inline bool wait_until(
const std::chrono::time_point<Clock, Duration>& timeout);
- std::mutex mutex;
+ marl::mutex mutex;
ConditionVariable cv;
const Mode mode;
bool signalled;
@@ -127,7 +127,7 @@
: cv(allocator), mode(mode), signalled(initialState) {}
void Event::Shared::signal() {
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
if (signalled) {
return;
}
@@ -143,7 +143,7 @@
}
void Event::Shared::wait() {
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
cv.wait(lock, [&] { return signalled; });
if (mode == Mode::Auto) {
signalled = false;
@@ -153,7 +153,7 @@
template <typename Rep, typename Period>
bool Event::Shared::wait_for(
const std::chrono::duration<Rep, Period>& duration) {
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
if (!cv.wait_for(lock, duration, [&] { return signalled; })) {
return false;
}
@@ -166,7 +166,7 @@
template <typename Clock, typename Duration>
bool Event::Shared::wait_until(
const std::chrono::time_point<Clock, Duration>& timeout) {
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
if (!cv.wait_until(lock, timeout, [&] { return signalled; })) {
return false;
}
@@ -186,7 +186,7 @@
}
void Event::clear() const {
- std::unique_lock<std::mutex> lock(shared->mutex);
+ marl::lock lock(shared->mutex);
shared->signalled = false;
}
@@ -206,7 +206,7 @@
}
bool Event::test() const {
- std::unique_lock<std::mutex> lock(shared->mutex);
+ marl::lock lock(shared->mutex);
if (!shared->signalled) {
return false;
}
@@ -217,7 +217,7 @@
}
bool Event::isSignalled() const {
- std::unique_lock<std::mutex> lock(shared->mutex);
+ marl::lock lock(shared->mutex);
return shared->signalled;
}
@@ -226,7 +226,7 @@
Event any(mode, false);
for (auto it = begin; it != end; it++) {
auto s = it->shared;
- std::unique_lock<std::mutex> lock(s->mutex);
+ marl::lock lock(s->mutex);
if (s->signalled) {
any.signal();
}
diff --git a/include/marl/mutex.h b/include/marl/mutex.h
new file mode 100644
index 0000000..72ecaf7
--- /dev/null
+++ b/include/marl/mutex.h
@@ -0,0 +1,106 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Wrappers around std::mutex and std::unique_lock that provide clang's
+// Thread Safety Analysis annotations.
+// See: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
+
+#ifndef marl_mutex_h
+#define marl_mutex_h
+
+#include "tsa.h"
+
+#include <condition_variable>
+#include <mutex>
+
+namespace marl {
+
+// mutex is a wrapper around std::mutex that offers Thread Safety Analysis
+// annotations.
+// mutex also holds methods for performing std::condition_variable::wait() calls
+// as these require a std::unique_lock<> which are unsupported by the TSA.
+class CAPABILITY("mutex") mutex {
+ public:
+ inline void lock() ACQUIRE() { _.lock(); }
+
+ inline void unlock() RELEASE() { _.unlock(); }
+
+ inline bool try_lock() TRY_ACQUIRE(true) { return _.try_lock(); }
+
+ // wait_locked calls cv.wait() on this already locked mutex.
+ template <typename Predicate>
+ inline void wait_locked(std::condition_variable& cv, Predicate&& p)
+ REQUIRES(this) {
+ std::unique_lock<std::mutex> lock(_, std::adopt_lock);
+ cv.wait(lock, std::forward<Predicate>(p));
+ lock.release(); // Keep lock held.
+ }
+
+ // wait_until_locked calls cv.wait() on this already locked mutex.
+ template <typename Predicate, typename Time>
+ inline bool wait_until_locked(std::condition_variable& cv,
+ Time&& time,
+ Predicate&& p) REQUIRES(this) {
+ std::unique_lock<std::mutex> lock(_, std::adopt_lock);
+ auto res = cv.wait_until(lock, std::forward<Time>(time),
+ std::forward<Predicate>(p));
+ lock.release(); // Keep lock held.
+ return res;
+ }
+
+ private:
+ friend class lock;
+ std::mutex _;
+};
+
+// lock is a RAII lock helper that offers Thread Safety Analysis annotations.
+// lock also holds methods for performing std::condition_variable::wait()
+// calls as these require a std::unique_lock<> which are unsupported by the TSA.
+class SCOPED_CAPABILITY lock {
+ public:
+ inline lock(mutex& m) ACQUIRE(m) : _(m._) {}
+ inline ~lock() RELEASE() {}
+
+ // wait calls cv.wait() on this lock.
+ template <typename Predicate>
+ inline void wait(std::condition_variable& cv, Predicate&& p) {
+ cv.wait(_, std::forward<Predicate>(p));
+ }
+
+ // wait_until calls cv.wait() on this lock.
+ template <typename Predicate, typename Time>
+ inline bool wait_until(std::condition_variable& cv,
+ Time&& time,
+ Predicate&& p) {
+ return cv.wait_until(_, std::forward<Time>(time),
+ std::forward<Predicate>(p));
+ }
+
+ inline bool owns_lock() const { return _.owns_lock(); }
+
+ // lock_no_tsa locks the mutex outside of the visiblity of the thread
+ // safety analysis. Use with caution.
+ inline void lock_no_tsa() { _.lock(); }
+
+ // unlock_no_tsa unlocks the mutex outside of the visiblity of the thread
+ // safety analysis. Use with caution.
+ inline void unlock_no_tsa() { _.unlock(); }
+
+ private:
+ std::unique_lock<std::mutex> _;
+};
+
+} // namespace marl
+
+#endif // marl_mutex_h
diff --git a/include/marl/pool.h b/include/marl/pool.h
index 934fa40..393b457 100644
--- a/include/marl/pool.h
+++ b/include/marl/pool.h
@@ -17,9 +17,9 @@
#include "conditionvariable.h"
#include "memory.h"
+#include "mutex.h"
#include <atomic>
-#include <mutex>
namespace marl {
@@ -237,7 +237,7 @@
inline void return_(Item*) override;
Item items[N];
- std::mutex mutex;
+ marl::mutex mutex;
ConditionVariable returned;
Item* free = nullptr;
};
@@ -281,7 +281,7 @@
template <typename T, int N, PoolPolicy POLICY>
template <typename F>
void BoundedPool<T, N, POLICY>::borrow(size_t n, const F& f) const {
- std::unique_lock<std::mutex> lock(storage->mutex);
+ marl::lock lock(storage->mutex);
for (size_t i = 0; i < n; i++) {
storage->returned.wait(lock, [&] { return storage->free != nullptr; });
auto item = storage->free;
@@ -296,14 +296,16 @@
template <typename T, int N, PoolPolicy POLICY>
std::pair<typename BoundedPool<T, N, POLICY>::Loan, bool>
BoundedPool<T, N, POLICY>::tryBorrow() const {
- std::unique_lock<std::mutex> lock(storage->mutex);
- if (storage->free == nullptr) {
- return std::make_pair(Loan(), false);
+ Item* item = nullptr;
+ {
+ marl::lock lock(storage->mutex);
+ if (storage->free == nullptr) {
+ return std::make_pair(Loan(), false);
+ }
+ item = storage->free;
+ storage->free = storage->free->next;
+ item->pool = this;
}
- auto item = storage->free;
- storage->free = storage->free->next;
- item->pool = this;
- lock.unlock();
if (POLICY == PoolPolicy::Reconstruct) {
item->construct();
}
@@ -315,10 +317,11 @@
if (POLICY == PoolPolicy::Reconstruct) {
item->destruct();
}
- std::unique_lock<std::mutex> lock(mutex);
- item->next = free;
- free = item;
- lock.unlock();
+ {
+ marl::lock lock(mutex);
+ item->next = free;
+ free = item;
+ }
returned.notify_one();
}
@@ -359,7 +362,7 @@
inline void return_(Item*) override;
Allocator* allocator;
- std::mutex mutex;
+ marl::mutex mutex;
std::vector<Item*> items;
Item* free = nullptr;
};
@@ -398,7 +401,7 @@
template <typename T, PoolPolicy POLICY>
template <typename F>
inline void UnboundedPool<T, POLICY>::borrow(size_t n, const F& f) const {
- std::unique_lock<std::mutex> lock(storage->mutex);
+ marl::lock lock(storage->mutex);
for (size_t i = 0; i < n; i++) {
if (storage->free == nullptr) {
auto count = std::max<size_t>(storage->items.size(), 32);
@@ -427,10 +430,9 @@
if (POLICY == PoolPolicy::Reconstruct) {
item->destruct();
}
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
item->next = free;
free = item;
- lock.unlock();
}
} // namespace marl
diff --git a/include/marl/sal.h b/include/marl/sal.h
deleted file mode 100644
index de47a49..0000000
--- a/include/marl/sal.h
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2019 The Marl Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Stubs SAL annotation macros for platforms that do not support them.
-// See
-// https://docs.microsoft.com/en-us/visualstudio/code-quality/annotating-locking-behavior?view=vs-2019
-
-#ifndef marl_sal_h
-#define marl_sal_h
-
-#ifndef _Acquires_lock_
-#define _Acquires_lock_(...)
-#endif
-
-#ifndef _Guarded_by_
-#define _Guarded_by_(...)
-#endif
-
-#ifndef _Releases_lock_
-#define _Releases_lock_(...)
-#endif
-
-#ifndef _Requires_lock_held_
-#define _Requires_lock_held_(...)
-#endif
-
-#ifndef _When_
-#define _When_(...)
-#endif
-
-#endif // marl_sal_h
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index b96e147..4db4b4d 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -17,7 +17,7 @@
#include "debug.h"
#include "memory.h"
-#include "sal.h"
+#include "mutex.h"
#include "task.h"
#include "thread.h"
@@ -28,7 +28,6 @@
#include <deque>
#include <functional>
#include <map>
-#include <mutex>
#include <set>
#include <thread>
#include <unordered_map>
@@ -104,8 +103,6 @@
// thread that previously executed it.
class Fiber {
public:
- using Lock = std::unique_lock<std::mutex>;
-
// current() returns the currently executing fiber, or nullptr if called
// without a bound scheduler.
static Fiber* current();
@@ -122,8 +119,7 @@
// will be locked before wait() returns.
// pred will be always be called with the lock held.
// wait() must only be called on the currently executing fiber.
- _Requires_lock_held_(lock)
- void wait(Lock& lock, const Predicate& pred);
+ void wait(marl::lock& lock, const Predicate& pred);
// wait() suspends execution of this Fiber until the Fiber is woken up with
// a call to notify() and the predicate pred returns true, or sometime after
@@ -139,9 +135,8 @@
// will be locked before wait() returns.
// pred will be always be called with the lock held.
// wait() must only be called on the currently executing fiber.
- _Requires_lock_held_(lock)
template <typename Clock, typename Duration>
- inline bool wait(Lock& lock,
+ inline bool wait(marl::lock& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
const Predicate& pred);
@@ -307,56 +302,51 @@
Worker(Scheduler* scheduler, Mode mode, uint32_t id);
// start() begins execution of the worker.
- void start();
+ void start() EXCLUDES(work.mutex);
// stop() ceases execution of the worker, blocking until all pending
// tasks have fully finished.
- void stop();
+ void stop() EXCLUDES(work.mutex);
// wait() suspends execution of the current task until the predicate pred
// returns true or the optional timeout is reached.
// See Fiber::wait() for more information.
- _Requires_lock_held_(lock)
- bool wait(Fiber::Lock& lock,
- const TimePoint* timeout,
- const Predicate& pred);
+ bool wait(marl::lock& lock, const TimePoint* timeout, const Predicate& pred)
+ EXCLUDES(work.mutex);
// wait() suspends execution of the current task until the fiber is
// notified, or the optional timeout is reached.
// See Fiber::wait() for more information.
- bool wait(const TimePoint* timeout);
+ bool wait(const TimePoint* timeout) EXCLUDES(work.mutex);
// suspend() suspends the currenetly executing Fiber until the fiber is
// woken with a call to enqueue(Fiber*), or automatically sometime after the
// optional timeout.
- _Requires_lock_held_(work.mutex)
- void suspend(const TimePoint* timeout);
+ void suspend(const TimePoint* timeout) REQUIRES(work.mutex);
// enqueue(Fiber*) enqueues resuming of a suspended fiber.
- void enqueue(Fiber* fiber);
+ void enqueue(Fiber* fiber) EXCLUDES(work.mutex);
// enqueue(Task&&) enqueues a new, unstarted task.
- void enqueue(Task&& task);
+ void enqueue(Task&& task) EXCLUDES(work.mutex);
// tryLock() attempts to lock the worker for task enqueing.
// If the lock was successful then true is returned, and the caller must
// call enqueueAndUnlock().
- _When_(return == true, _Acquires_lock_(work.mutex))
- bool tryLock();
+ bool tryLock() EXCLUDES(work.mutex) TRY_ACQUIRE(true, work.mutex);
// enqueueAndUnlock() enqueues the task and unlocks the worker.
// Must only be called after a call to tryLock() which returned true.
- _Requires_lock_held_(work.mutex)
- _Releases_lock_(work.mutex)
- void enqueueAndUnlock(Task&& task);
+ // _Releases_lock_(work.mutex)
+ void enqueueAndUnlock(Task&& task) REQUIRES(work.mutex) RELEASE(work.mutex);
// runUntilShutdown() processes all tasks and fibers until there are no more
// and shutdown is true, upon runUntilShutdown() returns.
- void runUntilShutdown();
+ void runUntilShutdown() REQUIRES(work.mutex);
// steal() attempts to steal a Task from the worker for another worker.
// Returns true if a task was taken and assigned to out, otherwise false.
- bool steal(Task& out);
+ bool steal(Task& out) EXCLUDES(work.mutex);
// getCurrent() returns the Worker currently bound to the current
// thread.
@@ -371,27 +361,22 @@
private:
// run() is the task processing function for the worker.
// run() processes tasks until stop() is called.
- _Requires_lock_held_(work.mutex)
- void run();
+ void run() REQUIRES(work.mutex);
// createWorkerFiber() creates a new fiber that when executed calls
// run().
- _Requires_lock_held_(work.mutex)
- Fiber* createWorkerFiber();
+ Fiber* createWorkerFiber() REQUIRES(work.mutex);
// switchToFiber() switches execution to the given fiber. The fiber
// must belong to this worker.
- _Requires_lock_held_(work.mutex)
- void switchToFiber(Fiber*);
+ void switchToFiber(Fiber*) REQUIRES(work.mutex);
// runUntilIdle() executes all pending tasks and then returns.
- _Requires_lock_held_(work.mutex)
- void runUntilIdle();
+ void runUntilIdle() REQUIRES(work.mutex);
// waitForWork() blocks until new work is available, potentially calling
// spinForWork().
- _Requires_lock_held_(work.mutex)
- void waitForWork();
+ void waitForWork() REQUIRES(work.mutex);
// spinForWork() attempts to steal work from another Worker, and keeps
// the thread awake for a short duration. This reduces overheads of
@@ -400,31 +385,28 @@
// enqueueFiberTimeouts() enqueues all the fibers that have finished
// waiting.
- _Requires_lock_held_(work.mutex)
- void enqueueFiberTimeouts();
+ void enqueueFiberTimeouts() REQUIRES(work.mutex);
- _Requires_lock_held_(work.mutex)
inline void changeFiberState(Fiber* fiber,
Fiber::State from,
- Fiber::State to) const;
+ Fiber::State to) const REQUIRES(work.mutex);
- _Requires_lock_held_(work.mutex)
- inline void setFiberState(Fiber* fiber, Fiber::State to) const;
+ inline void setFiberState(Fiber* fiber, Fiber::State to) const
+ REQUIRES(work.mutex);
// Work holds tasks and fibers that are enqueued on the Worker.
struct Work {
std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size()
- _Guarded_by_(mutex) uint64_t numBlockedFibers = 0;
- _Guarded_by_(mutex) TaskQueue tasks;
- _Guarded_by_(mutex) FiberQueue fibers;
- _Guarded_by_(mutex) WaitingFibers waiting;
- _Guarded_by_(mutex) bool notifyAdded = true;
+ GUARDED_BY(mutex) uint64_t numBlockedFibers = 0;
+ GUARDED_BY(mutex) TaskQueue tasks;
+ GUARDED_BY(mutex) FiberQueue fibers;
+ GUARDED_BY(mutex) WaitingFibers waiting;
+ GUARDED_BY(mutex) bool notifyAdded = true;
std::condition_variable added;
- std::mutex mutex;
+ marl::mutex mutex;
- _Requires_lock_held_(mutex)
template <typename F>
- inline void wait(F&&);
+ inline void wait(F&&) REQUIRES(mutex);
};
// https://en.wikipedia.org/wiki/Xorshift
@@ -472,7 +454,7 @@
Allocator* const allocator;
std::function<void()> threadInitFunc;
- std::mutex threadInitFuncMutex;
+ mutex threadInitFuncMutex;
std::array<std::atomic<int>, 8> spinningWorkers;
std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000};
@@ -484,17 +466,18 @@
std::array<Worker*, MaxWorkerThreads> workerThreads;
struct SingleThreadedWorkers {
- std::mutex mutex;
- std::condition_variable unbind;
- std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>> byTid;
+ using WorkerByTid =
+ std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>>;
+ marl::mutex mutex;
+ GUARDED_BY(mutex) std::condition_variable unbind;
+ GUARDED_BY(mutex) WorkerByTid byTid;
};
SingleThreadedWorkers singleThreadedWorkers;
};
-_Requires_lock_held_(lock)
template <typename Clock, typename Duration>
bool Scheduler::Fiber::wait(
- Lock& lock,
+ marl::lock& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
const Predicate& pred) {
using ToDuration = typename TimePoint::duration;
diff --git a/include/marl/ticket.h b/include/marl/ticket.h
index 59a38fc..6aa21ed 100644
--- a/include/marl/ticket.h
+++ b/include/marl/ticket.h
@@ -105,14 +105,14 @@
inline ~Record();
inline void done();
- inline void callAndUnlock(std::unique_lock<std::mutex>& lock);
+ inline void callAndUnlock(marl::lock& lock);
+ inline void unlink(); // guarded by shared->mutex
ConditionVariable isCalledCondVar;
std::shared_ptr<Shared> shared;
Record* next = nullptr; // guarded by shared->mutex
Record* prev = nullptr; // guarded by shared->mutex
- inline void unlink(); // guarded by shared->mutex
OnCall onCall; // guarded by shared->mutex
bool isCalled = false; // guarded by shared->mutex
std::atomic<bool> isDone = {false};
@@ -120,7 +120,7 @@
// Data shared between all tickets and the queue.
struct Shared {
- std::mutex mutex;
+ marl::mutex mutex;
Record tail;
};
@@ -136,7 +136,7 @@
Ticket::Ticket(Loan<Record>&& record) : record(std::move(record)) {}
void Ticket::wait() const {
- std::unique_lock<std::mutex> lock(record->shared->mutex);
+ marl::lock lock(record->shared->mutex);
record->isCalledCondVar.wait(lock, [this] { return record->isCalled; });
}
@@ -146,7 +146,7 @@
template <typename Function>
void Ticket::onCall(Function&& f) const {
- std::unique_lock<std::mutex> lock(record->shared->mutex);
+ marl::lock lock(record->shared->mutex);
if (record->isCalled) {
marl::schedule(std::move(f));
return;
@@ -192,7 +192,7 @@
f(std::move(Ticket(std::move(rec))));
});
last->next = &shared->tail;
- std::unique_lock<std::mutex> lock(shared->mutex);
+ marl::lock lock(shared->mutex);
first->prev = shared->tail.prev;
shared->tail.prev = last.get();
if (first->prev == nullptr) {
@@ -216,7 +216,7 @@
if (isDone.exchange(true)) {
return;
}
- std::unique_lock<std::mutex> lock(shared->mutex);
+ marl::lock lock(shared->mutex);
auto callNext = (prev == nullptr && next != nullptr) ? next : nullptr;
unlink();
if (callNext != nullptr) {
@@ -225,7 +225,7 @@
}
}
-void Ticket::Record::callAndUnlock(std::unique_lock<std::mutex>& lock) {
+void Ticket::Record::callAndUnlock(marl::lock& lock) {
if (isCalled) {
return;
}
@@ -233,7 +233,7 @@
OnCall callback;
std::swap(callback, onCall);
isCalledCondVar.notify_all();
- lock.unlock();
+ lock.unlock_no_tsa();
if (callback) {
marl::schedule(std::move(callback));
diff --git a/include/marl/tsa.h b/include/marl/tsa.h
new file mode 100644
index 0000000..84d0623
--- /dev/null
+++ b/include/marl/tsa.h
@@ -0,0 +1,80 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Stubs Thread-Safty-Analysis annotation macros for platforms that do not
+// support them.
+// See https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
+
+#ifndef marl_tsa_h
+#define marl_tsa_h
+
+// Enable thread safety attributes only with clang.
+// The attributes can be safely erased when compiling with other compilers.
+#if defined(__clang__) && (!defined(SWIG))
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
+#else
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
+#endif
+
+#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
+
+#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)
+
+#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))
+
+#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))
+
+#define ACQUIRED_BEFORE(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))
+
+#define ACQUIRED_AFTER(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))
+
+#define REQUIRES(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))
+
+#define REQUIRES_SHARED(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))
+
+#define ACQUIRE(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))
+
+#define ACQUIRE_SHARED(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
+
+#define RELEASE(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))
+
+#define RELEASE_SHARED(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE_SHARED(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))
+
+#define EXCLUDES(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))
+
+#define ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))
+
+#define ASSERT_SHARED_CAPABILITY(x) \
+ THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))
+
+#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))
+
+#define NO_THREAD_SAFETY_ANALYSIS \
+ THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)
+
+#endif // marl_tsa_h
diff --git a/include/marl/waitgroup.h b/include/marl/waitgroup.h
index 7adbb19..a53a446 100644
--- a/include/marl/waitgroup.h
+++ b/include/marl/waitgroup.h
@@ -70,7 +70,7 @@
std::atomic<unsigned int> count = {0};
ConditionVariable cv;
- std::mutex mutex;
+ marl::mutex mutex;
};
const std::shared_ptr<Data> data;
};
@@ -91,7 +91,7 @@
MARL_ASSERT(data->count > 0, "marl::WaitGroup::done() called too many times");
auto count = --data->count;
if (count == 0) {
- std::unique_lock<std::mutex> lock(data->mutex);
+ marl::lock lock(data->mutex);
data->cv.notify_all();
return true;
}
@@ -99,7 +99,7 @@
}
void WaitGroup::wait() const {
- std::unique_lock<std::mutex> lock(data->mutex);
+ marl::lock lock(data->mutex);
data->cv.wait(lock, [this] { return data->count == 0; });
}
diff --git a/src/conditionvariable_test.cpp b/src/conditionvariable_test.cpp
index 2930e4d..18ece7c 100644
--- a/src/conditionvariable_test.cpp
+++ b/src/conditionvariable_test.cpp
@@ -18,17 +18,16 @@
#include "marl_test.h"
#include <condition_variable>
-#include <mutex>
TEST_F(WithoutBoundScheduler, ConditionVariable) {
bool trigger[3] = {false, false, false};
bool signal[3] = {false, false, false};
- std::mutex mutex;
+ marl::mutex mutex;
marl::ConditionVariable cv;
std::thread thread([&] {
for (int i = 0; i < 3; i++) {
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
cv.wait(lock, [&] {
EXPECT_TRUE(lock.owns_lock());
return trigger[i];
@@ -45,7 +44,7 @@
for (int i = 0; i < 3; i++) {
{
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
trigger[i] = true;
cv.notify_one();
cv.wait(lock, [&] {
@@ -66,12 +65,12 @@
TEST_P(WithBoundScheduler, ConditionVariable) {
bool trigger[3] = {false, false, false};
bool signal[3] = {false, false, false};
- std::mutex mutex;
+ marl::mutex mutex;
marl::ConditionVariable cv;
std::thread thread([&] {
for (int i = 0; i < 3; i++) {
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
cv.wait(lock, [&] {
EXPECT_TRUE(lock.owns_lock());
return trigger[i];
@@ -88,7 +87,7 @@
for (int i = 0; i < 3; i++) {
{
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
trigger[i] = true;
cv.notify_one();
cv.wait(lock, [&] {
@@ -113,14 +112,14 @@
// they are early-unblocked, along with expected lock state.
TEST_P(WithBoundScheduler, ConditionVariableTimeouts) {
for (int i = 0; i < 10; i++) {
- std::mutex mutex;
+ marl::mutex mutex;
marl::ConditionVariable cv;
bool signaled = false; // guarded by mutex
auto wg = marl::WaitGroup(100);
for (int j = 0; j < 100; j++) {
marl::schedule([=, &mutex, &cv, &signaled] {
{
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
cv.wait_for(lock, std::chrono::milliseconds(j), [&] {
EXPECT_TRUE(lock.owns_lock());
return signaled;
@@ -134,7 +133,7 @@
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
{
- std::unique_lock<std::mutex> lock(mutex);
+ marl::lock lock(mutex);
signaled = true;
cv.notify_all();
}
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index c97d9ec..533892a 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -99,7 +99,7 @@
MARL_ASSERT(bound == nullptr, "Scheduler already bound");
bound = this;
{
- std::unique_lock<std::mutex> lock(singleThreadedWorkers.mutex);
+ marl::lock lock(singleThreadedWorkers.mutex);
auto worker =
allocator->make_unique<Worker>(this, Worker::Mode::SingleThreaded, -1);
worker->start();
@@ -113,7 +113,7 @@
auto worker = Scheduler::Worker::getCurrent();
worker->stop();
{
- std::unique_lock<std::mutex> lock(bound->singleThreadedWorkers.mutex);
+ marl::lock lock(bound->singleThreadedWorkers.mutex);
auto tid = std::this_thread::get_id();
auto it = bound->singleThreadedWorkers.byTid.find(tid);
MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(),
@@ -137,9 +137,11 @@
Scheduler::~Scheduler() {
{
// Wait until all the single threaded workers have been unbound.
- std::unique_lock<std::mutex> lock(singleThreadedWorkers.mutex);
- singleThreadedWorkers.unbind.wait(
- lock, [this] { return singleThreadedWorkers.byTid.size() == 0; });
+ marl::lock lock(singleThreadedWorkers.mutex);
+ lock.wait(singleThreadedWorkers.unbind,
+ [this]() REQUIRES(singleThreadedWorkers.mutex) {
+ return singleThreadedWorkers.byTid.size() == 0;
+ });
}
// Release all worker threads.
@@ -148,12 +150,12 @@
}
void Scheduler::setThreadInitializer(const std::function<void()>& func) {
- std::unique_lock<std::mutex> lock(threadInitFuncMutex);
+ marl::lock lock(threadInitFuncMutex);
threadInitFunc = func;
}
const std::function<void()>& Scheduler::getThreadInitializer() {
- std::unique_lock<std::mutex> lock(threadInitFuncMutex);
+ marl::lock lock(threadInitFuncMutex);
return threadInitFunc;
}
@@ -251,7 +253,7 @@
worker->enqueue(this);
}
-void Scheduler::Fiber::wait(Lock& lock, const Predicate& pred) {
+void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) {
worker->wait(lock, nullptr, pred);
}
@@ -374,7 +376,7 @@
mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0);
currentFiber = mainFiber.get();
{
- std::unique_lock<std::mutex> lock(work.mutex);
+ marl::lock lock(work.mutex);
run();
}
mainFiber.reset();
@@ -401,7 +403,7 @@
break;
}
case Mode::SingleThreaded: {
- std::unique_lock<std::mutex> lock(work.mutex);
+ marl::lock lock(work.mutex);
shutdown = true;
runUntilShutdown();
Worker::current = nullptr;
@@ -415,14 +417,13 @@
bool Scheduler::Worker::wait(const TimePoint* timeout) {
DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
{
- std::unique_lock<std::mutex> lock(work.mutex);
+ marl::lock lock(work.mutex);
suspend(timeout);
}
return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
}
-_Requires_lock_held_(waitLock)
-bool Scheduler::Worker::wait(Fiber::Lock& waitLock,
+bool Scheduler::Worker::wait(lock& waitLock,
const TimePoint* timeout,
const Predicate& pred) {
DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
@@ -435,7 +436,7 @@
// enqueued (via Fiber::notify()) between the waitLock.unlock() and fiber
// switch, otherwise the Fiber::notify() call may be ignored and the fiber
// is never woken.
- waitLock.unlock();
+ waitLock.unlock_no_tsa();
// suspend the fiber.
suspend(timeout);
@@ -444,7 +445,7 @@
work.mutex.unlock();
// Re-lock to either return due to timeout, or call pred().
- waitLock.lock();
+ waitLock.lock_no_tsa();
// Check timeout.
if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
@@ -456,7 +457,6 @@
return true;
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::suspend(
const std::chrono::system_clock::time_point* timeout) {
// Current fiber is yielding as it is blocked.
@@ -496,33 +496,34 @@
setFiberState(currentFiber, Fiber::State::Running);
}
-_When_(return == true, _Acquires_lock_(work.mutex))
bool Scheduler::Worker::tryLock() {
return work.mutex.try_lock();
}
void Scheduler::Worker::enqueue(Fiber* fiber) {
- std::unique_lock<std::mutex> lock(work.mutex);
- DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
- Fiber::toString(fiber->state));
- switch (fiber->state) {
- case Fiber::State::Running:
- case Fiber::State::Queued:
- return; // Nothing to do here - task is already queued or running.
- case Fiber::State::Waiting:
- work.waiting.erase(fiber);
- break;
- case Fiber::State::Idle:
- case Fiber::State::Yielded:
- break;
+ bool notify = false;
+ {
+ marl::lock lock(work.mutex);
+ DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
+ Fiber::toString(fiber->state));
+ switch (fiber->state) {
+ case Fiber::State::Running:
+ case Fiber::State::Queued:
+ return; // Nothing to do here - task is already queued or running.
+ case Fiber::State::Waiting:
+ work.waiting.erase(fiber);
+ break;
+ case Fiber::State::Idle:
+ case Fiber::State::Yielded:
+ break;
+ }
+ notify = work.notifyAdded;
+ work.fibers.push_back(std::move(fiber));
+ MARL_ASSERT(!work.waiting.contains(fiber),
+ "fiber is unexpectedly in the waiting list");
+ setFiberState(fiber, Fiber::State::Queued);
+ work.num++;
}
- bool notify = work.notifyAdded;
- work.fibers.push_back(std::move(fiber));
- MARL_ASSERT(!work.waiting.contains(fiber),
- "fiber is unexpectedly in the waiting list");
- setFiberState(fiber, Fiber::State::Queued);
- work.num++;
- lock.unlock();
if (notify) {
work.added.notify_one();
@@ -534,8 +535,6 @@
enqueueAndUnlock(std::move(task));
}
-_Requires_lock_held_(work.mutex)
-_Releases_lock_(work.mutex)
void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
auto notify = work.notifyAdded;
work.tasks.push_back(std::move(task));
@@ -564,21 +563,21 @@
return true;
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::run() {
if (mode == Mode::MultiThreaded) {
MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
// This is the entry point for a multi-threaded worker.
// Start with a regular condition-variable wait for work. This avoids
// starting the thread with a spinForWork().
- work.wait([this] { return work.num > 0 || work.waiting || shutdown; });
+ work.wait([this]() REQUIRES(work.mutex) {
+ return work.num > 0 || work.waiting || shutdown;
+ });
}
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
runUntilShutdown();
switchToFiber(mainFiber.get());
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::runUntilShutdown() {
while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
waitForWork();
@@ -586,7 +585,6 @@
}
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::waitForWork() {
MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
"work.num out of sync");
@@ -601,7 +599,7 @@
work.mutex.lock();
}
- work.wait([this] {
+ work.wait([this]() REQUIRES(work.mutex) {
return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
});
if (work.waiting) {
@@ -609,7 +607,6 @@
}
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::enqueueFiberTimeouts() {
auto now = std::chrono::system_clock::now();
while (auto fiber = work.waiting.take(now)) {
@@ -620,7 +617,6 @@
}
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::changeFiberState(Fiber* fiber,
Fiber::State from,
Fiber::State to) const {
@@ -631,7 +627,6 @@
fiber->state = to;
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const {
DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
Fiber::toString(fiber->state), Fiber::toString(to));
@@ -659,7 +654,7 @@
}
if (scheduler->stealWork(this, rng(), stolen)) {
- std::unique_lock<std::mutex> lock(work.mutex);
+ marl::lock lock(work.mutex);
work.tasks.emplace_back(std::move(stolen));
work.num++;
return;
@@ -669,7 +664,6 @@
}
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::runUntilIdle() {
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
@@ -713,18 +707,16 @@
}
}
-_Requires_lock_held_(work.mutex)
Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
auto fiber = Fiber::create(scheduler->allocator, fiberId, FiberStackSize,
- [&] { run(); });
+ [&]() REQUIRES(work.mutex) { run(); });
auto ptr = fiber.get();
workerFibers.push_back(std::move(fiber));
return ptr;
}
-_Requires_lock_held_(work.mutex)
void Scheduler::Worker::switchToFiber(Fiber* to) {
DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
@@ -737,18 +729,15 @@
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker::Work
////////////////////////////////////////////////////////////////////////////////
-_Requires_lock_held_(mutex)
template <typename F>
void Scheduler::Worker::Work::wait(F&& f) {
- std::unique_lock<std::mutex> lock(mutex, std::adopt_lock);
notifyAdded = true;
if (waiting) {
- added.wait_until(lock, waiting.next(), f);
+ mutex.wait_until_locked(added, waiting.next(), f);
} else {
- added.wait(lock, f);
+ mutex.wait_locked(added, f);
}
notifyAdded = false;
- lock.release(); // Keep the lock held.
}
} // namespace marl