Ben Clayton | b6c572d | 2019-09-07 11:41:39 +0000 | [diff] [blame^] | 1 | // Copyright 2019 The SwiftShader Authors. All Rights Reserved. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | #ifndef yarn_scheduler_hpp |
| 16 | #define yarn_scheduler_hpp |
| 17 | |
#include "Debug.hpp"
#include "SAL.hpp"

#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <vector>
| 29 | |
| 30 | namespace yarn { |
| 31 | |
// Task is the unit of work accepted by the Scheduler: a callable that takes
// no arguments and produces no result.
using Task = std::function<void()>;

// OSFiber is the underlying fiber implementation wrapped by Scheduler::Fiber.
// It is defined elsewhere; this header only refers to it through pointers.
class OSFiber;
| 36 | |
| 37 | // Scheduler asynchronously processes Tasks. |
| 38 | // A scheduler can be bound to one or more threads using the bind() method. |
| 39 | // Once bound to a thread, that thread can call yarn::schedule() to enqueue |
| 40 | // work tasks to be executed asynchronously. |
| 41 | // Scheduler are initially constructed in single-threaded mode. |
| 42 | // Call setWorkerThreadCount() to spawn dedicated worker threads. |
| 43 | class Scheduler |
| 44 | { |
| 45 | class Worker; |
| 46 | |
| 47 | public: |
| 48 | Scheduler(); |
| 49 | ~Scheduler(); |
| 50 | |
| 51 | // get() returns the scheduler bound to the current thread. |
| 52 | static Scheduler *get(); |
| 53 | |
| 54 | // bind() binds this scheduler to the current thread. |
| 55 | // There must be no existing scheduler bound to the thread prior to calling. |
| 56 | void bind(); |
| 57 | |
| 58 | // unbind() unbinds the scheduler currently bound to the current thread. |
| 59 | // There must be a existing scheduler bound to the thread prior to calling. |
| 60 | static void unbind(); |
| 61 | |
| 62 | // enqueue() queues the task for asynchronous execution. |
| 63 | void enqueue(Task&& task); |
| 64 | |
| 65 | // setThreadInitializer() sets the worker thread initializer function which |
| 66 | // will be called for each new worker thread spawned. |
| 67 | // The initializer will only be called on newly created threads (call |
| 68 | // setThreadInitializer() before setWorkerThreadCount()). |
| 69 | void setThreadInitializer(const std::function<void()>& init); |
| 70 | |
| 71 | // getThreadInitializer() returns the thread initializer function set by |
| 72 | // setThreadInitializer(). |
| 73 | const std::function<void()>& getThreadInitializer(); |
| 74 | |
| 75 | // setWorkerThreadCount() adjusts the number of dedicated worker threads. |
| 76 | // A count of 0 puts the scheduler into single-threaded mode. |
| 77 | // Note: Currently the number of threads cannot be adjusted once tasks |
| 78 | // have been enqueued. This restriction may be lifted at a later time. |
| 79 | void setWorkerThreadCount(int count); |
| 80 | |
| 81 | // getWorkerThreadCount() returns the number of worker threads. |
| 82 | int getWorkerThreadCount(); |
| 83 | |
| 84 | // Fibers expose methods to perform cooperative multitasking and are |
| 85 | // automatically created by the Scheduler. |
| 86 | // |
| 87 | // The currently executing Fiber can be obtained by calling Fiber::current(). |
| 88 | // |
| 89 | // When execution becomes blocked, yield() can be called to suspend execution of |
| 90 | // the fiber and start executing other pending work. |
| 91 | // Once the block has been lifted, schedule() can be called to reschedule the |
| 92 | // Fiber on the same thread that previously executed it. |
| 93 | class Fiber |
| 94 | { |
| 95 | public: |
| 96 | ~Fiber(); |
| 97 | |
| 98 | // current() returns the currently executing fiber, or nullptr if called |
| 99 | // without a bound scheduler. |
| 100 | static Fiber* current(); |
| 101 | |
| 102 | // yield() suspends execution of this Fiber, allowing the thread to work |
| 103 | // on other tasks. |
| 104 | // yield() must only be called on the currently executing fiber. |
| 105 | void yield(); |
| 106 | |
| 107 | // schedule() reschedules the suspended Fiber for execution. |
| 108 | void schedule(); |
| 109 | |
| 110 | // id is the thread-unique identifier of the Fiber. |
| 111 | uint32_t const id; |
| 112 | |
| 113 | private: |
| 114 | friend class Scheduler; |
| 115 | |
| 116 | Fiber(OSFiber*, uint32_t id); |
| 117 | |
| 118 | // switchTo() switches execution to the given fiber. |
| 119 | // switchTo() must only be called on the currently executing fiber. |
| 120 | void switchTo(Fiber*); |
| 121 | |
| 122 | // create() constructs and returns a new fiber with the given identifier, |
| 123 | // stack size that will executed func when switched to. |
| 124 | static Fiber* create(uint32_t id, size_t stackSize, const std::function<void()>& func); |
| 125 | |
| 126 | // createFromCurrentThread() constructs and returns a new fiber with the |
| 127 | // given identifier for the current thread. |
| 128 | static Fiber* createFromCurrentThread(uint32_t id); |
| 129 | |
| 130 | OSFiber* const impl; |
| 131 | Worker* const worker; |
| 132 | }; |
| 133 | |
| 134 | private: |
| 135 | // Stack size in bytes of a new fiber. |
| 136 | // TODO: Make configurable so the default size can be reduced. |
| 137 | static constexpr size_t FiberStackSize = 1024 * 1024; |
| 138 | |
| 139 | // Maximum number of worker threads. |
| 140 | static constexpr size_t MaxWorkerThreads = 64; |
| 141 | |
| 142 | // TODO: Implement a queue that recycles elements to reduce number of |
| 143 | // heap allocations. |
| 144 | using TaskQueue = std::queue<Task>; |
| 145 | using FiberQueue = std::queue<Fiber*>; |
| 146 | |
| 147 | // Workers executes Tasks on a single thread. |
| 148 | // Once a task is started, it may yield to other tasks on the same Worker. |
| 149 | // Tasks are always resumed by the same Worker. |
| 150 | class Worker |
| 151 | { |
| 152 | public: |
| 153 | enum class Mode |
| 154 | { |
| 155 | // Worker will spawn a background thread to process tasks. |
| 156 | MultiThreaded, |
| 157 | |
| 158 | // Worker will execute tasks whenever it yields. |
| 159 | SingleThreaded, |
| 160 | }; |
| 161 | |
| 162 | Worker(Scheduler *scheduler, Mode mode, uint32_t id); |
| 163 | |
| 164 | // start() begins execution of the worker. |
| 165 | void start(); |
| 166 | |
| 167 | // stop() ceases execution of the worker, blocking until all pending |
| 168 | // tasks have fully finished. |
| 169 | void stop(); |
| 170 | |
| 171 | // yield() suspends execution of the current task, and looks for other |
| 172 | // tasks to start or continue execution. |
| 173 | void yield(Fiber* fiber); |
| 174 | |
| 175 | // enqueue(Fiber*) enqueues resuming of a suspended fiber. |
| 176 | void enqueue(Fiber* fiber); |
| 177 | |
| 178 | // enqueue(Task&&) enqueues a new, unstarted task. |
| 179 | void enqueue(Task&& task); |
| 180 | |
| 181 | // tryLock() attempts to lock the worker for task enqueing. |
| 182 | // If the lock was successful then true is returned, and the caller must |
| 183 | // call enqueueAndUnlock(). |
| 184 | bool tryLock(); |
| 185 | |
| 186 | // enqueueAndUnlock() enqueues the task and unlocks the worker. |
| 187 | // Must only be called after a call to tryLock() which returned true. |
| 188 | void enqueueAndUnlock(Task&& task); |
| 189 | |
| 190 | // flush() processes all pending tasks before returning. |
| 191 | void flush(); |
| 192 | |
| 193 | // dequeue() attempts to take a Task from the worker. Returns true if |
| 194 | // a task was taken and assigned to out, otherwise false. |
| 195 | bool dequeue(Task& out); |
| 196 | |
| 197 | // getCurrent() returns the Worker currently bound to the current |
| 198 | // thread. |
| 199 | static inline Worker* getCurrent(); |
| 200 | |
| 201 | // getCurrentFiber() returns the Fiber currently being executed. |
| 202 | inline Fiber* getCurrentFiber() const; |
| 203 | |
| 204 | // Unique identifier of the Worker. |
| 205 | const uint32_t id; |
| 206 | |
| 207 | private: |
| 208 | // run() is the task processing function for the worker. |
| 209 | // If the worker was constructed in Mode::MultiThreaded, run() will |
| 210 | // continue to process tasks until stop() is called. |
| 211 | // If the worker was constructed in Mode::SingleThreaded, run() call |
| 212 | // flush() and return. |
| 213 | void run(); |
| 214 | |
| 215 | // createWorkerFiber() creates a new fiber that when executed calls |
| 216 | // run(). |
| 217 | Fiber* createWorkerFiber(); |
| 218 | |
| 219 | // switchToFiber() switches execution to the given fiber. The fiber |
| 220 | // must belong to this worker. |
| 221 | void switchToFiber(Fiber*); |
| 222 | |
| 223 | // runUntilIdle() executes all pending tasks and then returns. |
| 224 | _Requires_lock_held_(lock) |
| 225 | void runUntilIdle(std::unique_lock<std::mutex> &lock); |
| 226 | |
| 227 | // waitForWork() blocks until new work is available, potentially calling |
| 228 | // spinForWork(). |
| 229 | _Requires_lock_held_(lock) |
| 230 | void waitForWork(std::unique_lock<std::mutex> &lock); |
| 231 | |
| 232 | // spinForWork() attempts to steal work from another Worker, and keeps |
| 233 | // the thread awake for a short duration. This reduces overheads of |
| 234 | // frequently putting the thread to sleep and re-waking. |
| 235 | void spinForWork(); |
| 236 | |
| 237 | // Work holds tasks and fibers that are enqueued on the Worker. |
| 238 | struct Work |
| 239 | { |
| 240 | std::atomic<uint64_t> num = { 0 }; // tasks.size() + fibers.size() |
| 241 | TaskQueue tasks; // guarded by mutex |
| 242 | FiberQueue fibers; // guarded by mutex |
| 243 | std::condition_variable added; |
| 244 | std::mutex mutex; |
| 245 | }; |
| 246 | |
| 247 | // https://en.wikipedia.org/wiki/Xorshift |
| 248 | class FastRnd |
| 249 | { |
| 250 | public: |
| 251 | inline uint64_t operator ()() |
| 252 | { |
| 253 | x ^= x << 13; |
| 254 | x ^= x >> 7; |
| 255 | x ^= x << 17; |
| 256 | return x; |
| 257 | } |
| 258 | private: |
| 259 | uint64_t x = std::chrono::system_clock::now().time_since_epoch().count(); |
| 260 | }; |
| 261 | |
| 262 | // The current worker bound to the current thread. |
| 263 | static thread_local Worker* current; |
| 264 | |
| 265 | Mode const mode; |
| 266 | Scheduler* const scheduler; |
| 267 | std::unique_ptr<Fiber> mainFiber; |
| 268 | Fiber* currentFiber = nullptr; |
| 269 | std::thread thread; |
| 270 | Work work; |
| 271 | FiberQueue idleFibers; // Fibers that have completed which can be reused. |
| 272 | std::vector<std::unique_ptr<Fiber>> workerFibers; // All fibers created by this worker. |
| 273 | FastRnd rng; |
| 274 | std::atomic<bool> shutdown = { false }; |
| 275 | }; |
| 276 | |
| 277 | // stealWork() attempts to steal a task from the worker with the given id. |
| 278 | // Returns true if a task was stolen and assigned to out, otherwise false. |
| 279 | bool stealWork(Worker* thief, uint64_t from, Task& out); |
| 280 | |
| 281 | // onBeginSpinning() is called when a Worker calls spinForWork(). |
| 282 | // The scheduler will prioritize this worker for new tasks to try to prevent |
| 283 | // it going to sleep. |
| 284 | void onBeginSpinning(int workerId); |
| 285 | |
| 286 | // The scheduler currently bound to the current thread. |
| 287 | static thread_local Scheduler* bound; |
| 288 | |
| 289 | std::function<void()> threadInitFunc; |
| 290 | std::mutex threadInitFuncMutex; |
| 291 | |
| 292 | std::array<std::atomic<int>, 8> spinningWorkers; |
| 293 | std::atomic<unsigned int> nextSpinningWorkerIdx = { 0x8000000 }; |
| 294 | |
| 295 | // TODO: Make this lot thread-safe so setWorkerThreadCount() can be called |
| 296 | // during execution of tasks. |
| 297 | std::atomic<unsigned int> nextEnqueueIndex = { 0 }; |
| 298 | unsigned int numWorkerThreads = 0; |
| 299 | std::array<Worker*, MaxWorkerThreads> workerThreads; |
| 300 | |
| 301 | std::mutex singleThreadedWorkerMutex; |
| 302 | std::unordered_map<std::thread::id, std::unique_ptr<Worker>> singleThreadedWorkers; |
| 303 | }; |
| 304 | |
| 305 | Scheduler::Worker* Scheduler::Worker::getCurrent() |
| 306 | { |
| 307 | return Worker::current; |
| 308 | } |
| 309 | |
| 310 | Scheduler::Fiber* Scheduler::Worker::getCurrentFiber() const |
| 311 | { |
| 312 | return currentFiber; |
| 313 | } |
| 314 | |
| 315 | // schedule() schedules the function f to be asynchronously called with the |
| 316 | // given arguments using the currently bound scheduler. |
| 317 | template<typename Function, typename ... Args> |
| 318 | inline void schedule(Function&& f, Args&& ... args) |
| 319 | { |
| 320 | YARN_ASSERT_HAS_BOUND_SCHEDULER("yarn::schedule"); |
| 321 | auto scheduler = Scheduler::get(); |
| 322 | scheduler->enqueue(std::bind(std::forward<Function>(f), std::forward<Args>(args)...)); |
| 323 | } |
| 324 | |
| 325 | // schedule() schedules the function f to be asynchronously called using the |
| 326 | // currently bound scheduler. |
| 327 | template<typename Function> |
| 328 | inline void schedule(Function&& f) |
| 329 | { |
| 330 | YARN_ASSERT_HAS_BOUND_SCHEDULER("yarn::schedule"); |
| 331 | auto scheduler = Scheduler::get(); |
| 332 | scheduler->enqueue(std::forward<Function>(f)); |
| 333 | } |
| 334 | |
| 335 | } // namespace yarn |
| 336 | |
| 337 | #endif // yarn_scheduler_hpp |