Update Marl to 748d3c161
Changes:
748d3c161 Add usage recommendations to README.md
2939fcfed Replace marl::parallelize()
4b4600208 Fix stupid typo in front page code samples.
742bba9c8 Fix clang-tidy and go-vet warnings.
Commands:
./third_party/update-marl.sh --squash
Bug: b/140546382
Change-Id: Ia8226c11bafbf40907a4622cfdfae3cdcdc303ba
diff --git a/third_party/marl/BUILD.gn b/third_party/marl/BUILD.gn
index e5d68ac..fd23721 100644
--- a/third_party/marl/BUILD.gn
+++ b/third_party/marl/BUILD.gn
@@ -31,6 +31,7 @@
"include/marl/finally.h",
"include/marl/memory.h",
"include/marl/mutex.h",
+ "include/marl/parallelize.h",
"include/marl/pool.h",
"include/marl/sanitizers.h",
"include/marl/scheduler.h",
@@ -39,7 +40,6 @@
"include/marl/ticket.h",
"include/marl/trace.h",
"include/marl/tsa.h",
- "include/marl/utils.h",
"include/marl/waitgroup.h",
]
diff --git a/third_party/marl/CMakeLists.txt b/third_party/marl/CMakeLists.txt
index 005b855..4707a86 100644
--- a/third_party/marl/CMakeLists.txt
+++ b/third_party/marl/CMakeLists.txt
@@ -242,6 +242,7 @@
${MARL_SRC_DIR}/marl_test.h
${MARL_SRC_DIR}/memory_test.cpp
${MARL_SRC_DIR}/osfiber_test.cpp
+ ${MARL_SRC_DIR}/parallelize_test.cpp
${MARL_SRC_DIR}/pool_test.cpp
${MARL_SRC_DIR}/scheduler_test.cpp
${MARL_SRC_DIR}/ticket_test.cpp
diff --git a/third_party/marl/README.md b/third_party/marl/README.md
index cea4522..06408dc 100644
--- a/third_party/marl/README.md
+++ b/third_party/marl/README.md
@@ -33,31 +33,31 @@
constexpr int numTasks = 10;
// Create an event that is manually reset.
- marl::Event sayHellow(marl::Event::Mode::Manual);
+ marl::Event sayHello(marl::Event::Mode::Manual);
// Create a WaitGroup with an initial count of numTasks.
- marl::WaitGroup saidHellow(numTasks);
+ marl::WaitGroup saidHello(numTasks);
// Schedule some tasks to run asynchronously.
for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value.
// Decrement the WaitGroup counter when the task has finished.
- defer(saidHellow.done());
+ defer(saidHello.done());
printf("Task %d waiting to say hello...\n", i);
// Blocking in a task?
// The scheduler will find something else for this thread to do.
- sayHellow.wait();
+ sayHello.wait();
printf("Hello from task %d!\n", i);
});
}
- sayHellow.signal(); // Unblock all the tasks.
+ sayHello.signal(); // Unblock all the tasks.
- saidHellow.wait(); // Wait for all tasks to complete.
+ saidHello.wait(); // Wait for all tasks to complete.
printf("All tasks said hello.\n");
@@ -123,6 +123,81 @@
add_subdirectory(${MARL_DIR})
```
+### Usage Recommendations
+
+#### Capture marl synchronization primitives by value
+
+All marl synchronization primitives aside from `marl::ConditionVariable` should be lambda-captured by **value**:
+
+```c++
+marl::Event event;
+marl::schedule([=]{ // [=] Good, [&] Bad.
+ event.signal();
+});
+```
+
+Internally, these primitives hold a shared pointer to the primitive state. By capturing by value we avoid common issues where the primitive may be destructed before the last reference is used.
+
+#### Create one instance of `marl::Scheduler`, use it for the lifetime of the process.
+
+`marl::Scheduler::setWorkerThreadCount()` is an expensive operation as it spawns a number of hardware threads. \
+Destructing the `marl::Scheduler` requires waiting on all tasks to complete.
+
+Multiple `marl::Scheduler`s may fight each other for hardware thread utilization.
+
+For these reasons, it is recommended to create a single `marl::Scheduler` for the lifetime of your process.
+
+For example:
+
+```c++
+int main() {
+ marl::Scheduler scheduler;
+ scheduler.bind();
+ scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
+ defer(scheduler.unbind());
+
+ return do_program_stuff();
+}
+```
+
+#### Bind the scheduler to externally created threads
+
+In order to call `marl::schedule()` the scheduler must be bound to the calling thread. Failure to bind the scheduler to the thread before calling `marl::schedule()` will result in undefined behavior.
+
+`marl::Scheduler` may be simultaneously bound to any number of threads, and the scheduler can be retrieved from a bound thread with `marl::Scheduler::get()`.
+
+A typical way to pass the scheduler from one thread to another would be:
+
+```c++
+std::thread spawn_new_thread() {
+ // Grab the scheduler from the currently running thread.
+ marl::Scheduler* scheduler = marl::Scheduler::get();
+
+ // Spawn the new thread.
+ return std::thread([=] {
+ // Bind the scheduler to the new thread.
+ scheduler->bind();
+ defer(scheduler->unbind());
+
+ // You can now safely call `marl::schedule()`
+ run_thread_logic();
+ });
+}
+
+```
+
+Always remember to unbind the scheduler before terminating the thread. Forgetting to unbind will result in the `marl::Scheduler` destructor blocking indefinitely.
+
+#### Don't use externally blocking calls in marl tasks
+
+The `marl::Scheduler` internally holds a number of worker threads which will execute the scheduled tasks. If a marl task becomes blocked on a marl synchronization primitive, marl can yield from the blocked task and continue execution of other scheduled tasks.
+
+Calling a non-marl blocking function on a marl worker thread will prevent that worker thread from being able to switch to execute other tasks until the blocking function has returned. Examples of these non-marl blocking functions include: [`std::mutex::lock()`](https://en.cppreference.com/w/cpp/thread/mutex/lock), [`std::condition_variable::wait()`](https://en.cppreference.com/w/cpp/thread/condition_variable/wait), [`accept()`](http://man7.org/linux/man-pages/man2/accept.2.html).
+
+Short blocking calls are acceptable, such as a mutex lock to access a data structure. However be careful that you do not use a marl blocking call with a `std::mutex` lock held - the marl task may yield with the lock held, and block other tasks from re-locking the mutex. This sort of situation may end up with a deadlock.
+
+If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/master/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
+
---
Note: This is not an officially supported Google product
diff --git a/third_party/marl/examples/hello_task.cpp b/third_party/marl/examples/hello_task.cpp
index 6dfff3c..3206c3c 100644
--- a/third_party/marl/examples/hello_task.cpp
+++ b/third_party/marl/examples/hello_task.cpp
@@ -32,31 +32,31 @@
constexpr int numTasks = 10;
// Create an event that is manually reset.
- marl::Event sayHellow(marl::Event::Mode::Manual);
+ marl::Event sayHello(marl::Event::Mode::Manual);
// Create a WaitGroup with an initial count of numTasks.
- marl::WaitGroup saidHellow(numTasks);
+ marl::WaitGroup saidHello(numTasks);
// Schedule some tasks to run asynchronously.
for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value.
// Decrement the WaitGroup counter when the task has finished.
- defer(saidHellow.done());
+ defer(saidHello.done());
printf("Task %d waiting to say hello...\n", i);
// Blocking in a task?
// The scheduler will find something else for this thread to do.
- sayHellow.wait();
+ sayHello.wait();
printf("Hello from task %d!\n", i);
});
}
- sayHellow.signal(); // Unblock all the tasks.
+ sayHello.signal(); // Unblock all the tasks.
- saidHellow.wait(); // Wait for all tasks to complete.
+ saidHello.wait(); // Wait for all tasks to complete.
printf("All tasks said hello.\n");
diff --git a/third_party/marl/include/marl/parallelize.h b/third_party/marl/include/marl/parallelize.h
new file mode 100644
index 0000000..d7ceadc
--- /dev/null
+++ b/third_party/marl/include/marl/parallelize.h
@@ -0,0 +1,50 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_parallelize_h
+#define marl_parallelize_h
+
+#include "scheduler.h"
+#include "waitgroup.h"
+
+namespace marl {
+
+namespace detail {
+// Ends the recursion. inline: a non-template function defined in a header.
+inline void parallelizeChain(WaitGroup*) {}
+// Schedules f to run then call wg->done(), and recurses on the remaining l.
+template <typename F, typename... L>
+void parallelizeChain(WaitGroup* wg, F&& f, L&&... l) {
+  schedule([=] {
+    f();
+    wg->done();
+  });
+  parallelizeChain(wg, std::forward<L>(l)...);
+}
+
+}  // namespace detail
+
+// parallelize() schedules all the function parameters and waits for them to
+// complete. These functions may execute concurrently.
+// Each function must take no parameters.
+template <typename... FUNCTIONS>
+inline void parallelize(FUNCTIONS&&... functions) {
+  WaitGroup wg(sizeof...(FUNCTIONS));
+  detail::parallelizeChain(&wg, functions...);
+  wg.wait();
+}
+
+}  // namespace marl
+
+#endif // marl_parallelize_h
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index 4db4b4d..0acfdb9 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -255,7 +255,7 @@
// take() returns the next fiber that has exceeded its timeout, or nullptr
// if there are no fibers that have yet exceeded their timeouts.
- inline Fiber* take(const TimePoint& timepoint);
+ inline Fiber* take(const TimePoint& timeout);
// next() returns the timepoint of the next fiber to timeout.
// next() can only be called if operator bool() returns true.
diff --git a/third_party/marl/include/marl/utils.h b/third_party/marl/include/marl/utils.h
deleted file mode 100644
index 5ecae80..0000000
--- a/third_party/marl/include/marl/utils.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2019 The Marl Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef marl_util_h
-#define marl_util_h
-
-#include "scheduler.h"
-#include "waitgroup.h"
-
-namespace marl {
-
-// parallelize() is used to split a number of work items into N smaller batches
-// which can be processed in parallel with the function f().
-// numTotal is the total number of work items to process.
-// numPerTask is the maximum number of work items to process per call to f().
-// There will always be at least one call to f().
-// F must be a function with the signature:
-// void(COUNTER taskIndex, COUNTER first, COUNTER count)
-// COUNTER is any integer type.
-template <typename F, typename COUNTER>
-inline void parallelize(COUNTER numTotal, COUNTER numPerTask, const F& f) {
- auto numTasks = (numTotal + numPerTask - 1) / numPerTask;
- WaitGroup wg(numTasks - 1);
- for (unsigned int task = 1; task < numTasks; task++) {
- schedule([=] {
- auto first = task * numPerTask;
- auto count = std::min(first + numPerTask, numTotal) - first;
- f(task, first, count);
- wg.done();
- });
- }
-
- // Run the first chunk on this fiber to reduce the amount of time spent
- // waiting.
- f(0, 0, std::min(numPerTask, numTotal));
- wg.wait();
-}
-
-} // namespace marl
-
-#endif // marl_util_h
diff --git a/third_party/marl/src/parallelize_test.cpp b/third_party/marl/src/parallelize_test.cpp
new file mode 100644
index 0000000..c111d40
--- /dev/null
+++ b/third_party/marl/src/parallelize_test.cpp
@@ -0,0 +1,27 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "marl_test.h"
+
+#include "marl/parallelize.h"
+
+TEST_P(WithBoundScheduler, Parallelize) {
+  // Each lambda sets one flag. The assertions below require all three flags
+  // to be set by the time parallelize() returns.
+  bool x = false, y = false, z = false;
+  marl::parallelize([&] { x = true; }, [&] { y = true; }, [&] { z = true; });
+  ASSERT_TRUE(x);
+  ASSERT_TRUE(y);
+  ASSERT_TRUE(z);
+}
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index 533892a..b6443d1 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -120,7 +120,7 @@
"singleThreadedWorker not found");
MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
bound->singleThreadedWorkers.byTid.erase(it);
- if (bound->singleThreadedWorkers.byTid.size() == 0) {
+ if (bound->singleThreadedWorkers.byTid.empty()) {
bound->singleThreadedWorkers.unbind.notify_one();
}
}
@@ -140,7 +140,7 @@
marl::lock lock(singleThreadedWorkers.mutex);
lock.wait(singleThreadedWorkers.unbind,
[this]() REQUIRES(singleThreadedWorkers.mutex) {
- return singleThreadedWorkers.byTid.size() == 0;
+ return singleThreadedWorkers.byTid.empty();
});
}
@@ -149,9 +149,9 @@
setWorkerThreadCount(0);
}
-void Scheduler::setThreadInitializer(const std::function<void()>& func) {
+void Scheduler::setThreadInitializer(const std::function<void()>& init) {
marl::lock lock(threadInitFuncMutex);
- threadInitFunc = func;
+ threadInitFunc = init;
}
const std::function<void()>& Scheduler::getThreadInitializer() {
@@ -299,15 +299,15 @@
// Scheduler::WaitingFibers
////////////////////////////////////////////////////////////////////////////////
Scheduler::WaitingFibers::operator bool() const {
- return fibers.size() > 0;
+ return !fibers.empty();
}
-Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timepoint) {
+Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
if (!*this) {
return nullptr;
}
auto it = timeouts.begin();
- if (timepoint < it->timepoint) {
+ if (timeout < it->timepoint) {
return nullptr;
}
auto fiber = it->fiber;
@@ -324,9 +324,9 @@
return timeouts.begin()->timepoint;
}
-void Scheduler::WaitingFibers::add(const TimePoint& timepoint, Fiber* fiber) {
- timeouts.emplace(Timeout{timepoint, fiber});
- bool added = fibers.emplace(fiber, timepoint).second;
+void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
+ timeouts.emplace(Timeout{timeout, fiber});
+ bool added = fibers.emplace(fiber, timeout).second;
(void)added;
MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
}
@@ -474,13 +474,13 @@
work.numBlockedFibers++;
- if (work.fibers.size() > 0) {
+ if (!work.fibers.empty()) {
// There's another fiber that has become unblocked, resume that.
work.num--;
auto to = take(work.fibers);
ASSERT_FIBER_STATE(to, Fiber::State::Queued);
switchToFiber(to);
- } else if (idleFibers.size() > 0) {
+ } else if (!idleFibers.empty()) {
// There's an old fiber we can reuse, resume that.
auto to = take(idleFibers);
ASSERT_FIBER_STATE(to, Fiber::State::Idle);
@@ -518,7 +518,7 @@
break;
}
notify = work.notifyAdded;
- work.fibers.push_back(std::move(fiber));
+ work.fibers.push_back(fiber);
MARL_ASSERT(!work.waiting.contains(fiber),
"fiber is unexpectedly in the waiting list");
setFiberState(fiber, Fiber::State::Queued);
@@ -552,8 +552,7 @@
if (!work.mutex.try_lock()) {
return false;
}
- if (work.tasks.size() == 0 ||
- work.tasks.front().is(Task::Flags::SameThread)) {
+ if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
work.mutex.unlock();
return false;
}
@@ -668,12 +667,12 @@
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
"work.num out of sync");
- while (work.fibers.size() > 0 || work.tasks.size() > 0) {
+ while (!work.fibers.empty() || !work.tasks.empty()) {
// Note: we cannot take and store on the stack more than a single fiber
// or task at a time, as the Fiber may yield and these items may get
// held on suspended fiber stack.
- while (work.fibers.size() > 0) {
+ while (!work.fibers.empty()) {
work.num--;
auto fiber = take(work.fibers);
// Sanity checks,
@@ -690,7 +689,7 @@
changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
}
- if (work.tasks.size() > 0) {
+ if (!work.tasks.empty()) {
work.num--;
auto task = take(work.tasks);
work.mutex.unlock();
diff --git a/third_party/marl/tools/bench/bench.go b/third_party/marl/tools/bench/bench.go
index 81f2d72..ad81181 100644
--- a/third_party/marl/tools/bench/bench.go
+++ b/third_party/marl/tools/bench/bench.go
@@ -69,7 +69,6 @@
case nil:
return b, nil
case errWrongFormat:
- break
default:
return Benchmark{}, err
}