// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "osfiber.h"  // Must come first. See osfiber_ucontext.h.

#include "marl/scheduler.h"

#include "marl/debug.h"
#include "marl/sanitizers.h"
#include "marl/thread.h"
#include "marl/trace.h"

#if defined(_WIN32)
#include <intrin.h>  // __nop()
#endif

// Enable to trace scheduler events.
#define ENABLE_TRACE_EVENTS 0

// Enable to print verbose debug logging.
#define ENABLE_DEBUG_LOGGING 0

#if ENABLE_TRACE_EVENTS
#define TRACE(...) MARL_SCOPED_EVENT(__VA_ARGS__)
#else
#define TRACE(...)
#endif

#if ENABLE_DEBUG_LOGGING
#define DBG_LOG(msg, ...) \
  printf("%.3x " msg "\n", (int)threadID() & 0xfff, __VA_ARGS__)
#else
#define DBG_LOG(msg, ...)
#endif

#define ASSERT_FIBER_STATE(FIBER, STATE) \
  MARL_ASSERT(FIBER->state == STATE, \
              "fiber %d was in state %s, but expected %s", (int)FIBER->id, \
              Fiber::toString(FIBER->state), Fiber::toString(STATE))

namespace {

#if ENABLE_DEBUG_LOGGING
// threadID() returns a uint64_t representing the currently executing thread.
// threadID() is only intended to be used for debugging purposes.
inline uint64_t threadID() {
  auto id = std::this_thread::get_id();
  return std::hash<std::thread::id>()(id);
}
#endif

inline void nop() {
#if defined(_WIN32)
  __nop();
#else
  __asm__ __volatile__("nop");
#endif
}

inline marl::Scheduler::Config setConfigDefaults(
    const marl::Scheduler::Config& cfgIn) {
  marl::Scheduler::Config cfg{cfgIn};
  if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
    cfg.workerThread.affinityPolicy = marl::Thread::Affinity::Policy::anyOf(
        marl::Thread::Affinity::all(cfg.allocator), cfg.allocator);
  }
  return cfg;
}

}  // anonymous namespace

namespace marl {

////////////////////////////////////////////////////////////////////////////////
// Scheduler
////////////////////////////////////////////////////////////////////////////////
thread_local Scheduler* Scheduler::bound = nullptr;

Scheduler* Scheduler::get() {
  return bound;
}

void Scheduler::bind() {
#if !MEMORY_SANITIZER_ENABLED
  // thread_local variables in shared libraries are initialized at load-time,
  // but this is not observed by MemorySanitizer if the loader itself was not
  // instrumented, leading to false-positive uninitialized variable errors.
  // See https://github.com/google/marl/issues/184
  MARL_ASSERT(bound == nullptr, "Scheduler already bound");
#endif
  bound = this;
  {
    marl::lock lock(singleThreadedWorkers.mutex);
    auto worker = cfg.allocator->make_unique<Worker>(
        this, Worker::Mode::SingleThreaded, -1);
    worker->start();
    auto tid = std::this_thread::get_id();
    singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
  }
}

void Scheduler::unbind() {
  MARL_ASSERT(bound != nullptr, "No scheduler bound");
  auto worker = Worker::getCurrent();
  worker->stop();
  {
    marl::lock lock(bound->singleThreadedWorkers.mutex);
    auto tid = std::this_thread::get_id();
    auto it = bound->singleThreadedWorkers.byTid.find(tid);
    MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(),
                "singleThreadedWorker not found");
    MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
    bound->singleThreadedWorkers.byTid.erase(it);
    if (bound->singleThreadedWorkers.byTid.empty()) {
      bound->singleThreadedWorkers.unbind.notify_one();
    }
  }
  bound = nullptr;
}

Scheduler::Scheduler(const Config& config)
    : cfg(setConfigDefaults(config)),
      workerThreads{},
      singleThreadedWorkers(config.allocator) {
  for (size_t i = 0; i < spinningWorkers.size(); i++) {
    spinningWorkers[i] = -1;
  }
  for (int i = 0; i < cfg.workerThread.count; i++) {
    workerThreads[i] =
        cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
  }
  for (int i = 0; i < cfg.workerThread.count; i++) {
    workerThreads[i]->start();
  }
}

Scheduler::~Scheduler() {
  {
    // Wait until all the single threaded workers have been unbound.
    marl::lock lock(singleThreadedWorkers.mutex);
    lock.wait(singleThreadedWorkers.unbind,
              [this]() REQUIRES(singleThreadedWorkers.mutex) {
                return singleThreadedWorkers.byTid.empty();
              });
  }

  // Release all worker threads.
  // This will wait for all in-flight tasks to complete before returning.
  for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
    workerThreads[i]->stop();
  }
  for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
    cfg.allocator->destroy(workerThreads[i]);
  }
}

void Scheduler::enqueue(Task&& task) {
  if (task.is(Task::Flags::SameThread)) {
    Worker::getCurrent()->enqueue(std::move(task));
    return;
  }
  if (cfg.workerThread.count > 0) {
    while (true) {
      // Prioritize workers that have recently started spinning.
      auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
      auto idx = spinningWorkers[i].exchange(-1);
      if (idx < 0) {
        // If a spinning worker couldn't be found, round-robin the
        // workers.
        idx = nextEnqueueIndex++ % cfg.workerThread.count;
      }

      auto worker = workerThreads[idx];
      if (worker->tryLock()) {
        worker->enqueueAndUnlock(std::move(task));
        return;
      }
    }
  } else {
    if (auto worker = Worker::getCurrent()) {
      worker->enqueue(std::move(task));
    } else {
      MARL_FATAL(
          "singleThreadedWorker not found. Did you forget to call "
          "marl::Scheduler::bind()?");
    }
  }
}

const Scheduler::Config& Scheduler::config() const {
  return cfg;
}

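// stealWork() attempts to take a single task from the worker indexed by
// 'from' (modulo the worker count), on behalf of the thief worker. Returns
// true if a task was stolen and written to 'out'.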
bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) {
  if (cfg.workerThread.count > 0) {
    auto thread = workerThreads[from % cfg.workerThread.count];
    if (thread != thief) {
      if (thread->steal(out)) {
        return true;
      }
    }
  }
  return false;
}

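// onBeginSpinning() records that the given worker has started spinning for
// work, so that enqueue() can preferentially hand new tasks to it.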
void Scheduler::onBeginSpinning(int workerId) {
  auto idx = nextSpinningWorkerIdx++ % spinningWorkers.size();
  spinningWorkers[idx] = workerId;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Config
////////////////////////////////////////////////////////////////////////////////
Scheduler::Config Scheduler::Config::allCores() {
  return Config().setWorkerThreadCount(Thread::numLogicalCPUs());
}
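
// Typical usage of the scheduler (illustrative sketch only; assumes
// marl::schedule() from marl/scheduler.h and the defer() macro from
// marl/defer.h):
//
//   marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
//   scheduler.bind();
//   defer(scheduler.unbind());
//   marl::schedule([] { /* runs on a worker fiber */ });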

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Fiber
////////////////////////////////////////////////////////////////////////////////
Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id)
    : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) {
  MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound");
}

// TODO(chromium:1211047): Testing the static thread_local Worker::current for
// null causes a MemorySanitizer false positive.
CLANG_NO_SANITIZE_MEMORY
Scheduler::Fiber* Scheduler::Fiber::current() {
  auto worker = Worker::getCurrent();
  return worker != nullptr ? worker->getCurrentFiber() : nullptr;
}

void Scheduler::Fiber::notify() {
  worker->enqueue(this);
}

void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) {
  MARL_ASSERT(worker == Worker::getCurrent(),
              "Scheduler::Fiber::wait() must only be called on the currently "
              "executing fiber");
  worker->wait(lock, nullptr, pred);
}

void Scheduler::Fiber::switchTo(Fiber* to) {
  MARL_ASSERT(worker == Worker::getCurrent(),
              "Scheduler::Fiber::switchTo() must only be called on the "
              "currently executing fiber");
  if (to != this) {
    impl->switchTo(to->impl.get());
  }
}

Allocator::unique_ptr<Scheduler::Fiber> Scheduler::Fiber::create(
    Allocator* allocator,
    uint32_t id,
    size_t stackSize,
    const std::function<void()>& func) {
  return allocator->make_unique<Fiber>(
      OSFiber::createFiber(allocator, stackSize, func), id);
}

Allocator::unique_ptr<Scheduler::Fiber>
Scheduler::Fiber::createFromCurrentThread(Allocator* allocator, uint32_t id) {
  return allocator->make_unique<Fiber>(
      OSFiber::createFiberFromCurrentThread(allocator), id);
}

const char* Scheduler::Fiber::toString(State state) {
  switch (state) {
    case State::Idle:
      return "Idle";
    case State::Yielded:
      return "Yielded";
    case State::Queued:
      return "Queued";
    case State::Running:
      return "Running";
    case State::Waiting:
      return "Waiting";
  }
  MARL_ASSERT(false, "bad fiber state");
  return "<unknown>";
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::WaitingFibers
////////////////////////////////////////////////////////////////////////////////
Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
    : timeouts(allocator), fibers(allocator) {}

Scheduler::WaitingFibers::operator bool() const {
  return !fibers.empty();
}

Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
  if (!*this) {
    return nullptr;
  }
  auto it = timeouts.begin();
  if (timeout < it->timepoint) {
    return nullptr;
  }
  auto fiber = it->fiber;
  timeouts.erase(it);
  auto deleted = fibers.erase(fiber) != 0;
  (void)deleted;
  MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
  return fiber;
}

Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
  MARL_ASSERT(*this,
              "WaitingFibers::next() called when there are no waiting fibers");
  return timeouts.begin()->timepoint;
}

void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
  timeouts.emplace(Timeout{timeout, fiber});
  bool added = fibers.emplace(fiber, timeout).second;
  (void)added;
  MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
}

void Scheduler::WaitingFibers::erase(Fiber* fiber) {
  auto it = fibers.find(fiber);
  if (it != fibers.end()) {
    auto timeout = it->second;
    auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
    (void)erased;
    MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
    fibers.erase(it);
  }
}

bool Scheduler::WaitingFibers::contains(Fiber* fiber) const {
  return fibers.count(fiber) != 0;
}

bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
  if (timepoint != o.timepoint) {
    return timepoint < o.timepoint;
  }
  return fiber < o.fiber;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker
////////////////////////////////////////////////////////////////////////////////
thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;

Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
    : id(id),
      mode(mode),
      scheduler(scheduler),
      work(scheduler->cfg.allocator),
      idleFibers(scheduler->cfg.allocator) {}

void Scheduler::Worker::start() {
  switch (mode) {
    case Mode::MultiThreaded: {
      auto allocator = scheduler->cfg.allocator;
      auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
      auto affinity = affinityPolicy->get(id, allocator);
      thread = Thread(std::move(affinity), [=] {
        Thread::setName("Thread<%.2d>", int(id));

        if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
          initFunc(id);
        }

        Scheduler::bound = scheduler;
        Worker::current = this;
        mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
        currentFiber = mainFiber.get();
        {
          marl::lock lock(work.mutex);
          run();
        }
        mainFiber.reset();
        Worker::current = nullptr;
      });
      break;
    }
    case Mode::SingleThreaded: {
      Worker::current = this;
      mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
      currentFiber = mainFiber.get();
      break;
    }
    default:
      MARL_ASSERT(false, "Unknown mode: %d", int(mode));
  }
}

void Scheduler::Worker::stop() {
  switch (mode) {
    case Mode::MultiThreaded: {
      enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
      thread.join();
      break;
    }
    case Mode::SingleThreaded: {
      marl::lock lock(work.mutex);
      shutdown = true;
      runUntilShutdown();
      Worker::current = nullptr;
      break;
    }
    default:
      MARL_ASSERT(false, "Unknown mode: %d", int(mode));
  }
}

bool Scheduler::Worker::wait(const TimePoint* timeout) {
  DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
  {
    marl::lock lock(work.mutex);
    suspend(timeout);
  }
  return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
}

bool Scheduler::Worker::wait(lock& waitLock,
                             const TimePoint* timeout,
                             const Predicate& pred) {
  DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
  while (!pred()) {
    // Lock the work mutex to call suspend().
    work.mutex.lock();

    // Unlock the wait mutex with the work mutex lock held.
    // Order is important here as we need to ensure that the fiber is not
    // enqueued (via Fiber::notify()) between the waitLock.unlock() and fiber
    // switch, otherwise the Fiber::notify() call may be ignored and the fiber
    // is never woken.
    waitLock.unlock_no_tsa();

    // suspend the fiber.
    suspend(timeout);

    // Fiber resumed. We don't need the work mutex locked any more.
    work.mutex.unlock();

    // Re-lock to either return due to timeout, or call pred().
    waitLock.lock_no_tsa();

    // Check timeout.
    if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
      return false;
    }

    // Spurious wake up. Spin again.
  }
  return true;
}

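// suspend() blocks the current fiber until the worker has something else to
// do: it marks the fiber as Waiting (with a timeout) or Yielded, waits for
// work, and then switches to an unblocked fiber, an idle fiber, or a newly
// created worker fiber. Must be called with work.mutex held.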
void Scheduler::Worker::suspend(
    const std::chrono::system_clock::time_point* timeout) {
  // Current fiber is yielding as it is blocked.
  if (timeout != nullptr) {
    changeFiberState(currentFiber, Fiber::State::Running,
                     Fiber::State::Waiting);
    work.waiting.add(*timeout, currentFiber);
  } else {
    changeFiberState(currentFiber, Fiber::State::Running,
                     Fiber::State::Yielded);
  }

  // First wait until there's something else this worker can do.
  waitForWork();

  work.numBlockedFibers++;

  if (!work.fibers.empty()) {
    // There's another fiber that has become unblocked, resume that.
    work.num--;
    auto to = containers::take(work.fibers);
    ASSERT_FIBER_STATE(to, Fiber::State::Queued);
    switchToFiber(to);
  } else if (!idleFibers.empty()) {
    // There's an old fiber we can reuse, resume that.
    auto to = containers::take(idleFibers);
    ASSERT_FIBER_STATE(to, Fiber::State::Idle);
    switchToFiber(to);
  } else {
    // Tasks to process and no existing fibers to resume.
    // Spawn a new fiber.
    switchToFiber(createWorkerFiber());
  }

  work.numBlockedFibers--;

  setFiberState(currentFiber, Fiber::State::Running);
}

bool Scheduler::Worker::tryLock() {
  return work.mutex.try_lock();
}

void Scheduler::Worker::enqueue(Fiber* fiber) {
  bool notify = false;
  {
    marl::lock lock(work.mutex);
    DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
            Fiber::toString(fiber->state));
    switch (fiber->state) {
      case Fiber::State::Running:
      case Fiber::State::Queued:
        return;  // Nothing to do here - task is already queued or running.
      case Fiber::State::Waiting:
        work.waiting.erase(fiber);
        break;
      case Fiber::State::Idle:
      case Fiber::State::Yielded:
        break;
    }
    notify = work.notifyAdded;
    work.fibers.push_back(fiber);
    MARL_ASSERT(!work.waiting.contains(fiber),
                "fiber is unexpectedly in the waiting list");
    setFiberState(fiber, Fiber::State::Queued);
    work.num++;
  }

  if (notify) {
    work.added.notify_one();
  }
}

void Scheduler::Worker::enqueue(Task&& task) {
  work.mutex.lock();
  enqueueAndUnlock(std::move(task));
}

void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
  auto notify = work.notifyAdded;
  work.tasks.push_back(std::move(task));
  work.num++;
  work.mutex.unlock();
  if (notify) {
    work.added.notify_one();
  }
}

bool Scheduler::Worker::steal(Task& out) {
  if (work.num.load() == 0) {
    return false;
  }
  if (!work.mutex.try_lock()) {
    return false;
  }
  if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
    work.mutex.unlock();
    return false;
  }
  work.num--;
  out = containers::take(work.tasks);
  work.mutex.unlock();
  return true;
}

void Scheduler::Worker::run() {
  if (mode == Mode::MultiThreaded) {
    MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
    // This is the entry point for a multi-threaded worker.
    // Start with a regular condition-variable wait for work. This avoids
    // starting the thread with a spinForWork().
    work.wait([this]() REQUIRES(work.mutex) {
      return work.num > 0 || work.waiting || shutdown;
    });
  }
  ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
  runUntilShutdown();
  switchToFiber(mainFiber.get());
}

void Scheduler::Worker::runUntilShutdown() {
  while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
    waitForWork();
    runUntilIdle();
  }
}

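// waitForWork() blocks until the worker has queued fibers or tasks to process,
// or until shutdown with no blocked fibers remaining. Multi-threaded workers
// first spin (and attempt to steal work) before falling back to a
// condition-variable wait. Any fibers whose wait timeouts have expired are
// re-queued before returning.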
void Scheduler::Worker::waitForWork() {
  MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
              "work.num out of sync");
  if (work.num > 0) {
    return;
  }

  if (mode == Mode::MultiThreaded) {
    scheduler->onBeginSpinning(id);
    work.mutex.unlock();
    spinForWork();
    work.mutex.lock();
  }

  work.wait([this]() REQUIRES(work.mutex) {
    return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
  });
  if (work.waiting) {
    enqueueFiberTimeouts();
  }
}

void Scheduler::Worker::enqueueFiberTimeouts() {
  auto now = std::chrono::system_clock::now();
  while (auto fiber = work.waiting.take(now)) {
    changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued);
    DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id);
    work.fibers.push_back(fiber);
    work.num++;
  }
}

void Scheduler::Worker::changeFiberState(Fiber* fiber,
                                         Fiber::State from,
                                         Fiber::State to) const {
  (void)from;  // Unused parameter when ENABLE_DEBUG_LOGGING is disabled.
  DBG_LOG("%d: CHANGE_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
          Fiber::toString(from), Fiber::toString(to));
  ASSERT_FIBER_STATE(fiber, from);
  fiber->state = to;
}

void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const {
  DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
          Fiber::toString(fiber->state), Fiber::toString(to));
  fiber->state = to;
}

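// spinForWork() busy-waits for roughly a millisecond for new work to arrive,
// periodically attempting to steal a task from another worker, before the
// caller falls back to a blocking wait. Called with work.mutex unlocked.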
void Scheduler::Worker::spinForWork() {
  TRACE("SPIN");
  Task stolen;

  constexpr auto duration = std::chrono::milliseconds(1);
  auto start = std::chrono::high_resolution_clock::now();
  while (std::chrono::high_resolution_clock::now() - start < duration) {
    for (int i = 0; i < 256; i++)  // Empirically picked magic number!
    {
      // clang-format off
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      // clang-format on
      if (work.num > 0) {
        return;
      }
    }

    if (scheduler->stealWork(this, rng(), stolen)) {
      marl::lock lock(work.mutex);
      work.tasks.emplace_back(std::move(stolen));
      work.num++;
      return;
    }

    std::this_thread::yield();
  }
}

void Scheduler::Worker::runUntilIdle() {
  ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
  MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
              "work.num out of sync");
  while (!work.fibers.empty() || !work.tasks.empty()) {
    // Note: we cannot take and store on the stack more than a single fiber
    // or task at a time, as the Fiber may yield and these items may get
    // held on suspended fiber stack.

    while (!work.fibers.empty()) {
      work.num--;
      auto fiber = containers::take(work.fibers);
      // Sanity checks:
      MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
      MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
      ASSERT_FIBER_STATE(fiber, Fiber::State::Queued);

      changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle);
      auto added = idleFibers.emplace(currentFiber).second;
      (void)added;
      MARL_ASSERT(added, "fiber already idle");

      switchToFiber(fiber);
      changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
    }

    if (!work.tasks.empty()) {
      work.num--;
      auto task = containers::take(work.tasks);
      work.mutex.unlock();

      // Run the task.
      task();

      // std::function<> can carry arguments with complex destructors.
      // Ensure these are destructed outside of the lock.
      task = Task();

      work.mutex.lock();
    }
  }
}

Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
  auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
  DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
  auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId,
                             scheduler->cfg.fiberStackSize,
                             [&]() REQUIRES(work.mutex) { run(); });
  auto ptr = fiber.get();
  workerFibers.emplace_back(std::move(fiber));
  return ptr;
}

void Scheduler::Worker::switchToFiber(Fiber* to) {
  DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
  MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
              "switching to idle fiber");
  auto from = currentFiber;
  currentFiber = to;
  from->switchTo(to);
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker::Work
////////////////////////////////////////////////////////////////////////////////
Scheduler::Worker::Work::Work(Allocator* allocator)
    : tasks(allocator), fibers(allocator), waiting(allocator) {}

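// Work::wait() blocks on the 'added' condition variable until f() returns
// true, waking early when the earliest waiting-fiber timeout expires.
// notifyAdded is set for the duration of the wait so that enqueue() knows to
// signal the condition variable.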
template <typename F>
void Scheduler::Worker::Work::wait(F&& f) {
  notifyAdded = true;
  if (waiting) {
    mutex.wait_until_locked(added, waiting.next(), f);
  } else {
    mutex.wait_locked(added, f);
  }
  notifyAdded = false;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::SingleThreadedWorkers
////////////////////////////////////////////////////////////////////////////////
Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
    : byTid(allocator) {}

}  // namespace marl