Squashed 'third_party/marl/' changes from 38c0c7a0f..c51271125
c51271125 README.md: Repoint link from `master` to `main`
da0a9c610 Add basic CMake rules for versioning marl
0e639c3c7 Benchmarks: Simplify tests further.
32af8bb50 Kokoro: Enable debug checks
8cf8dc033 Wrap all stl containers with a marl::StlAllocator
325b072b9 Support specifying worker thread affinities
49fe9a17f marl::containers: Add const methods
11f31bfbe Benchmarks: Warn if benchmarking with sanitizers
bfdf613e7 Benchmarks: Add MARL_FULL_BENCHMARK flag
5e2383370 Benchmarks: Allow running with custom Config
c277c61b0 marl::containers::vector fixes
fcbe1f279 Add issue number to MARL_DEPRECATED()
1f010cad7 Warn about use of deprecated APIs
d2e553bff Don't use deprecated scheduler methods
git-subtree-dir: third_party/marl
git-subtree-split: c51271125451c599efb9ec58b355a4c434296a8f
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 384f67d..66440eb 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -59,21 +59,6 @@
}
#endif
-template <typename T>
-inline T take(std::deque<T>& queue) {
- auto out = std::move(queue.front());
- queue.pop_front();
- return out;
-}
-
-template <typename T>
-inline T take(std::unordered_set<T>& set) {
- auto it = set.begin();
- auto out = std::move(*it);
- set.erase(it);
- return out;
-}
-
inline void nop() {
#if defined(_WIN32)
__nop();
@@ -127,7 +112,12 @@
bound = nullptr;
}
-Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
+Scheduler::Scheduler(const Config& config)
+ : cfg(config), workerThreads{}, singleThreadedWorkers(config.allocator) {
+ if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
+ cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf(
+ Thread::Affinity::all(cfg.allocator), cfg.allocator);
+ }
for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
}
@@ -162,7 +152,7 @@
#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
- : workerThreads{} {
+ : workerThreads{}, singleThreadedWorkers(allocator) {
cfg.allocator = allocator;
for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
@@ -338,6 +328,9 @@
////////////////////////////////////////////////////////////////////////////////
// Scheduler::WaitingFibers
////////////////////////////////////////////////////////////////////////////////
+Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
+ : timeouts(allocator), fibers(allocator) {}
+
Scheduler::WaitingFibers::operator bool() const {
return !fibers.empty();
}
@@ -399,12 +392,19 @@
thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
- : id(id), mode(mode), scheduler(scheduler) {}
+ : id(id),
+ mode(mode),
+ scheduler(scheduler),
+ work(scheduler->cfg.allocator),
+ idleFibers(scheduler->cfg.allocator) {}
void Scheduler::Worker::start() {
switch (mode) {
- case Mode::MultiThreaded:
- thread = Thread(id, [=] {
+ case Mode::MultiThreaded: {
+ auto allocator = scheduler->cfg.allocator;
+ auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
+ auto affinity = affinityPolicy->get(id, allocator);
+ thread = Thread(std::move(affinity), [=] {
Thread::setName("Thread<%.2d>", int(id));
if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
@@ -423,13 +423,13 @@
Worker::current = nullptr;
});
break;
-
- case Mode::SingleThreaded:
+ }
+ case Mode::SingleThreaded: {
Worker::current = this;
mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get();
break;
-
+ }
default:
MARL_ASSERT(false, "Unknown mode: %d", int(mode));
}
@@ -517,12 +517,12 @@
if (!work.fibers.empty()) {
// There's another fiber that has become unblocked, resume that.
work.num--;
- auto to = take(work.fibers);
+ auto to = containers::take(work.fibers);
ASSERT_FIBER_STATE(to, Fiber::State::Queued);
switchToFiber(to);
} else if (!idleFibers.empty()) {
// There's an old fiber we can reuse, resume that.
- auto to = take(idleFibers);
+ auto to = containers::take(idleFibers);
ASSERT_FIBER_STATE(to, Fiber::State::Idle);
switchToFiber(to);
} else {
@@ -597,7 +597,7 @@
return false;
}
work.num--;
- out = take(work.tasks);
+ out = containers::take(work.tasks);
work.mutex.unlock();
return true;
}
@@ -714,7 +714,7 @@
while (!work.fibers.empty()) {
work.num--;
- auto fiber = take(work.fibers);
+ auto fiber = containers::take(work.fibers);
// Sanity checks,
MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
@@ -731,7 +731,7 @@
if (!work.tasks.empty()) {
work.num--;
- auto task = take(work.tasks);
+ auto task = containers::take(work.tasks);
work.mutex.unlock();
// Run the task.
@@ -752,7 +752,7 @@
auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
[&]() REQUIRES(work.mutex) { run(); });
auto ptr = fiber.get();
- workerFibers.push_back(std::move(fiber));
+ workerFibers.emplace_back(std::move(fiber));
return ptr;
}
@@ -768,6 +768,9 @@
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker::Work
////////////////////////////////////////////////////////////////////////////////
+Scheduler::Worker::Work::Work(Allocator* allocator)
+ : tasks(allocator), fibers(allocator), waiting(allocator) {}
+
template <typename F>
void Scheduler::Worker::Work::wait(F&& f) {
notifyAdded = true;
@@ -779,4 +782,10 @@
notifyAdded = false;
}
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Worker::Work
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
+ : byTid(allocator) {}
+
} // namespace marl