Update Marl to 690889fbb
Pull in fix for crbug/351004963
Changes:
690889fbb Make implicit `this` capture explicit (#266)
dbf097e43 use std::forward when parameter is universal reference
535d49182 Fixed race condition causing workers to sleep prematurely
3eb171ef5 Make sure we don't copy or move Storage
Commands:
./third_party/update-marl.sh
Bug: b/140546382
Change-Id: I444204ac09978f080cbd2aebeab8d5247bd0f69d
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/74049
Tested-by: Shahbaz Youssefi <syoussefi@google.com>
Kokoro-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Shahbaz Youssefi <syoussefi@google.com>
diff --git a/third_party/marl/include/marl/dag.h b/third_party/marl/include/marl/dag.h
index 56a3815..a2631aa 100644
--- a/third_party/marl/include/marl/dag.h
+++ b/third_party/marl/include/marl/dag.h
@@ -94,6 +94,7 @@
struct Node {
MARL_NO_EXPORT inline Node() = default;
MARL_NO_EXPORT inline Node(Work&& work);
+ MARL_NO_EXPORT inline Node(const Work& work);
// The work to perform for this node in the graph.
Work work;
@@ -138,6 +139,9 @@
DAGBase<T>::Node::Node(Work&& work) : work(std::move(work)) {}
template <typename T>
+DAGBase<T>::Node::Node(const Work& work) : work(work) {}
+
+template <typename T>
void DAGBase<T>::initCounters(RunContext* ctx, Allocator* allocator) {
auto numCounters = initialCounters.size();
ctx->counters = allocator->make_unique_n<Counter>(numCounters);
@@ -233,7 +237,7 @@
template <typename T>
template <typename F>
DAGNodeBuilder<T> DAGNodeBuilder<T>::then(F&& work) {
- auto node = builder->node(std::move(work));
+ auto node = builder->node(std::forward<F>(work));
builder->addDependency(*this, node);
return node;
}
@@ -323,7 +327,7 @@
"NodeBuilder vectors out of sync");
auto index = dag->nodes.size();
numIns.emplace_back(0);
- dag->nodes.emplace_back(Node{std::move(work)});
+ dag->nodes.emplace_back(Node{std::forward<F>(work)});
auto node = DAGNodeBuilder<T>{this, index};
for (auto in : after) {
addDependency(in, node);
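
Note on the std::forward change above: a minimal standalone sketch (addNodeMove/addNodeForward are illustrative names, not part of the marl API) of why std::move is wrong on a forwarding reference. When the caller passes an lvalue std::function, F deduces to an lvalue reference and std::move silently empties the caller's object; std::forward preserves the value category, copying lvalues and moving rvalues. This is also why Node gains a const Work& constructor, so a forwarded lvalue can still construct a Node.

  #include <cassert>
  #include <functional>
  #include <utility>
  #include <vector>

  std::vector<std::function<void()>> nodes;

  template <typename F>
  void addNodeMove(F&& work) {
    nodes.emplace_back(std::move(work));  // moves even when the caller passed an lvalue
  }

  template <typename F>
  void addNodeForward(F&& work) {
    nodes.emplace_back(std::forward<F>(work));  // copies lvalues, moves rvalues
  }

  int main() {
    std::function<void()> func = [] {};
    addNodeMove(func);     // func is now moved-from (valid but unspecified, typically empty)
    func = [] {};
    addNodeForward(func);  // func keeps its target, matching the new DAGForwardFunc test
    assert(func);
  }
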
diff --git a/third_party/marl/include/marl/pool.h b/third_party/marl/include/marl/pool.h
index eba6652..f03eec3 100644
--- a/third_party/marl/include/marl/pool.h
+++ b/third_party/marl/include/marl/pool.h
@@ -235,6 +235,11 @@
MARL_NO_EXPORT inline Storage(Allocator* allocator);
MARL_NO_EXPORT inline ~Storage();
MARL_NO_EXPORT inline void return_(Item*) override;
+ // We cannot copy this, as the Item pointers would be shared and then
+ // deleted at the wrong time. We cannot move this either, because we
+ // return pointers into items[N].
+ MARL_NO_EXPORT inline Storage(const Storage&) = delete;
+ MARL_NO_EXPORT inline Storage& operator=(const Storage&) = delete;
Item items[N];
marl::mutex mutex;
@@ -361,6 +366,11 @@
MARL_NO_EXPORT inline Storage(Allocator* allocator);
MARL_NO_EXPORT inline ~Storage();
MARL_NO_EXPORT inline void return_(Item*) override;
+ // We cannot copy this, as the Item pointers would be shared and then
+ // deleted at the wrong time. We could move this, but we would have to
+ // take extra care that no Item pointers are left in the moved-from object.
+ MARL_NO_EXPORT inline Storage(const Storage&) = delete;
+ MARL_NO_EXPORT inline Storage& operator=(const Storage&) = delete;
Allocator* allocator;
marl::mutex mutex;
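
Note on the deleted special members above: a minimal sketch (FixedStorage is a hypothetical stand-in, not the marl type) of why a pool storage that hands out pointers into an embedded array must not be copied or moved. After a copy, two objects would eventually free the same borrowed Items; after a move, previously returned pointers would still refer to the old object's items[N].

  template <typename T, int N>
  class FixedStorage {
   public:
    struct Item {
      T value{};
      Item* next = nullptr;
    };

    FixedStorage() {
      for (int i = 0; i < N; i++) {
        items[i].next = free_;
        free_ = &items[i];
      }
    }

    // Pointers returned by borrow() point into items[N], so copying or moving
    // this object would leave callers holding pointers into the wrong array.
    FixedStorage(const FixedStorage&) = delete;
    FixedStorage& operator=(const FixedStorage&) = delete;

    Item* borrow() {  // valid only while this exact object is alive
      Item* out = free_;
      if (out) {
        free_ = out->next;
      }
      return out;
    }

    void return_(Item* item) {
      item->next = free_;
      free_ = item;
    }

   private:
    Item items[N];
    Item* free_ = nullptr;
  };

  int main() {
    FixedStorage<int, 4> storage;
    auto* item = storage.borrow();  // points into storage's internal array
    item->value = 42;
    storage.return_(item);
  }
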
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index b3159b8..9e133f2 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -417,10 +417,11 @@
// spinForWork().
void waitForWork() REQUIRES(work.mutex);
- // spinForWork() attempts to steal work from another Worker, and keeps
+ // spinForWorkAndLock() attempts to steal work from another Worker, and keeps
// the thread awake for a short duration. This reduces overheads of
- // frequently putting the thread to sleep and re-waking.
- void spinForWork();
+ // frequently putting the thread to sleep and re-waking. It locks the mutex
+ // before returning so that a stolen task cannot be re-stolen by other workers.
+ void spinForWorkAndLock() ACQUIRE(work.mutex);
// enqueueFiberTimeouts() enqueues all the fibers that have finished
// waiting.
@@ -498,7 +499,7 @@
// The immutable configuration used to build the scheduler.
const Config cfg;
- std::array<std::atomic<int>, 8> spinningWorkers;
+ std::array<std::atomic<int>, MaxWorkerThreads> spinningWorkers;
std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000};
std::atomic<unsigned int> nextEnqueueIndex = {0};
diff --git a/third_party/marl/src/dag_test.cpp b/third_party/marl/src/dag_test.cpp
index 2596041..b666318 100644
--- a/third_party/marl/src/dag_test.cpp
+++ b/third_party/marl/src/dag_test.cpp
@@ -173,3 +173,18 @@
UnorderedElementsAre("E0", "E1", "E2", "E3"));
ASSERT_THAT(data.order[11], "F");
}
+
+TEST_P(WithBoundScheduler, DAGForwardFunc) {
+ marl::DAG<void>::Builder builder;
+ std::function<void()> func([](){});
+
+ ASSERT_TRUE(func);
+
+ auto a = builder.root()
+ .then(func)
+ .then(func);
+
+ builder.node(func, {a});
+
+ ASSERT_TRUE(func);
+}
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index dcdf83a..51bb27b 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -130,10 +130,8 @@
: cfg(setConfigDefaults(config)),
workerThreads{},
singleThreadedWorkers(config.allocator) {
- for (size_t i = 0; i < spinningWorkers.size(); i++) {
- spinningWorkers[i] = -1;
- }
for (int i = 0; i < cfg.workerThread.count; i++) {
+ spinningWorkers[i] = -1;
workerThreads[i] =
cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
}
@@ -170,7 +168,7 @@
if (cfg.workerThread.count > 0) {
while (true) {
// Prioritize workers that have recently started spinning.
- auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
+ auto i = --nextSpinningWorkerIdx % cfg.workerThread.count;
auto idx = spinningWorkers[i].exchange(-1);
if (idx < 0) {
// If a spinning worker couldn't be found, round-robin the
@@ -212,7 +210,7 @@
}
void Scheduler::onBeginSpinning(int workerId) {
- auto idx = nextSpinningWorkerIdx++ % spinningWorkers.size();
+ auto idx = nextSpinningWorkerIdx++ % cfg.workerThread.count;
spinningWorkers[idx] = workerId;
}
@@ -369,7 +367,7 @@
auto allocator = scheduler->cfg.allocator;
auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
auto affinity = affinityPolicy->get(id, allocator);
- thread = Thread(std::move(affinity), [=] {
+ thread = Thread(std::move(affinity), [=, this] {
Thread::setName("Thread<%.2d>", int(id));
if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
@@ -572,7 +570,7 @@
MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
// This is the entry point for a multi-threaded worker.
// Start with a regular condition-variable wait for work. This avoids
- // starting the thread with a spinForWork().
+ // starting the thread with a spinForWorkAndLock().
work.wait([this]() REQUIRES(work.mutex) {
return work.num > 0 || work.waiting || shutdown;
});
@@ -599,8 +597,7 @@
if (mode == Mode::MultiThreaded) {
scheduler->onBeginSpinning(id);
work.mutex.unlock();
- spinForWork();
- work.mutex.lock();
+ spinForWorkAndLock();
}
work.wait([this]() REQUIRES(work.mutex) {
@@ -637,7 +634,7 @@
fiber->state = to;
}
-void Scheduler::Worker::spinForWork() {
+void Scheduler::Worker::spinForWorkAndLock() {
TRACE("SPIN");
Task stolen;
@@ -652,13 +649,21 @@
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
// clang-format on
+
if (work.num > 0) {
- return;
+ work.mutex.lock();
+ if (work.num > 0) {
+ return;
+ }
+ else {
+ // Our new task was stolen by another worker. Keep spinning.
+ work.mutex.unlock();
+ }
}
}
if (scheduler->stealWork(this, rng(), stolen)) {
- marl::lock lock(work.mutex);
+ work.mutex.lock();
work.tasks.emplace_back(std::move(stolen));
work.num++;
return;
@@ -666,6 +671,7 @@
std::this_thread::yield();
}
+ work.mutex.lock();
}
void Scheduler::Worker::runUntilIdle() {
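
Note on the spinForWorkAndLock() change above: the changelog entry "Fixed race condition causing workers to sleep prematurely" is this check / lock / re-check pattern. Below is a minimal standalone sketch (WorkQueue and spinUntilWorkAndLock are illustrative names, not the marl types): the unlocked peek at the task count is only a hint, so the function re-checks under the lock and returns with the lock still held; that way a task it has just observed, or just stolen into its own queue, cannot be taken by another worker before the caller proceeds.

  #include <atomic>
  #include <deque>
  #include <functional>
  #include <mutex>
  #include <thread>

  struct WorkQueue {
    std::mutex mutex;
    std::atomic<int> num{0};                  // cheap unlocked hint, like work.num
    std::deque<std::function<void()>> tasks;  // guarded by mutex
  };

  // Returns with queue.mutex held, mirroring spinForWorkAndLock()'s contract.
  void spinUntilWorkAndLock(WorkQueue& queue) {
    while (true) {
      if (queue.num.load() > 0) {    // unlocked peek: only a hint
        queue.mutex.lock();
        if (queue.num.load() > 0) {  // re-check now that we hold the lock
          return;                    // the task can no longer be stolen from us
        }
        queue.mutex.unlock();        // stolen in the window; keep spinning
      }
      std::this_thread::yield();
    }
  }

  int main() {
    WorkQueue queue;
    {
      std::lock_guard<std::mutex> lock(queue.mutex);
      queue.tasks.emplace_back([] {});
      queue.num++;
    }
    spinUntilWorkAndLock(queue);  // returns immediately, holding queue.mutex
    queue.mutex.unlock();
  }
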
diff --git a/third_party/marl/src/scheduler_bench.cpp b/third_party/marl/src/scheduler_bench.cpp
index 81c4d2f..fb7beb5 100644
--- a/third_party/marl/src/scheduler_bench.cpp
+++ b/third_party/marl/src/scheduler_bench.cpp
@@ -48,6 +48,35 @@
}
BENCHMARK_REGISTER_F(Schedule, SomeWork)->Apply(Schedule::args);
+BENCHMARK_DEFINE_F(Schedule, MultipleForkAndJoin)(benchmark::State& state) {
+ run(state, [&](int numTasks) {
+ const int batchSize = std::max(1, Schedule::numThreads(state));
+ for (auto _ : state) {
+ marl::WaitGroup wg;
+ for (int i = 0; i < numTasks; i++) {
+ wg.add(1);
+ marl::schedule([=] {
+ // Give each task a significant amount of work so that concurrency matters.
+ // If any worker performs more than one task, it will affect the results.
+ int value = i;
+ for (int j = 0; j < 256; ++j) {
+ value = doSomeWork(value);
+ }
+ benchmark::DoNotOptimize(value);
+ wg.done();
+ });
+ // Wait for completion after every batch. This simulates the fork-and-join pattern.
+ if ((i + 1) % batchSize == 0) {
+ wg.wait();
+ }
+ }
+ wg.wait();
+ }
+ });
+}
+
+BENCHMARK_REGISTER_F(Schedule, MultipleForkAndJoin)->Apply(Schedule::args<512>);
+
BENCHMARK_DEFINE_F(Schedule, SomeWorkWorkerAffinityOneOf)
(benchmark::State& state) {
marl::Scheduler::Config cfg;