Squashed 'third_party/marl/' changes from 539094011..748d3c161

748d3c161 Add usage recommendations to README.md
2939fcfed Replace marl::parallelize()
4b4600208 Fix stupid typo in front page code samples.
742bba9c8 Fix clang-tidy and go-vet warnings.

git-subtree-dir: third_party/marl
git-subtree-split: 748d3c16165c7dcff018273cfaf9d8f26a33333a
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 005b855..4707a86 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -242,6 +242,7 @@
         ${MARL_SRC_DIR}/marl_test.h
         ${MARL_SRC_DIR}/memory_test.cpp
         ${MARL_SRC_DIR}/osfiber_test.cpp
+        ${MARL_SRC_DIR}/parallelize_test.cpp
         ${MARL_SRC_DIR}/pool_test.cpp
         ${MARL_SRC_DIR}/scheduler_test.cpp
         ${MARL_SRC_DIR}/ticket_test.cpp
diff --git a/README.md b/README.md
index cea4522..06408dc 100644
--- a/README.md
+++ b/README.md
@@ -33,31 +33,31 @@
   constexpr int numTasks = 10;
 
   // Create an event that is manually reset.
-  marl::Event sayHellow(marl::Event::Mode::Manual);
+  marl::Event sayHello(marl::Event::Mode::Manual);
 
   // Create a WaitGroup with an initial count of numTasks.
-  marl::WaitGroup saidHellow(numTasks);
+  marl::WaitGroup saidHello(numTasks);
 
   // Schedule some tasks to run asynchronously.
   for (int i = 0; i < numTasks; i++) {
     // Each task will run on one of the 4 worker threads.
     marl::schedule([=] {  // All marl primitives are capture-by-value.
       // Decrement the WaitGroup counter when the task has finished.
-      defer(saidHellow.done());
+      defer(saidHello.done());
 
       printf("Task %d waiting to say hello...\n", i);
 
       // Blocking in a task?
       // The scheduler will find something else for this thread to do.
-      sayHellow.wait();
+      sayHello.wait();
 
       printf("Hello from task %d!\n", i);
     });
   }
 
-  sayHellow.signal();  // Unblock all the tasks.
+  sayHello.signal();  // Unblock all the tasks.
 
-  saidHellow.wait();  // Wait for all tasks to complete.
+  saidHello.wait();  // Wait for all tasks to complete.
 
   printf("All tasks said hello.\n");
 
@@ -123,6 +123,81 @@
 add_subdirectory(${MARL_DIR})
 ```
 
+### Usage Recommendations
+
+#### Capture marl synchronization primitives by value
+
+All marl synchronization primitives aside from `marl::ConditionVariable` should be lambda-captured by **value**:
+
+```c++
+marl::Event event;
+marl::schedule([=]{ // [=] Good, [&] Bad.
+  event.signal();
+});
+```
+
+Internally, these primitives hold a shared pointer to their state. By capturing by value, we avoid the common issue of the primitive being destructed before the last reference to it is used.
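+
+As a sketch of the hazard this avoids, capturing by reference can leave the task with a dangling reference once the creating scope returns (hypothetical code):
+
+```c++
+void run() {
+  marl::Event event;
+  marl::schedule([&] {  // [&] Bad: the task references the local `event`.
+    event.signal();     // `event` may already be destructed when this runs.
+  });
+}  // `event` is destroyed here, possibly before the task executes.
+```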
+
+#### Create one instance of `marl::Scheduler` and use it for the lifetime of the process
+
+`marl::Scheduler::setWorkerThreadCount()` is an expensive operation, as it spawns a number of worker threads. \
+Destructing the `marl::Scheduler` requires waiting for all tasks to complete.
+
+Multiple `marl::Scheduler`s may compete with each other for hardware threads.
+
+For these reasons, it is recommended to create a single `marl::Scheduler` for the lifetime of your process.
+
+For example:
+
+```c++
+int main() {
+  marl::Scheduler scheduler;
+  scheduler.bind();
+  scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
+  defer(scheduler.unbind());
+
+  return do_program_stuff();
+}
+```
+
+#### Bind the scheduler to externally created threads
+
+In order to call `marl::schedule()`, the scheduler must be bound to the calling thread. Failure to bind the scheduler to the thread before calling `marl::schedule()` will result in undefined behavior.
+
+`marl::Scheduler` may be simultaneously bound to any number of threads, and the scheduler can be retrieved from a bound thread with `marl::Scheduler::get()`.
+
+A typical way to pass the scheduler from one thread to another would be:
+
+```c++
+std::thread spawn_new_thread() {
+  // Grab the scheduler from the currently running thread.
+  marl::Scheduler* scheduler = marl::Scheduler::get();
+
+  // Spawn the new thread.
+  return std::thread([=] {
+    // Bind the scheduler to the new thread.
+    scheduler->bind();
+    defer(scheduler->unbind());
+
+    // You can now safely call `marl::schedule()`
+    run_thread_logic();
+  });
+}
+```
+
+Always remember to unbind the scheduler before terminating the thread. Forgetting to unbind will result in the `marl::Scheduler` destructor blocking indefinitely.
+
+#### Don't use non-marl blocking calls in marl tasks
+
+The `marl::Scheduler` internally holds a number of worker threads which will execute the scheduled tasks. If a marl task becomes blocked on a marl synchronization primitive, marl can yield from the blocked task and continue execution of other scheduled tasks.
+
+Calling a non-marl blocking function on a marl worker thread prevents that worker thread from switching to other tasks until the blocking function returns. Examples of such non-marl blocking functions include [`std::mutex::lock()`](https://en.cppreference.com/w/cpp/thread/mutex/lock), [`std::condition_variable::wait()`](https://en.cppreference.com/w/cpp/thread/condition_variable/wait) and [`accept()`](http://man7.org/linux/man-pages/man2/accept.2.html).
+
+Short blocking calls are acceptable, such as taking a mutex lock to access a data structure. However, be careful not to make a marl blocking call while holding a `std::mutex` lock: the marl task may yield with the lock held, blocking other tasks from acquiring the mutex and potentially causing a deadlock.
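+
+A sketch of the pattern to avoid (hypothetical shared `mutex` and `event`):
+
+```c++
+std::mutex mutex;   // guards some shared data structure
+marl::Event event;
+marl::schedule([&mutex, event] {
+  std::lock_guard<std::mutex> lock(mutex);  // non-marl lock acquired...
+  event.wait();  // ...then a marl blocking call: the task may yield with the
+                 // lock held, and other tasks needing `mutex` can deadlock.
+});
+```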
+
+If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/master/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
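+
+For example, a minimal sketch that wraps a blocking socket `accept()` call (hypothetical `listen_fd` and `handle_client()`; requires `#include "marl/blockingcall.h"`):
+
+```c++
+marl::schedule([=] {
+  // marl::blocking_call() runs accept() on a dedicated thread, so this
+  // worker thread can keep executing other scheduled tasks in the meantime.
+  int client_fd = marl::blocking_call([=] {
+    return accept(listen_fd, nullptr, nullptr);
+  });
+  handle_client(client_fd);
+});
+```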
+
 ---
 
 Note: This is not an officially supported Google product
diff --git a/examples/hello_task.cpp b/examples/hello_task.cpp
index 6dfff3c..3206c3c 100644
--- a/examples/hello_task.cpp
+++ b/examples/hello_task.cpp
@@ -32,31 +32,31 @@
   constexpr int numTasks = 10;
 
   // Create an event that is manually reset.
-  marl::Event sayHellow(marl::Event::Mode::Manual);
+  marl::Event sayHello(marl::Event::Mode::Manual);
 
   // Create a WaitGroup with an initial count of numTasks.
-  marl::WaitGroup saidHellow(numTasks);
+  marl::WaitGroup saidHello(numTasks);
 
   // Schedule some tasks to run asynchronously.
   for (int i = 0; i < numTasks; i++) {
     // Each task will run on one of the 4 worker threads.
     marl::schedule([=] {  // All marl primitives are capture-by-value.
       // Decrement the WaitGroup counter when the task has finished.
-      defer(saidHellow.done());
+      defer(saidHello.done());
 
       printf("Task %d waiting to say hello...\n", i);
 
       // Blocking in a task?
       // The scheduler will find something else for this thread to do.
-      sayHellow.wait();
+      sayHello.wait();
 
       printf("Hello from task %d!\n", i);
     });
   }
 
-  sayHellow.signal();  // Unblock all the tasks.
+  sayHello.signal();  // Unblock all the tasks.
 
-  saidHellow.wait();  // Wait for all tasks to complete.
+  saidHello.wait();  // Wait for all tasks to complete.
 
   printf("All tasks said hello.\n");
 
diff --git a/include/marl/parallelize.h b/include/marl/parallelize.h
new file mode 100644
index 0000000..d7ceadc
--- /dev/null
+++ b/include/marl/parallelize.h
@@ -0,0 +1,50 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_parallelize_h
+#define marl_parallelize_h
+
+#include "scheduler.h"
+#include "waitgroup.h"
+
+namespace marl {
+
+namespace detail {
+
+inline void parallelizeChain(WaitGroup*) {}
+
+template <typename F, typename... L>
+void parallelizeChain(WaitGroup* wg, F&& f, L&&... l) {
+  schedule([=] {
+    f();
+    wg->done();
+  });
+  parallelizeChain(wg, std::forward<L>(l)...);
+}
+
+}  // namespace detail
+
+// parallelize() schedules all the function parameters and waits for them to
+// complete. These functions may execute concurrently.
+// Each function must take no parameters.
+template <typename... FUNCTIONS>
+inline void parallelize(FUNCTIONS&&... functions) {
+  WaitGroup wg(sizeof...(FUNCTIONS));
+  detail::parallelizeChain(&wg, functions...);
+  wg.wait();
+}
+
+}  // namespace marl
+
+#endif  // marl_parallelize_h
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index 4db4b4d..0acfdb9 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -255,7 +255,7 @@
 
     // take() returns the next fiber that has exceeded its timeout, or nullptr
     // if there are no fibers that have yet exceeded their timeouts.
-    inline Fiber* take(const TimePoint& timepoint);
+    inline Fiber* take(const TimePoint& timeout);
 
     // next() returns the timepoint of the next fiber to timeout.
     // next() can only be called if operator bool() returns true.
diff --git a/include/marl/utils.h b/include/marl/utils.h
deleted file mode 100644
index 5ecae80..0000000
--- a/include/marl/utils.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2019 The Marl Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef marl_util_h
-#define marl_util_h
-
-#include "scheduler.h"
-#include "waitgroup.h"
-
-namespace marl {
-
-// parallelize() is used to split a number of work items into N smaller batches
-// which can be processed in parallel with the function f().
-// numTotal is the total number of work items to process.
-// numPerTask is the maximum number of work items to process per call to f().
-// There will always be at least one call to f().
-// F must be a function with the signature:
-//    void(COUNTER taskIndex, COUNTER first, COUNTER count)
-// COUNTER is any integer type.
-template <typename F, typename COUNTER>
-inline void parallelize(COUNTER numTotal, COUNTER numPerTask, const F& f) {
-  auto numTasks = (numTotal + numPerTask - 1) / numPerTask;
-  WaitGroup wg(numTasks - 1);
-  for (unsigned int task = 1; task < numTasks; task++) {
-    schedule([=] {
-      auto first = task * numPerTask;
-      auto count = std::min(first + numPerTask, numTotal) - first;
-      f(task, first, count);
-      wg.done();
-    });
-  }
-
-  // Run the first chunk on this fiber to reduce the amount of time spent
-  // waiting.
-  f(0, 0, std::min(numPerTask, numTotal));
-  wg.wait();
-}
-
-}  // namespace marl
-
-#endif  // marl_util_h
diff --git a/src/parallelize_test.cpp b/src/parallelize_test.cpp
new file mode 100644
index 0000000..c111d40
--- /dev/null
+++ b/src/parallelize_test.cpp
@@ -0,0 +1,27 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "marl_test.h"
+
+#include "marl/parallelize.h"
+
+TEST_P(WithBoundScheduler, Parallelize) {
+  bool a = false;
+  bool b = false;
+  bool c = false;
+  marl::parallelize([&] { a = true; }, [&] { b = true; }, [&] { c = true; });
+  ASSERT_TRUE(a);
+  ASSERT_TRUE(b);
+  ASSERT_TRUE(c);
+}
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 533892a..b6443d1 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -120,7 +120,7 @@
                 "singleThreadedWorker not found");
     MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
     bound->singleThreadedWorkers.byTid.erase(it);
-    if (bound->singleThreadedWorkers.byTid.size() == 0) {
+    if (bound->singleThreadedWorkers.byTid.empty()) {
       bound->singleThreadedWorkers.unbind.notify_one();
     }
   }
@@ -140,7 +140,7 @@
     marl::lock lock(singleThreadedWorkers.mutex);
     lock.wait(singleThreadedWorkers.unbind,
               [this]() REQUIRES(singleThreadedWorkers.mutex) {
-                return singleThreadedWorkers.byTid.size() == 0;
+                return singleThreadedWorkers.byTid.empty();
               });
   }
 
@@ -149,9 +149,9 @@
   setWorkerThreadCount(0);
 }
 
-void Scheduler::setThreadInitializer(const std::function<void()>& func) {
+void Scheduler::setThreadInitializer(const std::function<void()>& init) {
   marl::lock lock(threadInitFuncMutex);
-  threadInitFunc = func;
+  threadInitFunc = init;
 }
 
 const std::function<void()>& Scheduler::getThreadInitializer() {
@@ -299,15 +299,15 @@
 // Scheduler::WaitingFibers
 ////////////////////////////////////////////////////////////////////////////////
 Scheduler::WaitingFibers::operator bool() const {
-  return fibers.size() > 0;
+  return !fibers.empty();
 }
 
-Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timepoint) {
+Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
   if (!*this) {
     return nullptr;
   }
   auto it = timeouts.begin();
-  if (timepoint < it->timepoint) {
+  if (timeout < it->timepoint) {
     return nullptr;
   }
   auto fiber = it->fiber;
@@ -324,9 +324,9 @@
   return timeouts.begin()->timepoint;
 }
 
-void Scheduler::WaitingFibers::add(const TimePoint& timepoint, Fiber* fiber) {
-  timeouts.emplace(Timeout{timepoint, fiber});
-  bool added = fibers.emplace(fiber, timepoint).second;
+void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
+  timeouts.emplace(Timeout{timeout, fiber});
+  bool added = fibers.emplace(fiber, timeout).second;
   (void)added;
   MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
 }
@@ -474,13 +474,13 @@
 
   work.numBlockedFibers++;
 
-  if (work.fibers.size() > 0) {
+  if (!work.fibers.empty()) {
     // There's another fiber that has become unblocked, resume that.
     work.num--;
     auto to = take(work.fibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
     switchToFiber(to);
-  } else if (idleFibers.size() > 0) {
+  } else if (!idleFibers.empty()) {
     // There's an old fiber we can reuse, resume that.
     auto to = take(idleFibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
@@ -518,7 +518,7 @@
         break;
     }
     notify = work.notifyAdded;
-    work.fibers.push_back(std::move(fiber));
+    work.fibers.push_back(fiber);
     MARL_ASSERT(!work.waiting.contains(fiber),
                 "fiber is unexpectedly in the waiting list");
     setFiberState(fiber, Fiber::State::Queued);
@@ -552,8 +552,7 @@
   if (!work.mutex.try_lock()) {
     return false;
   }
-  if (work.tasks.size() == 0 ||
-      work.tasks.front().is(Task::Flags::SameThread)) {
+  if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
     work.mutex.unlock();
     return false;
   }
@@ -668,12 +667,12 @@
   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
               "work.num out of sync");
-  while (work.fibers.size() > 0 || work.tasks.size() > 0) {
+  while (!work.fibers.empty() || !work.tasks.empty()) {
     // Note: we cannot take and store on the stack more than a single fiber
     // or task at a time, as the Fiber may yield and these items may get
     // held on suspended fiber stack.
 
-    while (work.fibers.size() > 0) {
+    while (!work.fibers.empty()) {
       work.num--;
       auto fiber = take(work.fibers);
       // Sanity checks,
@@ -690,7 +689,7 @@
       changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
     }
 
-    if (work.tasks.size() > 0) {
+    if (!work.tasks.empty()) {
       work.num--;
       auto task = take(work.tasks);
       work.mutex.unlock();
diff --git a/tools/bench/bench.go b/tools/bench/bench.go
index 81f2d72..ad81181 100644
--- a/tools/bench/bench.go
+++ b/tools/bench/bench.go
@@ -69,7 +69,6 @@
 		case nil:
 			return b, nil
 		case errWrongFormat:
-			break
 		default:
 			return Benchmark{}, err
 		}