Update Marl to 690889fbb

Pull in the fix for crbug/351004963

Changes:
    690889fbb Make implicit `this` capture explicit (#266)
    dbf097e43 use std::forward when parameter is universal reference
    535d49182 Fixed race condition causing workers to sleep prematurely
    3eb171ef5 Make sure we don't copy or move Storage
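
The dag.h changes add a `const Work&` constructor overload and switch the
builder helpers from `std::move` to `std::forward`, so passing an lvalue
callable no longer silently moves it out from under the caller (see the new
DAGForwardFunc test below). A minimal standalone sketch of the forwarding
issue, using a hypothetical add() helper rather than the marl API:

    #include <functional>
    #include <utility>
    #include <vector>

    static std::vector<std::function<void()>> nodes;

    template <typename F>
    void add(F&& work) {
      // With a forwarding reference, std::move(work) would always turn the
      // argument into an rvalue and could leave a caller's lvalue
      // std::function empty. std::forward copies from lvalues and moves
      // from rvalues, matching the caller's intent.
      nodes.emplace_back(std::forward<F>(work));
    }

    int main() {
      std::function<void()> func([] {});
      add(func);       // func is copied and remains valid afterwards
      add([] {});      // temporaries are still moved
      return func ? 0 : 1;
    }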

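The race fixed by 535d49182 was between spotting new work while spinning and
actually taking the queue lock: another worker could steal the task in that
window, leaving this worker to go to sleep even though it had seen work.
spinForWork() therefore becomes spinForWorkAndLock(), which re-checks the
task count after acquiring the mutex and returns holding the lock. A
condensed sketch of that double-checked pattern, using simplified stand-in
types rather than the actual marl ones (the real change is in the
scheduler.cpp hunk below):

    #include <atomic>
    #include <mutex>

    // Simplified stand-in for the worker's queue state (not the marl types).
    struct Work {
      std::mutex mutex;
      std::atomic<int> num{0};
    };

    // Returns with work.mutex held, mirroring the new spinForWorkAndLock().
    void spinForWorkAndLock(Work& work) {
      for (;;) {
        if (work.num > 0) {        // optimistic, unlocked check
          work.mutex.lock();
          if (work.num > 0) {
            return;                // the task is still there; keep the lock
          }
          work.mutex.unlock();     // it was stolen in the meantime; keep spinning
        }
        // (work stealing, nop spinning and yielding elided)
      }
    }

    int main() {
      Work work;
      work.num = 1;
      spinForWorkAndLock(work);    // returns immediately, holding the mutex
      work.mutex.unlock();
    }
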
Commands:
    ./third_party/update-marl.sh

Bug: b/140546382
Change-Id: I444204ac09978f080cbd2aebeab8d5247bd0f69d
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/74049
Tested-by: Shahbaz Youssefi <syoussefi@google.com>
Kokoro-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Shahbaz Youssefi <syoussefi@google.com>
diff --git a/third_party/marl/include/marl/dag.h b/third_party/marl/include/marl/dag.h
index 56a3815..a2631aa 100644
--- a/third_party/marl/include/marl/dag.h
+++ b/third_party/marl/include/marl/dag.h
@@ -94,6 +94,7 @@
   struct Node {
     MARL_NO_EXPORT inline Node() = default;
     MARL_NO_EXPORT inline Node(Work&& work);
+    MARL_NO_EXPORT inline Node(const Work& work);
 
     // The work to perform for this node in the graph.
     Work work;
@@ -138,6 +139,9 @@
 DAGBase<T>::Node::Node(Work&& work) : work(std::move(work)) {}
 
 template <typename T>
+DAGBase<T>::Node::Node(const Work& work) : work(work) {}
+
+template <typename T>
 void DAGBase<T>::initCounters(RunContext* ctx, Allocator* allocator) {
   auto numCounters = initialCounters.size();
   ctx->counters = allocator->make_unique_n<Counter>(numCounters);
@@ -233,7 +237,7 @@
 template <typename T>
 template <typename F>
 DAGNodeBuilder<T> DAGNodeBuilder<T>::then(F&& work) {
-  auto node = builder->node(std::move(work));
+  auto node = builder->node(std::forward<F>(work));
   builder->addDependency(*this, node);
   return node;
 }
@@ -323,7 +327,7 @@
               "NodeBuilder vectors out of sync");
   auto index = dag->nodes.size();
   numIns.emplace_back(0);
-  dag->nodes.emplace_back(Node{std::move(work)});
+  dag->nodes.emplace_back(Node{std::forward<F>(work)});
   auto node = DAGNodeBuilder<T>{this, index};
   for (auto in : after) {
     addDependency(in, node);
diff --git a/third_party/marl/include/marl/pool.h b/third_party/marl/include/marl/pool.h
index eba6652..f03eec3 100644
--- a/third_party/marl/include/marl/pool.h
+++ b/third_party/marl/include/marl/pool.h
@@ -235,6 +235,11 @@
     MARL_NO_EXPORT inline Storage(Allocator* allocator);
     MARL_NO_EXPORT inline ~Storage();
     MARL_NO_EXPORT inline void return_(Item*) override;
+    // We cannot copy this as the Item pointers would be shared and
+    // deleted at the wrong time. We cannot move this because we return
+    // pointers into items[N].
+    MARL_NO_EXPORT inline Storage(const Storage&) = delete;
+    MARL_NO_EXPORT inline Storage& operator=(const Storage&) = delete;
 
     Item items[N];
     marl::mutex mutex;
@@ -361,6 +366,11 @@
     MARL_NO_EXPORT inline Storage(Allocator* allocator);
     MARL_NO_EXPORT inline ~Storage();
     MARL_NO_EXPORT inline void return_(Item*) override;
+    // We cannot copy this as the Item pointers would be shared and deleted
+    // at the wrong time. We could move this, but would have to take extra
+    // care that no Item pointers are left in the moved-out object.
+    MARL_NO_EXPORT inline Storage(const Storage&) = delete;
+    MARL_NO_EXPORT inline Storage& operator=(const Storage&) = delete;
 
     Allocator* allocator;
     marl::mutex mutex;
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index b3159b8..9e133f2 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -417,10 +417,11 @@
     // spinForWork().
     void waitForWork() REQUIRES(work.mutex);
 
-    // spinForWork() attempts to steal work from another Worker, and keeps
+    // spinForWorkAndLock() attempts to steal work from another Worker, and keeps
     // the thread awake for a short duration. This reduces overheads of
-    // frequently putting the thread to sleep and re-waking.
-    void spinForWork();
+    // frequently putting the thread to sleep and re-waking. It locks the mutex
+    // before returning so that a stolen task cannot be re-stolen by other workers.
+    void spinForWorkAndLock() ACQUIRE(work.mutex);
 
     // enqueueFiberTimeouts() enqueues all the fibers that have finished
     // waiting.
@@ -498,7 +499,7 @@
   // The immutable configuration used to build the scheduler.
   const Config cfg;
 
-  std::array<std::atomic<int>, 8> spinningWorkers;
+  std::array<std::atomic<int>, MaxWorkerThreads> spinningWorkers;
   std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000};
 
   std::atomic<unsigned int> nextEnqueueIndex = {0};
diff --git a/third_party/marl/src/dag_test.cpp b/third_party/marl/src/dag_test.cpp
index 2596041..b666318 100644
--- a/third_party/marl/src/dag_test.cpp
+++ b/third_party/marl/src/dag_test.cpp
@@ -173,3 +173,18 @@
               UnorderedElementsAre("E0", "E1", "E2", "E3"));
   ASSERT_THAT(data.order[11], "F");
 }
+
+TEST_P(WithBoundScheduler, DAGForwardFunc) {
+  marl::DAG<void>::Builder builder;
+  std::function<void()> func([](){});
+
+  ASSERT_TRUE(func);
+
+  auto a = builder.root()
+      .then(func)
+      .then(func);
+
+  builder.node(func, {a});
+
+  ASSERT_TRUE(func);
+}
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index dcdf83a..51bb27b 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -130,10 +130,8 @@
     : cfg(setConfigDefaults(config)),
       workerThreads{},
       singleThreadedWorkers(config.allocator) {
-  for (size_t i = 0; i < spinningWorkers.size(); i++) {
-    spinningWorkers[i] = -1;
-  }
   for (int i = 0; i < cfg.workerThread.count; i++) {
+    spinningWorkers[i] = -1;
     workerThreads[i] =
         cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
   }
@@ -170,7 +168,7 @@
   if (cfg.workerThread.count > 0) {
     while (true) {
       // Prioritize workers that have recently started spinning.
-      auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
+      auto i = --nextSpinningWorkerIdx % cfg.workerThread.count;
       auto idx = spinningWorkers[i].exchange(-1);
       if (idx < 0) {
         // If a spinning worker couldn't be found, round-robin the
@@ -212,7 +210,7 @@
 }
 
 void Scheduler::onBeginSpinning(int workerId) {
-  auto idx = nextSpinningWorkerIdx++ % spinningWorkers.size();
+  auto idx = nextSpinningWorkerIdx++ % cfg.workerThread.count;
   spinningWorkers[idx] = workerId;
 }
 
@@ -369,7 +367,7 @@
       auto allocator = scheduler->cfg.allocator;
       auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
       auto affinity = affinityPolicy->get(id, allocator);
-      thread = Thread(std::move(affinity), [=] {
+      thread = Thread(std::move(affinity), [=, this] {
         Thread::setName("Thread<%.2d>", int(id));
 
         if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
@@ -572,7 +570,7 @@
     MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
     // This is the entry point for a multi-threaded worker.
     // Start with a regular condition-variable wait for work. This avoids
-    // starting the thread with a spinForWork().
+    // starting the thread with a spinForWorkAndLock().
     work.wait([this]() REQUIRES(work.mutex) {
       return work.num > 0 || work.waiting || shutdown;
     });
@@ -599,8 +597,7 @@
   if (mode == Mode::MultiThreaded) {
     scheduler->onBeginSpinning(id);
     work.mutex.unlock();
-    spinForWork();
-    work.mutex.lock();
+    spinForWorkAndLock();
   }
 
   work.wait([this]() REQUIRES(work.mutex) {
@@ -637,7 +634,7 @@
   fiber->state = to;
 }
 
-void Scheduler::Worker::spinForWork() {
+void Scheduler::Worker::spinForWorkAndLock() {
   TRACE("SPIN");
   Task stolen;
 
@@ -652,13 +649,21 @@
       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
       // clang-format on
+
       if (work.num > 0) {
-        return;
+        work.mutex.lock();
+        if (work.num > 0) {
+          return;
+        }
+        else {
+          // Our new task was stolen by another worker. Keep spinning.
+          work.mutex.unlock();
+        }
       }
     }
 
     if (scheduler->stealWork(this, rng(), stolen)) {
-      marl::lock lock(work.mutex);
+      work.mutex.lock();
       work.tasks.emplace_back(std::move(stolen));
       work.num++;
       return;
@@ -666,6 +671,7 @@
 
     std::this_thread::yield();
   }
+  work.mutex.lock();
 }
 
 void Scheduler::Worker::runUntilIdle() {
diff --git a/third_party/marl/src/scheduler_bench.cpp b/third_party/marl/src/scheduler_bench.cpp
index 81c4d2f..fb7beb5 100644
--- a/third_party/marl/src/scheduler_bench.cpp
+++ b/third_party/marl/src/scheduler_bench.cpp
@@ -48,6 +48,35 @@
 }
 BENCHMARK_REGISTER_F(Schedule, SomeWork)->Apply(Schedule::args);
 
+BENCHMARK_DEFINE_F(Schedule, MultipleForkAndJoin)(benchmark::State& state) {
+  run(state, [&](int numTasks) {
+    const int batchSize = std::max(1, Schedule::numThreads(state));
+    for (auto _ : state) {
+      marl::WaitGroup wg;
+      for (int i = 0; i < numTasks; i++) {
+        wg.add(1);
+        marl::schedule([=] {
+          // Give each task a significant amount of work so that concurrency matters.
+          // If any worker performs more than one task, it will affect the results.
+          int value = i;
+          for (int j = 0; j < 256; ++j) {
+            value = doSomeWork(value);
+          }
+          benchmark::DoNotOptimize(value);
+          wg.done();
+        });
+        // Wait for completion after every batch. This simulates the fork-and-join pattern.
+        if ((i + 1) % batchSize == 0) {
+          wg.wait();
+        }
+      }
+      wg.wait();
+    }
+  });
+}
+
+BENCHMARK_REGISTER_F(Schedule, MultipleForkAndJoin)->Apply(Schedule::args<512>);
+
 BENCHMARK_DEFINE_F(Schedule, SomeWorkWorkerAffinityOneOf)
 (benchmark::State& state) {
   marl::Scheduler::Config cfg;