Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 1 | // Copyright 2019 The Marl Authors. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | #include "osfiber.h" // Must come first. See osfiber_ucontext.h. |
| 16 | |
| 17 | #include "marl/scheduler.h" |
| 18 | |
| 19 | #include "marl/debug.h" |
Nicolas Capens | b66407c | 2020-10-29 00:34:36 -0400 | [diff] [blame] | 20 | #include "marl/sanitizers.h" |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 21 | #include "marl/thread.h" |
| 22 | #include "marl/trace.h" |
| 23 | |
| 24 | #if defined(_WIN32) |
| 25 | #include <intrin.h> // __nop() |
| 26 | #endif |
| 27 | |
| 28 | // Enable to trace scheduler events. |
| 29 | #define ENABLE_TRACE_EVENTS 0 |
| 30 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 31 | // Enable to print verbose debug logging. |
| 32 | #define ENABLE_DEBUG_LOGGING 0 |
| 33 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 34 | #if ENABLE_TRACE_EVENTS |
| 35 | #define TRACE(...) MARL_SCOPED_EVENT(__VA_ARGS__) |
| 36 | #else |
| 37 | #define TRACE(...) |
| 38 | #endif |
| 39 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 40 | #if ENABLE_DEBUG_LOGGING |
| 41 | #define DBG_LOG(msg, ...) \ |
| 42 | printf("%.3x " msg "\n", (int)threadID() & 0xfff, __VA_ARGS__) |
| 43 | #else |
| 44 | #define DBG_LOG(msg, ...) |
| 45 | #endif |
| 46 | |
| 47 | #define ASSERT_FIBER_STATE(FIBER, STATE) \ |
| 48 | MARL_ASSERT(FIBER->state == STATE, \ |
| 49 | "fiber %d was in state %s, but expected %s", (int)FIBER->id, \ |
| 50 | Fiber::toString(FIBER->state), Fiber::toString(STATE)) |
| 51 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 52 | namespace { |
| 53 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 54 | #if ENABLE_DEBUG_LOGGING |
| 55 | // threadID() returns a uint64_t representing the currently executing thread. |
| 56 | // threadID() is only intended to be used for debugging purposes. |
| 57 | inline uint64_t threadID() { |
| 58 | auto id = std::this_thread::get_id(); |
| 59 | return std::hash<std::thread::id>()(id); |
| 60 | } |
| 61 | #endif |
| 62 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 63 | inline void nop() { |
| 64 | #if defined(_WIN32) |
| 65 | __nop(); |
| 66 | #else |
| 67 | __asm__ __volatile__("nop"); |
| 68 | #endif |
| 69 | } |
| 70 | |
Ben Clayton | 269f38f | 2020-10-02 13:24:32 +0100 | [diff] [blame] | 71 | inline marl::Scheduler::Config setConfigDefaults( |
| 72 | const marl::Scheduler::Config& cfgIn) { |
| 73 | marl::Scheduler::Config cfg{cfgIn}; |
| 74 | if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) { |
| 75 | cfg.workerThread.affinityPolicy = marl::Thread::Affinity::Policy::anyOf( |
| 76 | marl::Thread::Affinity::all(cfg.allocator), cfg.allocator); |
| 77 | } |
| 78 | return cfg; |
| 79 | } |
| 80 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 81 | } // anonymous namespace |
| 82 | |
| 83 | namespace marl { |
| 84 | |
| 85 | //////////////////////////////////////////////////////////////////////////////// |
| 86 | // Scheduler |
| 87 | //////////////////////////////////////////////////////////////////////////////// |
| 88 | thread_local Scheduler* Scheduler::bound = nullptr; |
| 89 | |
| 90 | Scheduler* Scheduler::get() { |
| 91 | return bound; |
| 92 | } |
| 93 | |
| 94 | void Scheduler::bind() { |
Nicolas Capens | b66407c | 2020-10-29 00:34:36 -0400 | [diff] [blame] | 95 | #if !MEMORY_SANITIZER_ENABLED |
| 96 | // thread_local variables in shared libraries are initialized at load-time, |
| 97 | // but this is not observed by MemorySanitizer if the loader itself was not |
| 98 | // instrumented, leading to false-positive unitialized variable errors. |
| 99 | // See https://github.com/google/marl/issues/184 |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 100 | MARL_ASSERT(bound == nullptr, "Scheduler already bound"); |
Nicolas Capens | b66407c | 2020-10-29 00:34:36 -0400 | [diff] [blame] | 101 | #endif |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 102 | bound = this; |
| 103 | { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 104 | marl::lock lock(singleThreadedWorkers.mutex); |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 105 | auto worker = cfg.allocator->make_unique<Worker>( |
| 106 | this, Worker::Mode::SingleThreaded, -1); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 107 | worker->start(); |
| 108 | auto tid = std::this_thread::get_id(); |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 109 | singleThreadedWorkers.byTid.emplace(tid, std::move(worker)); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 110 | } |
| 111 | } |
| 112 | |
| 113 | void Scheduler::unbind() { |
| 114 | MARL_ASSERT(bound != nullptr, "No scheduler bound"); |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 115 | auto worker = Worker::getCurrent(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 116 | worker->stop(); |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 117 | { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 118 | marl::lock lock(bound->singleThreadedWorkers.mutex); |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 119 | auto tid = std::this_thread::get_id(); |
| 120 | auto it = bound->singleThreadedWorkers.byTid.find(tid); |
| 121 | MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(), |
| 122 | "singleThreadedWorker not found"); |
| 123 | MARL_ASSERT(it->second.get() == worker, "worker is not bound?"); |
| 124 | bound->singleThreadedWorkers.byTid.erase(it); |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 125 | if (bound->singleThreadedWorkers.byTid.empty()) { |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 126 | bound->singleThreadedWorkers.unbind.notify_one(); |
| 127 | } |
| 128 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 129 | bound = nullptr; |
| 130 | } |
| 131 | |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 132 | Scheduler::Scheduler(const Config& config) |
Ben Clayton | 269f38f | 2020-10-02 13:24:32 +0100 | [diff] [blame] | 133 | : cfg(setConfigDefaults(config)), |
| 134 | workerThreads{}, |
| 135 | singleThreadedWorkers(config.allocator) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 136 | for (size_t i = 0; i < spinningWorkers.size(); i++) { |
| 137 | spinningWorkers[i] = -1; |
| 138 | } |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 139 | for (int i = 0; i < cfg.workerThread.count; i++) { |
| 140 | workerThreads[i] = |
| 141 | cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i); |
| 142 | } |
| 143 | for (int i = 0; i < cfg.workerThread.count; i++) { |
| 144 | workerThreads[i]->start(); |
| 145 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 146 | } |
| 147 | |
| 148 | Scheduler::~Scheduler() { |
| 149 | { |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 150 | // Wait until all the single threaded workers have been unbound. |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 151 | marl::lock lock(singleThreadedWorkers.mutex); |
| 152 | lock.wait(singleThreadedWorkers.unbind, |
| 153 | [this]() REQUIRES(singleThreadedWorkers.mutex) { |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 154 | return singleThreadedWorkers.byTid.empty(); |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 155 | }); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 156 | } |
Ben Clayton | f1f6e68 | 2020-02-13 17:34:05 +0000 | [diff] [blame] | 157 | |
| 158 | // Release all worker threads. |
| 159 | // This will wait for all in-flight tasks to complete before returning. |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 160 | for (int i = cfg.workerThread.count - 1; i >= 0; i--) { |
| 161 | workerThreads[i]->stop(); |
| 162 | } |
| 163 | for (int i = cfg.workerThread.count - 1; i >= 0; i--) { |
| 164 | cfg.allocator->destroy(workerThreads[i]); |
| 165 | } |
| 166 | } |
| 167 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 168 | void Scheduler::enqueue(Task&& task) { |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 169 | if (task.is(Task::Flags::SameThread)) { |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 170 | Worker::getCurrent()->enqueue(std::move(task)); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 171 | return; |
| 172 | } |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 173 | if (cfg.workerThread.count > 0) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 174 | while (true) { |
| 175 | // Prioritize workers that have recently started spinning. |
| 176 | auto i = --nextSpinningWorkerIdx % spinningWorkers.size(); |
| 177 | auto idx = spinningWorkers[i].exchange(-1); |
| 178 | if (idx < 0) { |
| 179 | // If a spinning worker couldn't be found, round-robin the |
| 180 | // workers. |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 181 | idx = nextEnqueueIndex++ % cfg.workerThread.count; |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 182 | } |
| 183 | |
| 184 | auto worker = workerThreads[idx]; |
| 185 | if (worker->tryLock()) { |
| 186 | worker->enqueueAndUnlock(std::move(task)); |
| 187 | return; |
| 188 | } |
| 189 | } |
| 190 | } else { |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 191 | if (auto worker = Worker::getCurrent()) { |
| 192 | worker->enqueue(std::move(task)); |
| 193 | } else { |
| 194 | MARL_FATAL( |
| 195 | "singleThreadedWorker not found. Did you forget to call " |
| 196 | "marl::Scheduler::bind()?"); |
| 197 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 198 | } |
| 199 | } |
| 200 | |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 201 | const Scheduler::Config& Scheduler::config() const { |
| 202 | return cfg; |
| 203 | } |
| 204 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 205 | bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) { |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 206 | if (cfg.workerThread.count > 0) { |
| 207 | auto thread = workerThreads[from % cfg.workerThread.count]; |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 208 | if (thread != thief) { |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 209 | if (thread->steal(out)) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 210 | return true; |
| 211 | } |
| 212 | } |
| 213 | } |
| 214 | return false; |
| 215 | } |
| 216 | |
| 217 | void Scheduler::onBeginSpinning(int workerId) { |
| 218 | auto idx = nextSpinningWorkerIdx++ % spinningWorkers.size(); |
| 219 | spinningWorkers[idx] = workerId; |
| 220 | } |
| 221 | |
| 222 | //////////////////////////////////////////////////////////////////////////////// |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 223 | // Scheduler::Config |
| 224 | //////////////////////////////////////////////////////////////////////////////// |
| 225 | Scheduler::Config Scheduler::Config::allCores() { |
| 226 | return Config().setWorkerThreadCount(Thread::numLogicalCPUs()); |
| 227 | } |
| 228 | |
| 229 | //////////////////////////////////////////////////////////////////////////////// |
| 230 | // Scheduler::Fiber |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 231 | //////////////////////////////////////////////////////////////////////////////// |
Ben Clayton | 193ce89 | 2019-10-04 20:46:13 +0100 | [diff] [blame] | 232 | Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id) |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 233 | : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 234 | MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound"); |
| 235 | } |
| 236 | |
Nicolas Capens | 15478fd | 2021-05-27 13:50:51 -0400 | [diff] [blame] | 237 | // TODO(chromium:1211047): Testing the static thread_local Worker::current for |
| 238 | // null causes a MemorySantizer false positive. |
| 239 | CLANG_NO_SANITIZE_MEMORY |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 240 | Scheduler::Fiber* Scheduler::Fiber::current() { |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 241 | auto worker = Worker::getCurrent(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 242 | return worker != nullptr ? worker->getCurrentFiber() : nullptr; |
| 243 | } |
| 244 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 245 | void Scheduler::Fiber::notify() { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 246 | worker->enqueue(this); |
| 247 | } |
| 248 | |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 249 | void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) { |
Nicolas Capens | b66407c | 2020-10-29 00:34:36 -0400 | [diff] [blame] | 250 | MARL_ASSERT(worker == Worker::getCurrent(), |
| 251 | "Scheduler::Fiber::wait() must only be called on the currently " |
| 252 | "executing fiber"); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 253 | worker->wait(lock, nullptr, pred); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 254 | } |
| 255 | |
| 256 | void Scheduler::Fiber::switchTo(Fiber* to) { |
Nicolas Capens | b66407c | 2020-10-29 00:34:36 -0400 | [diff] [blame] | 257 | MARL_ASSERT(worker == Worker::getCurrent(), |
| 258 | "Scheduler::Fiber::switchTo() must only be called on the " |
| 259 | "currently executing fiber"); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 260 | if (to != this) { |
Ben Clayton | 193ce89 | 2019-10-04 20:46:13 +0100 | [diff] [blame] | 261 | impl->switchTo(to->impl.get()); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 262 | } |
| 263 | } |
| 264 | |
Ben Clayton | 193ce89 | 2019-10-04 20:46:13 +0100 | [diff] [blame] | 265 | Allocator::unique_ptr<Scheduler::Fiber> Scheduler::Fiber::create( |
| 266 | Allocator* allocator, |
| 267 | uint32_t id, |
| 268 | size_t stackSize, |
| 269 | const std::function<void()>& func) { |
| 270 | return allocator->make_unique<Fiber>( |
| 271 | OSFiber::createFiber(allocator, stackSize, func), id); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 272 | } |
| 273 | |
Ben Clayton | 193ce89 | 2019-10-04 20:46:13 +0100 | [diff] [blame] | 274 | Allocator::unique_ptr<Scheduler::Fiber> |
| 275 | Scheduler::Fiber::createFromCurrentThread(Allocator* allocator, uint32_t id) { |
| 276 | return allocator->make_unique<Fiber>( |
| 277 | OSFiber::createFiberFromCurrentThread(allocator), id); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 278 | } |
| 279 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 280 | const char* Scheduler::Fiber::toString(State state) { |
| 281 | switch (state) { |
| 282 | case State::Idle: |
| 283 | return "Idle"; |
| 284 | case State::Yielded: |
| 285 | return "Yielded"; |
| 286 | case State::Queued: |
| 287 | return "Queued"; |
| 288 | case State::Running: |
| 289 | return "Running"; |
| 290 | case State::Waiting: |
| 291 | return "Waiting"; |
| 292 | } |
| 293 | MARL_ASSERT(false, "bad fiber state"); |
| 294 | return "<unknown>"; |
| 295 | } |
| 296 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 297 | //////////////////////////////////////////////////////////////////////////////// |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 298 | // Scheduler::WaitingFibers |
| 299 | //////////////////////////////////////////////////////////////////////////////// |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 300 | Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator) |
| 301 | : timeouts(allocator), fibers(allocator) {} |
| 302 | |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 303 | Scheduler::WaitingFibers::operator bool() const { |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 304 | return !fibers.empty(); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 305 | } |
| 306 | |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 307 | Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) { |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 308 | if (!*this) { |
| 309 | return nullptr; |
| 310 | } |
| 311 | auto it = timeouts.begin(); |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 312 | if (timeout < it->timepoint) { |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 313 | return nullptr; |
| 314 | } |
| 315 | auto fiber = it->fiber; |
| 316 | timeouts.erase(it); |
| 317 | auto deleted = fibers.erase(fiber) != 0; |
| 318 | (void)deleted; |
| 319 | MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync"); |
| 320 | return fiber; |
| 321 | } |
| 322 | |
| 323 | Scheduler::TimePoint Scheduler::WaitingFibers::next() const { |
| 324 | MARL_ASSERT(*this, |
| 325 | "WaitingFibers::next() called when there' no waiting fibers"); |
| 326 | return timeouts.begin()->timepoint; |
| 327 | } |
| 328 | |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 329 | void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) { |
| 330 | timeouts.emplace(Timeout{timeout, fiber}); |
| 331 | bool added = fibers.emplace(fiber, timeout).second; |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 332 | (void)added; |
| 333 | MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting"); |
| 334 | } |
| 335 | |
| 336 | void Scheduler::WaitingFibers::erase(Fiber* fiber) { |
| 337 | auto it = fibers.find(fiber); |
| 338 | if (it != fibers.end()) { |
| 339 | auto timeout = it->second; |
| 340 | auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0; |
| 341 | (void)erased; |
| 342 | MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync"); |
| 343 | fibers.erase(it); |
| 344 | } |
| 345 | } |
| 346 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 347 | bool Scheduler::WaitingFibers::contains(Fiber* fiber) const { |
| 348 | return fibers.count(fiber) != 0; |
| 349 | } |
| 350 | |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 351 | bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const { |
| 352 | if (timepoint != o.timepoint) { |
| 353 | return timepoint < o.timepoint; |
| 354 | } |
| 355 | return fiber < o.fiber; |
| 356 | } |
| 357 | |
| 358 | //////////////////////////////////////////////////////////////////////////////// |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 359 | // Scheduler::Worker |
| 360 | //////////////////////////////////////////////////////////////////////////////// |
| 361 | thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr; |
| 362 | |
| 363 | Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id) |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 364 | : id(id), |
| 365 | mode(mode), |
| 366 | scheduler(scheduler), |
| 367 | work(scheduler->cfg.allocator), |
| 368 | idleFibers(scheduler->cfg.allocator) {} |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 369 | |
| 370 | void Scheduler::Worker::start() { |
| 371 | switch (mode) { |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 372 | case Mode::MultiThreaded: { |
| 373 | auto allocator = scheduler->cfg.allocator; |
| 374 | auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy; |
| 375 | auto affinity = affinityPolicy->get(id, allocator); |
| 376 | thread = Thread(std::move(affinity), [=] { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 377 | Thread::setName("Thread<%.2d>", int(id)); |
| 378 | |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 379 | if (auto const& initFunc = scheduler->cfg.workerThread.initializer) { |
| 380 | initFunc(id); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 381 | } |
| 382 | |
| 383 | Scheduler::bound = scheduler; |
| 384 | Worker::current = this; |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 385 | mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 386 | currentFiber = mainFiber.get(); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 387 | { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 388 | marl::lock lock(work.mutex); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 389 | run(); |
| 390 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 391 | mainFiber.reset(); |
| 392 | Worker::current = nullptr; |
| 393 | }); |
| 394 | break; |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 395 | } |
| 396 | case Mode::SingleThreaded: { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 397 | Worker::current = this; |
Ben Clayton | 068a0c5 | 2020-06-02 19:51:02 +0100 | [diff] [blame] | 398 | mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 399 | currentFiber = mainFiber.get(); |
| 400 | break; |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 401 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 402 | default: |
| 403 | MARL_ASSERT(false, "Unknown mode: %d", int(mode)); |
| 404 | } |
| 405 | } |
| 406 | |
| 407 | void Scheduler::Worker::stop() { |
| 408 | switch (mode) { |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 409 | case Mode::MultiThreaded: { |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 410 | enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread)); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 411 | thread.join(); |
| 412 | break; |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 413 | } |
| 414 | case Mode::SingleThreaded: { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 415 | marl::lock lock(work.mutex); |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 416 | shutdown = true; |
| 417 | runUntilShutdown(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 418 | Worker::current = nullptr; |
| 419 | break; |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 420 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 421 | default: |
| 422 | MARL_ASSERT(false, "Unknown mode: %d", int(mode)); |
| 423 | } |
| 424 | } |
| 425 | |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 426 | bool Scheduler::Worker::wait(const TimePoint* timeout) { |
| 427 | DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id); |
| 428 | { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 429 | marl::lock lock(work.mutex); |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 430 | suspend(timeout); |
| 431 | } |
| 432 | return timeout == nullptr || std::chrono::system_clock::now() < *timeout; |
| 433 | } |
| 434 | |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 435 | bool Scheduler::Worker::wait(lock& waitLock, |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 436 | const TimePoint* timeout, |
| 437 | const Predicate& pred) { |
| 438 | DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id); |
| 439 | while (!pred()) { |
| 440 | // Lock the work mutex to call suspend(). |
| 441 | work.mutex.lock(); |
| 442 | |
| 443 | // Unlock the wait mutex with the work mutex lock held. |
| 444 | // Order is important here as we need to ensure that the fiber is not |
| 445 | // enqueued (via Fiber::notify()) between the waitLock.unlock() and fiber |
| 446 | // switch, otherwise the Fiber::notify() call may be ignored and the fiber |
| 447 | // is never woken. |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 448 | waitLock.unlock_no_tsa(); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 449 | |
| 450 | // suspend the fiber. |
| 451 | suspend(timeout); |
| 452 | |
| 453 | // Fiber resumed. We don't need the work mutex locked any more. |
| 454 | work.mutex.unlock(); |
| 455 | |
Ben Clayton | f1f6e68 | 2020-02-13 17:34:05 +0000 | [diff] [blame] | 456 | // Re-lock to either return due to timeout, or call pred(). |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 457 | waitLock.lock_no_tsa(); |
Ben Clayton | f1f6e68 | 2020-02-13 17:34:05 +0000 | [diff] [blame] | 458 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 459 | // Check timeout. |
| 460 | if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) { |
| 461 | return false; |
| 462 | } |
| 463 | |
Ben Clayton | f1f6e68 | 2020-02-13 17:34:05 +0000 | [diff] [blame] | 464 | // Spurious wake up. Spin again. |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 465 | } |
| 466 | return true; |
| 467 | } |
| 468 | |
| 469 | void Scheduler::Worker::suspend( |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 470 | const std::chrono::system_clock::time_point* timeout) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 471 | // Current fiber is yielding as it is blocked. |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 472 | if (timeout != nullptr) { |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 473 | changeFiberState(currentFiber, Fiber::State::Running, |
| 474 | Fiber::State::Waiting); |
| 475 | work.waiting.add(*timeout, currentFiber); |
| 476 | } else { |
| 477 | changeFiberState(currentFiber, Fiber::State::Running, |
| 478 | Fiber::State::Yielded); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 479 | } |
| 480 | |
| 481 | // First wait until there's something else this worker can do. |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 482 | waitForWork(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 483 | |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 484 | work.numBlockedFibers++; |
| 485 | |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 486 | if (!work.fibers.empty()) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 487 | // There's another fiber that has become unblocked, resume that. |
| 488 | work.num--; |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 489 | auto to = containers::take(work.fibers); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 490 | ASSERT_FIBER_STATE(to, Fiber::State::Queued); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 491 | switchToFiber(to); |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 492 | } else if (!idleFibers.empty()) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 493 | // There's an old fiber we can reuse, resume that. |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 494 | auto to = containers::take(idleFibers); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 495 | ASSERT_FIBER_STATE(to, Fiber::State::Idle); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 496 | switchToFiber(to); |
| 497 | } else { |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 498 | // Tasks to process and no existing fibers to resume. |
| 499 | // Spawn a new fiber. |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 500 | switchToFiber(createWorkerFiber()); |
| 501 | } |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 502 | |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 503 | work.numBlockedFibers--; |
| 504 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 505 | setFiberState(currentFiber, Fiber::State::Running); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 506 | } |
| 507 | |
| 508 | bool Scheduler::Worker::tryLock() { |
| 509 | return work.mutex.try_lock(); |
| 510 | } |
| 511 | |
| 512 | void Scheduler::Worker::enqueue(Fiber* fiber) { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 513 | bool notify = false; |
| 514 | { |
| 515 | marl::lock lock(work.mutex); |
| 516 | DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id, |
| 517 | Fiber::toString(fiber->state)); |
| 518 | switch (fiber->state) { |
| 519 | case Fiber::State::Running: |
| 520 | case Fiber::State::Queued: |
| 521 | return; // Nothing to do here - task is already queued or running. |
| 522 | case Fiber::State::Waiting: |
| 523 | work.waiting.erase(fiber); |
| 524 | break; |
| 525 | case Fiber::State::Idle: |
| 526 | case Fiber::State::Yielded: |
| 527 | break; |
| 528 | } |
| 529 | notify = work.notifyAdded; |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 530 | work.fibers.push_back(fiber); |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 531 | MARL_ASSERT(!work.waiting.contains(fiber), |
| 532 | "fiber is unexpectedly in the waiting list"); |
| 533 | setFiberState(fiber, Fiber::State::Queued); |
| 534 | work.num++; |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 535 | } |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 536 | |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 537 | if (notify) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 538 | work.added.notify_one(); |
| 539 | } |
| 540 | } |
| 541 | |
| 542 | void Scheduler::Worker::enqueue(Task&& task) { |
| 543 | work.mutex.lock(); |
| 544 | enqueueAndUnlock(std::move(task)); |
| 545 | } |
| 546 | |
| 547 | void Scheduler::Worker::enqueueAndUnlock(Task&& task) { |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 548 | auto notify = work.notifyAdded; |
| 549 | work.tasks.push_back(std::move(task)); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 550 | work.num++; |
| 551 | work.mutex.unlock(); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 552 | if (notify) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 553 | work.added.notify_one(); |
| 554 | } |
| 555 | } |
| 556 | |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 557 | bool Scheduler::Worker::steal(Task& out) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 558 | if (work.num.load() == 0) { |
| 559 | return false; |
| 560 | } |
| 561 | if (!work.mutex.try_lock()) { |
| 562 | return false; |
| 563 | } |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 564 | if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) { |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 565 | work.mutex.unlock(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 566 | return false; |
| 567 | } |
| 568 | work.num--; |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 569 | out = containers::take(work.tasks); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 570 | work.mutex.unlock(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 571 | return true; |
| 572 | } |
| 573 | |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 574 | void Scheduler::Worker::run() { |
| 575 | if (mode == Mode::MultiThreaded) { |
| 576 | MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id); |
| 577 | // This is the entry point for a multi-threaded worker. |
| 578 | // Start with a regular condition-variable wait for work. This avoids |
| 579 | // starting the thread with a spinForWork(). |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 580 | work.wait([this]() REQUIRES(work.mutex) { |
| 581 | return work.num > 0 || work.waiting || shutdown; |
| 582 | }); |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 583 | } |
| 584 | ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running); |
| 585 | runUntilShutdown(); |
| 586 | switchToFiber(mainFiber.get()); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 587 | } |
| 588 | |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 589 | void Scheduler::Worker::runUntilShutdown() { |
| 590 | while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) { |
| 591 | waitForWork(); |
| 592 | runUntilIdle(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 593 | } |
| 594 | } |
| 595 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 596 | void Scheduler::Worker::waitForWork() { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 597 | MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(), |
| 598 | "work.num out of sync"); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 599 | if (work.num > 0) { |
| 600 | return; |
| 601 | } |
| 602 | |
| 603 | if (mode == Mode::MultiThreaded) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 604 | scheduler->onBeginSpinning(id); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 605 | work.mutex.unlock(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 606 | spinForWork(); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 607 | work.mutex.lock(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 608 | } |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 609 | |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 610 | work.wait([this]() REQUIRES(work.mutex) { |
Ben Clayton | a38d7d1 | 2020-03-24 00:27:33 +0000 | [diff] [blame] | 611 | return work.num > 0 || (shutdown && work.numBlockedFibers == 0U); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 612 | }); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 613 | if (work.waiting) { |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 614 | enqueueFiberTimeouts(); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 615 | } |
| 616 | } |
| 617 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 618 | void Scheduler::Worker::enqueueFiberTimeouts() { |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 619 | auto now = std::chrono::system_clock::now(); |
| 620 | while (auto fiber = work.waiting.take(now)) { |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 621 | changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued); |
| 622 | DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 623 | work.fibers.push_back(fiber); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 624 | work.num++; |
| 625 | } |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 626 | } |
| 627 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 628 | void Scheduler::Worker::changeFiberState(Fiber* fiber, |
| 629 | Fiber::State from, |
| 630 | Fiber::State to) const { |
| 631 | (void)from; // Unusued parameter when ENABLE_DEBUG_LOGGING is disabled. |
| 632 | DBG_LOG("%d: CHANGE_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id, |
| 633 | Fiber::toString(from), Fiber::toString(to)); |
| 634 | ASSERT_FIBER_STATE(fiber, from); |
| 635 | fiber->state = to; |
| 636 | } |
| 637 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 638 | void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const { |
| 639 | DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id, |
| 640 | Fiber::toString(fiber->state), Fiber::toString(to)); |
| 641 | fiber->state = to; |
| 642 | } |
| 643 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 644 | void Scheduler::Worker::spinForWork() { |
| 645 | TRACE("SPIN"); |
| 646 | Task stolen; |
| 647 | |
| 648 | constexpr auto duration = std::chrono::milliseconds(1); |
| 649 | auto start = std::chrono::high_resolution_clock::now(); |
| 650 | while (std::chrono::high_resolution_clock::now() - start < duration) { |
| 651 | for (int i = 0; i < 256; i++) // Empirically picked magic number! |
| 652 | { |
| 653 | // clang-format off |
| 654 | nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop(); |
| 655 | nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop(); |
| 656 | nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop(); |
| 657 | nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop(); |
| 658 | // clang-format on |
| 659 | if (work.num > 0) { |
| 660 | return; |
| 661 | } |
| 662 | } |
| 663 | |
| 664 | if (scheduler->stealWork(this, rng(), stolen)) { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 665 | marl::lock lock(work.mutex); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 666 | work.tasks.emplace_back(std::move(stolen)); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 667 | work.num++; |
| 668 | return; |
| 669 | } |
| 670 | |
| 671 | std::this_thread::yield(); |
| 672 | } |
| 673 | } |
| 674 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 675 | void Scheduler::Worker::runUntilIdle() { |
| 676 | ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 677 | MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(), |
| 678 | "work.num out of sync"); |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 679 | while (!work.fibers.empty() || !work.tasks.empty()) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 680 | // Note: we cannot take and store on the stack more than a single fiber |
| 681 | // or task at a time, as the Fiber may yield and these items may get |
| 682 | // held on suspended fiber stack. |
| 683 | |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 684 | while (!work.fibers.empty()) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 685 | work.num--; |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 686 | auto fiber = containers::take(work.fibers); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 687 | // Sanity checks, |
| 688 | MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle"); |
| 689 | MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running"); |
| 690 | ASSERT_FIBER_STATE(fiber, Fiber::State::Queued); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 691 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 692 | changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 693 | auto added = idleFibers.emplace(currentFiber).second; |
| 694 | (void)added; |
| 695 | MARL_ASSERT(added, "fiber already idle"); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 696 | |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 697 | switchToFiber(fiber); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 698 | changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 699 | } |
| 700 | |
Ben Clayton | 27c6367 | 2020-04-17 00:02:45 +0100 | [diff] [blame] | 701 | if (!work.tasks.empty()) { |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 702 | work.num--; |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 703 | auto task = containers::take(work.tasks); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 704 | work.mutex.unlock(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 705 | |
| 706 | // Run the task. |
| 707 | task(); |
| 708 | |
| 709 | // std::function<> can carry arguments with complex destructors. |
| 710 | // Ensure these are destructed outside of the lock. |
| 711 | task = Task(); |
| 712 | |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 713 | work.mutex.lock(); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 714 | } |
| 715 | } |
| 716 | } |
| 717 | |
| 718 | Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() { |
Nicolas Capens | 07ed7cf | 2019-09-24 10:39:33 -0400 | [diff] [blame] | 719 | auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1); |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 720 | DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId); |
Ben Clayton | 269f38f | 2020-10-02 13:24:32 +0100 | [diff] [blame] | 721 | auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, |
| 722 | scheduler->cfg.fiberStackSize, |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 723 | [&]() REQUIRES(work.mutex) { run(); }); |
Ben Clayton | 193ce89 | 2019-10-04 20:46:13 +0100 | [diff] [blame] | 724 | auto ptr = fiber.get(); |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 725 | workerFibers.emplace_back(std::move(fiber)); |
Ben Clayton | 193ce89 | 2019-10-04 20:46:13 +0100 | [diff] [blame] | 726 | return ptr; |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 727 | } |
| 728 | |
| 729 | void Scheduler::Worker::switchToFiber(Fiber* to) { |
Ben Clayton | ec288e2 | 2020-02-09 19:20:01 +0000 | [diff] [blame] | 730 | DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id); |
Ben Clayton | 6dd9ff1 | 2019-11-15 16:46:36 +0000 | [diff] [blame] | 731 | MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0, |
| 732 | "switching to idle fiber"); |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 733 | auto from = currentFiber; |
| 734 | currentFiber = to; |
| 735 | from->switchTo(to); |
| 736 | } |
| 737 | |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 738 | //////////////////////////////////////////////////////////////////////////////// |
| 739 | // Scheduler::Worker::Work |
| 740 | //////////////////////////////////////////////////////////////////////////////// |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 741 | Scheduler::Worker::Work::Work(Allocator* allocator) |
| 742 | : tasks(allocator), fibers(allocator), waiting(allocator) {} |
| 743 | |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 744 | template <typename F> |
| 745 | void Scheduler::Worker::Work::wait(F&& f) { |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 746 | notifyAdded = true; |
| 747 | if (waiting) { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 748 | mutex.wait_until_locked(added, waiting.next(), f); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 749 | } else { |
Ben Clayton | 8787897 | 2020-04-07 10:43:07 +0100 | [diff] [blame] | 750 | mutex.wait_locked(added, f); |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 751 | } |
| 752 | notifyAdded = false; |
Ben Clayton | 36835bc | 2020-03-16 20:30:01 +0000 | [diff] [blame] | 753 | } |
| 754 | |
Ben Clayton | 32b1d4b | 2020-06-15 22:23:46 +0100 | [diff] [blame] | 755 | //////////////////////////////////////////////////////////////////////////////// |
| 756 | // Scheduler::Worker::Work |
| 757 | //////////////////////////////////////////////////////////////////////////////// |
| 758 | Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator) |
| 759 | : byTid(allocator) {} |
| 760 | |
Ben Clayton | fe71eb9 | 2019-09-05 12:35:50 +0100 | [diff] [blame] | 761 | } // namespace marl |