Squashed 'third_party/marl/' changes from 38c0c7a0f..c51271125

c51271125 README.md: Repoint link from `master` to `main`
da0a9c610 Add basic CMake rules for versioning marl
0e639c3c7 Benchmarks: Simplify tests further.
32af8bb50 Kokoro: Enable debug checks
8cf8dc033 Wrap all stl containers with a marl::StlAllocator
325b072b9 Support specifying worker thread affinities
49fe9a17f marl::containers: Add const methods
11f31bfbe Benchmarks: Warn if benchmarking with sanitizers
bfdf613e7 Benchmarks: Add MARL_FULL_BENCHMARK flag
5e2383370 Benchmarks: Allow running with custom Config
c277c61b0 marl::containers::vector fixes
fcbe1f279 Add issue number to MARL_DEPRECATED()
1f010cad7 Warn about use of deprecated APIs
d2e553bff Don't use deprecated scheduler methods

git-subtree-dir: third_party/marl
git-subtree-split: c51271125451c599efb9ec58b355a4c434296a8f
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 384f67d..66440eb 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -59,21 +59,6 @@
 }
 #endif
 
-template <typename T>
-inline T take(std::deque<T>& queue) {
-  auto out = std::move(queue.front());
-  queue.pop_front();
-  return out;
-}
-
-template <typename T>
-inline T take(std::unordered_set<T>& set) {
-  auto it = set.begin();
-  auto out = std::move(*it);
-  set.erase(it);
-  return out;
-}
-
 inline void nop() {
 #if defined(_WIN32)
   __nop();
@@ -127,7 +112,12 @@
   bound = nullptr;
 }
 
-Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
+Scheduler::Scheduler(const Config& config)
+    : cfg(config), workerThreads{}, singleThreadedWorkers(config.allocator) {
+  if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
+    cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf(
+        Thread::Affinity::all(cfg.allocator), cfg.allocator);
+  }
   for (size_t i = 0; i < spinningWorkers.size(); i++) {
     spinningWorkers[i] = -1;
   }
@@ -162,7 +152,7 @@
 
 #if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
 Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
-    : workerThreads{} {
+    : workerThreads{}, singleThreadedWorkers(allocator) {
   cfg.allocator = allocator;
   for (size_t i = 0; i < spinningWorkers.size(); i++) {
     spinningWorkers[i] = -1;
@@ -338,6 +328,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::WaitingFibers
 ////////////////////////////////////////////////////////////////////////////////
+Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
+    : timeouts(allocator), fibers(allocator) {}
+
 Scheduler::WaitingFibers::operator bool() const {
   return !fibers.empty();
 }
@@ -399,12 +392,19 @@
 thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
 
 Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
-    : id(id), mode(mode), scheduler(scheduler) {}
+    : id(id),
+      mode(mode),
+      scheduler(scheduler),
+      work(scheduler->cfg.allocator),
+      idleFibers(scheduler->cfg.allocator) {}
 
 void Scheduler::Worker::start() {
   switch (mode) {
-    case Mode::MultiThreaded:
-      thread = Thread(id, [=] {
+    case Mode::MultiThreaded: {
+      auto allocator = scheduler->cfg.allocator;
+      auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
+      auto affinity = affinityPolicy->get(id, allocator);
+      thread = Thread(std::move(affinity), [=] {
         Thread::setName("Thread<%.2d>", int(id));
 
         if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
@@ -423,13 +423,13 @@
         Worker::current = nullptr;
       });
       break;
-
-    case Mode::SingleThreaded:
+    }
+    case Mode::SingleThreaded: {
       Worker::current = this;
       mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
       currentFiber = mainFiber.get();
       break;
-
+    }
     default:
       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
   }
@@ -517,12 +517,12 @@
   if (!work.fibers.empty()) {
     // There's another fiber that has become unblocked, resume that.
     work.num--;
-    auto to = take(work.fibers);
+    auto to = containers::take(work.fibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
     switchToFiber(to);
   } else if (!idleFibers.empty()) {
     // There's an old fiber we can reuse, resume that.
-    auto to = take(idleFibers);
+    auto to = containers::take(idleFibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
     switchToFiber(to);
   } else {
@@ -597,7 +597,7 @@
     return false;
   }
   work.num--;
-  out = take(work.tasks);
+  out = containers::take(work.tasks);
   work.mutex.unlock();
   return true;
 }
@@ -714,7 +714,7 @@
 
     while (!work.fibers.empty()) {
       work.num--;
-      auto fiber = take(work.fibers);
+      auto fiber = containers::take(work.fibers);
       // Sanity checks,
       MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
       MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
@@ -731,7 +731,7 @@
 
     if (!work.tasks.empty()) {
       work.num--;
-      auto task = take(work.tasks);
+      auto task = containers::take(work.tasks);
       work.mutex.unlock();
 
       // Run the task.
@@ -752,7 +752,7 @@
   auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
                              [&]() REQUIRES(work.mutex) { run(); });
   auto ptr = fiber.get();
-  workerFibers.push_back(std::move(fiber));
+  workerFibers.emplace_back(std::move(fiber));
   return ptr;
 }
 
@@ -768,6 +768,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::Worker::Work
 ////////////////////////////////////////////////////////////////////////////////
+Scheduler::Worker::Work::Work(Allocator* allocator)
+    : tasks(allocator), fibers(allocator), waiting(allocator) {}
+
 template <typename F>
 void Scheduler::Worker::Work::wait(F&& f) {
   notifyAdded = true;
@@ -779,4 +782,10 @@
   notifyAdded = false;
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Worker::Work
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
+    : byTid(allocator) {}
+
 }  // namespace marl