blob: 7451698a95bada46eb655388bf2307b5bd380bb6 [file] [log] [blame]
Ben Claytonb6c572d2019-09-07 11:41:39 +00001// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef yarn_scheduler_hpp
16#define yarn_scheduler_hpp
17
18#include "Debug.hpp"
19#include "SAL.hpp"
20
21#include <array>
22#include <atomic>
23#include <condition_variable>
24#include <functional>
25#include <mutex>
26#include <queue>
27#include <thread>
28#include <unordered_map>
29
30namespace yarn {
31
32class OSFiber;
33
34// Task is a unit of work for the scheduler.
35using Task = std::function<void()>;
36
37// Scheduler asynchronously processes Tasks.
38// A scheduler can be bound to one or more threads using the bind() method.
39// Once bound to a thread, that thread can call yarn::schedule() to enqueue
40// work tasks to be executed asynchronously.
41// Scheduler are initially constructed in single-threaded mode.
42// Call setWorkerThreadCount() to spawn dedicated worker threads.
43class Scheduler
44{
45 class Worker;
46
47public:
48 Scheduler();
49 ~Scheduler();
50
51 // get() returns the scheduler bound to the current thread.
52 static Scheduler *get();
53
54 // bind() binds this scheduler to the current thread.
55 // There must be no existing scheduler bound to the thread prior to calling.
56 void bind();
57
58 // unbind() unbinds the scheduler currently bound to the current thread.
59 // There must be a existing scheduler bound to the thread prior to calling.
60 static void unbind();
61
62 // enqueue() queues the task for asynchronous execution.
63 void enqueue(Task&& task);
64
65 // setThreadInitializer() sets the worker thread initializer function which
66 // will be called for each new worker thread spawned.
67 // The initializer will only be called on newly created threads (call
68 // setThreadInitializer() before setWorkerThreadCount()).
69 void setThreadInitializer(const std::function<void()>& init);
70
71 // getThreadInitializer() returns the thread initializer function set by
72 // setThreadInitializer().
73 const std::function<void()>& getThreadInitializer();
74
75 // setWorkerThreadCount() adjusts the number of dedicated worker threads.
76 // A count of 0 puts the scheduler into single-threaded mode.
77 // Note: Currently the number of threads cannot be adjusted once tasks
78 // have been enqueued. This restriction may be lifted at a later time.
79 void setWorkerThreadCount(int count);
80
81 // getWorkerThreadCount() returns the number of worker threads.
82 int getWorkerThreadCount();
83
84 // Fibers expose methods to perform cooperative multitasking and are
85 // automatically created by the Scheduler.
86 //
87 // The currently executing Fiber can be obtained by calling Fiber::current().
88 //
89 // When execution becomes blocked, yield() can be called to suspend execution of
90 // the fiber and start executing other pending work.
91 // Once the block has been lifted, schedule() can be called to reschedule the
92 // Fiber on the same thread that previously executed it.
93 class Fiber
94 {
95 public:
96 ~Fiber();
97
98 // current() returns the currently executing fiber, or nullptr if called
99 // without a bound scheduler.
100 static Fiber* current();
101
102 // yield() suspends execution of this Fiber, allowing the thread to work
103 // on other tasks.
104 // yield() must only be called on the currently executing fiber.
105 void yield();
106
107 // schedule() reschedules the suspended Fiber for execution.
108 void schedule();
109
110 // id is the thread-unique identifier of the Fiber.
111 uint32_t const id;
112
113 private:
114 friend class Scheduler;
115
116 Fiber(OSFiber*, uint32_t id);
117
118 // switchTo() switches execution to the given fiber.
119 // switchTo() must only be called on the currently executing fiber.
120 void switchTo(Fiber*);
121
122 // create() constructs and returns a new fiber with the given identifier,
123 // stack size that will executed func when switched to.
124 static Fiber* create(uint32_t id, size_t stackSize, const std::function<void()>& func);
125
126 // createFromCurrentThread() constructs and returns a new fiber with the
127 // given identifier for the current thread.
128 static Fiber* createFromCurrentThread(uint32_t id);
129
130 OSFiber* const impl;
131 Worker* const worker;
132 };
133
134private:
135 // Stack size in bytes of a new fiber.
136 // TODO: Make configurable so the default size can be reduced.
137 static constexpr size_t FiberStackSize = 1024 * 1024;
138
139 // Maximum number of worker threads.
140 static constexpr size_t MaxWorkerThreads = 64;
141
142 // TODO: Implement a queue that recycles elements to reduce number of
143 // heap allocations.
144 using TaskQueue = std::queue<Task>;
145 using FiberQueue = std::queue<Fiber*>;
146
147 // Workers executes Tasks on a single thread.
148 // Once a task is started, it may yield to other tasks on the same Worker.
149 // Tasks are always resumed by the same Worker.
150 class Worker
151 {
152 public:
153 enum class Mode
154 {
155 // Worker will spawn a background thread to process tasks.
156 MultiThreaded,
157
158 // Worker will execute tasks whenever it yields.
159 SingleThreaded,
160 };
161
162 Worker(Scheduler *scheduler, Mode mode, uint32_t id);
163
164 // start() begins execution of the worker.
165 void start();
166
167 // stop() ceases execution of the worker, blocking until all pending
168 // tasks have fully finished.
169 void stop();
170
171 // yield() suspends execution of the current task, and looks for other
172 // tasks to start or continue execution.
173 void yield(Fiber* fiber);
174
175 // enqueue(Fiber*) enqueues resuming of a suspended fiber.
176 void enqueue(Fiber* fiber);
177
178 // enqueue(Task&&) enqueues a new, unstarted task.
179 void enqueue(Task&& task);
180
181 // tryLock() attempts to lock the worker for task enqueing.
182 // If the lock was successful then true is returned, and the caller must
183 // call enqueueAndUnlock().
184 bool tryLock();
185
186 // enqueueAndUnlock() enqueues the task and unlocks the worker.
187 // Must only be called after a call to tryLock() which returned true.
188 void enqueueAndUnlock(Task&& task);
189
190 // flush() processes all pending tasks before returning.
191 void flush();
192
193 // dequeue() attempts to take a Task from the worker. Returns true if
194 // a task was taken and assigned to out, otherwise false.
195 bool dequeue(Task& out);
196
197 // getCurrent() returns the Worker currently bound to the current
198 // thread.
199 static inline Worker* getCurrent();
200
201 // getCurrentFiber() returns the Fiber currently being executed.
202 inline Fiber* getCurrentFiber() const;
203
204 // Unique identifier of the Worker.
205 const uint32_t id;
206
207 private:
208 // run() is the task processing function for the worker.
209 // If the worker was constructed in Mode::MultiThreaded, run() will
210 // continue to process tasks until stop() is called.
211 // If the worker was constructed in Mode::SingleThreaded, run() call
212 // flush() and return.
213 void run();
214
215 // createWorkerFiber() creates a new fiber that when executed calls
216 // run().
217 Fiber* createWorkerFiber();
218
219 // switchToFiber() switches execution to the given fiber. The fiber
220 // must belong to this worker.
221 void switchToFiber(Fiber*);
222
223 // runUntilIdle() executes all pending tasks and then returns.
224 _Requires_lock_held_(lock)
225 void runUntilIdle(std::unique_lock<std::mutex> &lock);
226
227 // waitForWork() blocks until new work is available, potentially calling
228 // spinForWork().
229 _Requires_lock_held_(lock)
230 void waitForWork(std::unique_lock<std::mutex> &lock);
231
232 // spinForWork() attempts to steal work from another Worker, and keeps
233 // the thread awake for a short duration. This reduces overheads of
234 // frequently putting the thread to sleep and re-waking.
235 void spinForWork();
236
237 // Work holds tasks and fibers that are enqueued on the Worker.
238 struct Work
239 {
240 std::atomic<uint64_t> num = { 0 }; // tasks.size() + fibers.size()
241 TaskQueue tasks; // guarded by mutex
242 FiberQueue fibers; // guarded by mutex
243 std::condition_variable added;
244 std::mutex mutex;
245 };
246
247 // https://en.wikipedia.org/wiki/Xorshift
248 class FastRnd
249 {
250 public:
251 inline uint64_t operator ()()
252 {
253 x ^= x << 13;
254 x ^= x >> 7;
255 x ^= x << 17;
256 return x;
257 }
258 private:
259 uint64_t x = std::chrono::system_clock::now().time_since_epoch().count();
260 };
261
262 // The current worker bound to the current thread.
263 static thread_local Worker* current;
264
265 Mode const mode;
266 Scheduler* const scheduler;
267 std::unique_ptr<Fiber> mainFiber;
268 Fiber* currentFiber = nullptr;
269 std::thread thread;
270 Work work;
271 FiberQueue idleFibers; // Fibers that have completed which can be reused.
272 std::vector<std::unique_ptr<Fiber>> workerFibers; // All fibers created by this worker.
273 FastRnd rng;
274 std::atomic<bool> shutdown = { false };
275 };
276
277 // stealWork() attempts to steal a task from the worker with the given id.
278 // Returns true if a task was stolen and assigned to out, otherwise false.
279 bool stealWork(Worker* thief, uint64_t from, Task& out);
280
281 // onBeginSpinning() is called when a Worker calls spinForWork().
282 // The scheduler will prioritize this worker for new tasks to try to prevent
283 // it going to sleep.
284 void onBeginSpinning(int workerId);
285
286 // The scheduler currently bound to the current thread.
287 static thread_local Scheduler* bound;
288
289 std::function<void()> threadInitFunc;
290 std::mutex threadInitFuncMutex;
291
292 std::array<std::atomic<int>, 8> spinningWorkers;
293 std::atomic<unsigned int> nextSpinningWorkerIdx = { 0x8000000 };
294
295 // TODO: Make this lot thread-safe so setWorkerThreadCount() can be called
296 // during execution of tasks.
297 std::atomic<unsigned int> nextEnqueueIndex = { 0 };
298 unsigned int numWorkerThreads = 0;
299 std::array<Worker*, MaxWorkerThreads> workerThreads;
300
301 std::mutex singleThreadedWorkerMutex;
302 std::unordered_map<std::thread::id, std::unique_ptr<Worker>> singleThreadedWorkers;
303};
304
305Scheduler::Worker* Scheduler::Worker::getCurrent()
306{
307 return Worker::current;
308}
309
310Scheduler::Fiber* Scheduler::Worker::getCurrentFiber() const
311{
312 return currentFiber;
313}
314
315// schedule() schedules the function f to be asynchronously called with the
316// given arguments using the currently bound scheduler.
317template<typename Function, typename ... Args>
318inline void schedule(Function&& f, Args&& ... args)
319{
320 YARN_ASSERT_HAS_BOUND_SCHEDULER("yarn::schedule");
321 auto scheduler = Scheduler::get();
322 scheduler->enqueue(std::bind(std::forward<Function>(f), std::forward<Args>(args)...));
323}
324
325// schedule() schedules the function f to be asynchronously called using the
326// currently bound scheduler.
327template<typename Function>
328inline void schedule(Function&& f)
329{
330 YARN_ASSERT_HAS_BOUND_SCHEDULER("yarn::schedule");
331 auto scheduler = Scheduler::get();
332 scheduler->enqueue(std::forward<Function>(f));
333}
334
335} // namespace yarn
336
337#endif // yarn_scheduler_hpp