Squashed 'third_party/marl/' changes from 38c0c7a0f..c51271125

c51271125 README.md: Repoint link from `master` to `main`
da0a9c610 Add basic CMake rules for versioning marl
0e639c3c7 Benchmarks: Simplify tests further.
32af8bb50 Kokoro: Enable debug checks
8cf8dc033 Wrap all stl containers with a marl::StlAllocator
325b072b9 Support specifying worker thread affinities
49fe9a17f marl::containers: Add const methods
11f31bfbe Benchmarks: Warn if benchmarking with sanitizers
bfdf613e7 Benchmarks: Add MARL_FULL_BENCHMARK flag
5e2383370 Benchmarks: Allow running with custom Config
c277c61b0 marl::containers::vector fixes
fcbe1f279 Add issue number to MARL_DEPRECATED()
1f010cad7 Warn about use of deprecated APIs
d2e553bff Don't use deprecated scheduler methods

git-subtree-dir: third_party/marl
git-subtree-split: c51271125451c599efb9ec58b355a4c434296a8f
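
The headline changes in this roll are the move from the deprecated
`marl::Scheduler` getters/setters to `marl::Scheduler::Config`, and support
for worker thread affinities. A minimal sketch of the new construction style,
using only APIs visible in this diff:

```c++
#include "marl/defer.h"
#include "marl/scheduler.h"
#include "marl/thread.h"

int main() {
  // Configure the scheduler up front instead of calling the now-deprecated
  // setWorkerThreadCount() after construction.
  marl::Scheduler::Config cfg;
  cfg.setWorkerThreadCount(4);
  cfg.setWorkerThreadAffinityPolicy(
      marl::Thread::Affinity::Policy::oneOf(marl::Thread::Affinity::all()));

  marl::Scheduler scheduler(cfg);
  scheduler.bind();
  defer(scheduler.unbind());  // Automatically unbind before returning.
  return 0;
}
```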
diff --git a/CMakeLists.txt b/CMakeLists.txt
index eee64fc..da609fd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -14,9 +14,15 @@
 
 cmake_minimum_required(VERSION 3.0)
 
+include(cmake/parse_version.cmake)
+parse_version("${CMAKE_CURRENT_SOURCE_DIR}/VERSION" MARL)
+
 set(CMAKE_CXX_STANDARD 11)
 
-project(Marl C CXX ASM)
+project(Marl
+    VERSION   "${MARL_VERSION_MAJOR}.${MARL_VERSION_MINOR}.${MARL_VERSION_PATCH}"
+    LANGUAGES C CXX ASM
+)
 
 include(CheckCXXSourceCompiles)
 
@@ -29,7 +35,7 @@
 ###########################################################
 # Options
 ###########################################################
-function (option_if_not_defined name description default)
+function(option_if_not_defined name description default)
     if(NOT DEFINED ${name})
         option(${name} ${description} ${default})
     endif()
@@ -39,15 +45,18 @@
 option_if_not_defined(MARL_BUILD_EXAMPLES "Build example applications" OFF)
 option_if_not_defined(MARL_BUILD_TESTS "Build tests" OFF)
 option_if_not_defined(MARL_BUILD_BENCHMARKS "Build benchmarks" OFF)
+option_if_not_defined(MARL_BUILD_SHARED "Build marl as a shared / dynamic library (default static)" OFF)
 option_if_not_defined(MARL_ASAN "Build marl with address sanitizer" OFF)
 option_if_not_defined(MARL_MSAN "Build marl with memory sanitizer" OFF)
 option_if_not_defined(MARL_TSAN "Build marl with thread sanitizer" OFF)
 option_if_not_defined(MARL_INSTALL "Create marl install target" OFF)
+option_if_not_defined(MARL_FULL_BENCHMARK "Run benchmarks for every worker-thread count in [0 .. numLogicalCPUs] with no stepping" OFF)
+option_if_not_defined(MARL_DEBUG_ENABLED "Enable debug checks even in release builds" OFF)
 
 ###########################################################
 # Directories
 ###########################################################
-function (set_if_not_defined name value)
+function(set_if_not_defined name value)
     if(NOT DEFINED ${name})
         set(${name} ${value} PARENT_SCOPE)
     endif()
@@ -147,7 +156,7 @@
 # Functions
 ###########################################################
 function(marl_set_target_options target)
-    if (MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
+    if(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
         target_compile_options(${target} PRIVATE "-Wthread-safety")
     endif()
 
@@ -186,6 +195,10 @@
         target_link_libraries(${target} PUBLIC "-fsanitize=thread")
     endif()
 
+    if(MARL_DEBUG_ENABLED)
+        target_compile_definitions(${target} PRIVATE "MARL_DEBUG_ENABLED=1")
+    endif()
+
     target_include_directories(${target} PUBLIC $<BUILD_INTERFACE:${MARL_INCLUDE_DIR}>)
 endfunction(marl_set_target_options)
 
@@ -194,9 +207,16 @@
 ###########################################################
 
 # marl
-add_library(marl STATIC ${MARL_LIST})
+if(MARL_BUILD_SHARED) # Can also be controlled by BUILD_SHARED_LIBS
+    add_library(marl SHARED ${MARL_LIST})
+else()
+    add_library(marl ${MARL_LIST})
+endif()
+
 set_target_properties(marl PROPERTIES
     POSITION_INDEPENDENT_CODE 1
+    VERSION ${MARL_VERSION}
+    SOVERSION "${MARL_VERSION_MAJOR}"
 )
 
 marl_set_target_options(marl)
@@ -253,6 +273,7 @@
         ${MARL_SRC_DIR}/parallelize_test.cpp
         ${MARL_SRC_DIR}/pool_test.cpp
         ${MARL_SRC_DIR}/scheduler_test.cpp
+        ${MARL_SRC_DIR}/thread_test.cpp
         ${MARL_SRC_DIR}/ticket_test.cpp
         ${MARL_SRC_DIR}/waitgroup_test.cpp
         ${MARL_GOOGLETEST_DIR}/googletest/src/gtest-all.cc
@@ -293,6 +314,10 @@
 
     marl_set_target_options(marl-benchmarks)
 
+    target_compile_definitions(marl-benchmarks PRIVATE
+        "MARL_FULL_BENCHMARK=${MARL_FULL_BENCHMARK}"
+    )
+
     target_link_libraries(marl-benchmarks PRIVATE benchmark::benchmark marl)
 endif(MARL_BUILD_BENCHMARKS)
 
diff --git a/README.md b/README.md
index 06408dc..b0cd532 100644
--- a/README.md
+++ b/README.md
@@ -23,11 +23,10 @@
 #include <cstdio>
 
 int main() {
-  // Create a marl scheduler using the 4 hardware threads.
+  // Create a marl scheduler using all the logical processors available to the process.
   // Bind this scheduler to the main thread so we can call marl::schedule()
-  marl::Scheduler scheduler;
+  marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
   scheduler.bind();
-  scheduler.setWorkerThreadCount(4);
   defer(scheduler.unbind());  // Automatically unbind before returning.
 
   constexpr int numTasks = 10;
@@ -140,7 +139,7 @@
 
 #### Create one instance of `marl::Scheduler`, use it for the lifetime of the process.
 
-`marl::Scheduler::setWorkerThreadCount()` is an expensive operation as it spawn a number of hardware threads. \
+The `marl::Scheduler` constructor can be expensive as it may spawn a number of hardware threads. \
 Destructing the `marl::Scheduler` requires waiting on all tasks to complete.
 
 Multiple `marl::Scheduler`s may fight each other for hardware thread utilization.
@@ -151,9 +150,8 @@
 
 ```c++
 int main() {
-  marl::Scheduler scheduler;
+  marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
   scheduler.bind();
-  scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
   defer(scheduler.unbind());
 
   return do_program_stuff();
@@ -196,7 +194,7 @@
 
 Short blocking calls are acceptable, such as a mutex lock to access a data structure. However be careful that you do not use a marl blocking call with a `std::mutex` lock held - the marl task may yield with the lock held, and block other tasks from re-locking the mutex. This sort of situation may end up with a deadlock.
 
-If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/master/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
+If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/main/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
 
 ---
 
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..3b2e7a0
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.0.0-dev
\ No newline at end of file
diff --git a/cmake/parse_version.cmake b/cmake/parse_version.cmake
new file mode 100644
index 0000000..be18eb8
--- /dev/null
+++ b/cmake/parse_version.cmake
@@ -0,0 +1,39 @@
+# Copyright 2020 The Marl Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# parse_version() reads and parses the version string from FILE, assigning the
+# version string to ${PROJECT}_VERSION and the parsed version to
+# ${PROJECT}_VERSION_MAJOR, ${PROJECT}_VERSION_MINOR, ${PROJECT}_VERSION_PATCH,
+# and the optional ${PROJECT}_VERSION_FLAVOR.
+#
+# The version string takes one of the forms:
+#    <major>.<minor>.<patch>
+#    <major>.<minor>.<patch>-<flavor>
+function(parse_version FILE PROJECT)
+    configure_file(${FILE} "${CMAKE_CURRENT_BINARY_DIR}/VERSION") # Required to re-run cmake on version change
+    file(READ ${FILE} VERSION)
+    if(${VERSION} MATCHES "([0-9]+)\\.([0-9]+)\\.([0-9]+)(-[a-zA-Z0-9]+)?")
+        set(FLAVOR "")
+        if(NOT "${CMAKE_MATCH_4}" STREQUAL "")
+            string(SUBSTRING ${CMAKE_MATCH_4} 1 -1 FLAVOR)
+        endif()
+        set("${PROJECT}_VERSION" ${VERSION}       PARENT_SCOPE)
+        set("${PROJECT}_VERSION_MAJOR"  ${CMAKE_MATCH_1} PARENT_SCOPE)
+        set("${PROJECT}_VERSION_MINOR"  ${CMAKE_MATCH_2} PARENT_SCOPE)
+        set("${PROJECT}_VERSION_PATCH"  ${CMAKE_MATCH_3} PARENT_SCOPE)
+        set("${PROJECT}_VERSION_FLAVOR" ${FLAVOR}        PARENT_SCOPE)
+    else()
+        message(FATAL_ERROR "Unable to parse version string '${VERSION}'")
+    endif()
+endfunction()
diff --git a/docs/scheduler.md b/docs/scheduler.md
index e25cfff..15da2aa 100644
--- a/docs/scheduler.md
+++ b/docs/scheduler.md
@@ -82,7 +82,7 @@
 
 When a task is scheduled with a call to `marl::schedule()`, a worker is picked, and the task is placed on to the worker's `work.tasks` queue. The worker is picked using the following rules:
 
-- If the scheduler has no dedicated worker threads (`marl::Scheduler::getWorkerThreadCount() == 0`), then the task is queued on to the [Single-Threaded-Worker](#single-threaded-workers) for the currently executing thread.
+- If the scheduler has no dedicated worker threads (`marl::Scheduler::config().workerThread.count == 0`), then the task is queued on to the [Single-Threaded-Worker](#single-threaded-workers) for the currently executing thread.
 - Otherwise one of the [Multi-Threaded-Workers](#multi-threaded-workers) is picked. If any workers have entered a [spin-for-work](#marlschedulerworkerspinforwork) state, then these will be prioritized, otherwise a [Multi-Threaded-Worker](#multi-threaded-workers) is picked in a round-robin fashion.
 
 ### `marl::Scheduler::Worker::run()`
@@ -188,15 +188,17 @@
 
 A single-threaded-worker (STW) is created for each thread that is bound with a call to `marl::Scheduler::bind()`.
 
-If the scheduler has no dedicated worker threads (`marl::Scheduler::getWorkerThreadCount() == 0`), then scheduled tasks are queued on to the STW for the currently executing thread.
+If the scheduler has no dedicated worker threads (`marl::Scheduler::config().workerThread.count == 0`), then scheduled tasks are queued on to the STW for the currently executing thread.
 
 Because in this mode there are no worker threads, the tasks queued on the STW are not automatically background executed. Instead, tasks are only executed whenever there's a call to [`marl::Scheduler::Worker::suspend()`](#marlschedulerworkersuspend).
 The logic for [`suspend()`](#marlschedulerworkersuspend) is common for STWs and MTWs, spawning new fibers that call [`marl::Scheduler::Worker::run()`](#marlschedulerworkerrun) whenever all other fibers are blocked.
 
 ```c++
 void SingleThreadedWorkerExample() {
-  marl::Scheduler scheduler;
-  scheduler.setWorkerThreadCount(0); // STW mode.
+  marl::Scheduler::Config cfg;
+  cfg.setWorkerThreadCount(0); // STW mode.
+
+  marl::Scheduler scheduler(cfg);
   scheduler.bind();
   defer(scheduler.unbind());
 
@@ -219,7 +221,7 @@
 
 ### Multi-Threaded-Workers
 
-Multi-Threaded-Workers are created when `marl::Scheduler::setWorkerThreadCount()` is called with a positive number.
+Multi-Threaded-Workers are created when the `marl::Scheduler` is constructed with a positive number of worker threads (`marl::Scheduler::config().workerThread.count > 0`).
 
 Each MTW is paired with a new `std::thread` that begins by calling `marl::Scheduler::Worker::run()`.
 
diff --git a/examples/fractal.cpp b/examples/fractal.cpp
index 4d1b4ba..b741f44 100644
--- a/examples/fractal.cpp
+++ b/examples/fractal.cpp
@@ -149,8 +149,7 @@
 int main() {
   // Create a marl scheduler using the full number of logical cpus.
   // Bind this scheduler to the main thread so we can call marl::schedule()
-  marl::Scheduler scheduler;
-  scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
+  marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
   scheduler.bind();
   defer(scheduler.unbind());  // unbind before destructing the scheduler.
 
diff --git a/examples/hello_task.cpp b/examples/hello_task.cpp
index 3206c3c..47dd06b 100644
--- a/examples/hello_task.cpp
+++ b/examples/hello_task.cpp
@@ -24,9 +24,11 @@
 int main() {
   // Create a marl scheduler using the 4 hardware threads.
   // Bind this scheduler to the main thread so we can call marl::schedule()
-  marl::Scheduler scheduler;
+  marl::Scheduler::Config cfg;
+  cfg.setWorkerThreadCount(4);
+
+  marl::Scheduler scheduler(cfg);
   scheduler.bind();
-  scheduler.setWorkerThreadCount(4);
   defer(scheduler.unbind());  // Automatically unbind before returning.
 
   constexpr int numTasks = 10;
diff --git a/examples/primes.cpp b/examples/primes.cpp
index baa8998..a3e871a 100644
--- a/examples/primes.cpp
+++ b/examples/primes.cpp
@@ -20,6 +20,8 @@
 #include "marl/thread.h"
 #include "marl/ticket.h"
 
+#include <vector>
+
 #include <math.h>
 
 // searchMax defines the upper limit on primes to find.
@@ -42,8 +44,7 @@
 int main() {
   // Create a marl scheduler using the full number of logical cpus.
   // Bind this scheduler to the main thread so we can call marl::schedule()
-  marl::Scheduler scheduler;
-  scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
+  marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
   scheduler.bind();
   defer(scheduler.unbind());  // unbind before destructing the scheduler.
 
diff --git a/examples/tasks_in_tasks.cpp b/examples/tasks_in_tasks.cpp
index f34c476..1d308af 100644
--- a/examples/tasks_in_tasks.cpp
+++ b/examples/tasks_in_tasks.cpp
@@ -23,9 +23,11 @@
 int main() {
   // Create a marl scheduler using the 4 hardware threads.
   // Bind this scheduler to the main thread so we can call marl::schedule()
-  marl::Scheduler scheduler;
+  marl::Scheduler::Config cfg;
+  cfg.setWorkerThreadCount(4);
+
+  marl::Scheduler scheduler(cfg);
   scheduler.bind();
-  scheduler.setWorkerThreadCount(4);
   defer(scheduler.unbind());  // Automatically unbind before returning.
 
   // marl::schedule() requires the scheduler to be bound to the current thread
diff --git a/include/marl/containers.h b/include/marl/containers.h
index acf421c..1147a97 100644
--- a/include/marl/containers.h
+++ b/include/marl/containers.h
@@ -22,18 +22,66 @@
 #include <cstddef>    // size_t
 #include <utility>    // std::move
 
+#include <deque>
+#include <map>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
+
 namespace marl {
 namespace containers {
 
 ////////////////////////////////////////////////////////////////////////////////
+// STL wrappers
+// STL containers that use a marl::StlAllocator backed by a marl::Allocator.
+// Note: These may be re-implemented to optimize for marl's use cases.
+// See: https://github.com/google/marl/issues/129
+////////////////////////////////////////////////////////////////////////////////
+template <typename T>
+using deque = std::deque<T, StlAllocator<T>>;
+
+template <typename K, typename V, typename C = std::less<K>>
+using map = std::map<K, V, C, StlAllocator<std::pair<const K, V>>>;
+
+template <typename K, typename C = std::less<K>>
+using set = std::set<K, C, StlAllocator<K>>;
+
+template <typename K,
+          typename V,
+          typename H = std::hash<K>,
+          typename E = std::equal_to<K>>
+using unordered_map =
+    std::unordered_map<K, V, H, E, StlAllocator<std::pair<const K, V>>>;
+
+template <typename K, typename H = std::hash<K>, typename E = std::equal_to<K>>
+using unordered_set = std::unordered_set<K, H, E, StlAllocator<K>>;
+
+// take() takes and returns the front value from the deque.
+template <typename T>
+inline T take(deque<T>& queue) {
+  auto out = std::move(queue.front());
+  queue.pop_front();
+  return out;
+}
+
+// take() takes and returns the first value from the unordered_set.
+template <typename T, typename H, typename E>
+inline T take(unordered_set<T, H, E>& set) {
+  auto it = set.begin();
+  auto out = std::move(*it);
+  set.erase(it);
+  return out;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 // vector<T, BASE_CAPACITY>
 ////////////////////////////////////////////////////////////////////////////////
 
 // vector is a container of contiguously stored elements.
-// Unlike std::vector, marl::containers::vector keeps the first BASE_CAPACITY
-// elements internally, which will avoid dynamic heap allocations.
-// Once the vector exceeds BASE_CAPACITY elements, vector will allocate storage
-// from the heap.
+// Unlike std::vector, marl::containers::vector keeps the first
+// BASE_CAPACITY elements internally, which will avoid dynamic heap
+// allocations. Once the vector exceeds BASE_CAPACITY elements, vector will
+// allocate storage from the heap.
 template <typename T, int BASE_CAPACITY>
 class vector {
  public:
@@ -49,6 +97,8 @@
 
   inline ~vector();
 
+  inline vector& operator=(const vector&);
+
   template <int BASE_CAPACITY_2>
   inline vector<T, BASE_CAPACITY>& operator=(const vector<T, BASE_CAPACITY_2>&);
 
@@ -60,21 +110,30 @@
   inline void pop_back();
   inline T& front();
   inline T& back();
+  inline const T& front() const;
+  inline const T& back() const;
   inline T* begin();
   inline T* end();
+  inline const T* begin() const;
+  inline const T* end() const;
   inline T& operator[](size_t i);
   inline const T& operator[](size_t i) const;
   inline size_t size() const;
   inline size_t cap() const;
   inline void resize(size_t n);
   inline void reserve(size_t n);
+  inline T* data();
+  inline const T* data() const;
+
+  Allocator* const allocator;
 
  private:
   using TStorage = typename marl::aligned_storage<sizeof(T), alignof(T)>::type;
 
+  vector(const vector&) = delete;
+
   inline void free();
 
-  Allocator* const allocator;
   size_t count = 0;
   size_t capacity = BASE_CAPACITY;
   TStorage buffer[BASE_CAPACITY];
@@ -111,6 +170,18 @@
 }
 
 template <typename T, int BASE_CAPACITY>
+vector<T, BASE_CAPACITY>& vector<T, BASE_CAPACITY>::operator=(
+    const vector& other) {
+  free();
+  reserve(other.size());
+  count = other.size();
+  for (size_t i = 0; i < count; i++) {
+    new (&reinterpret_cast<T*>(elements)[i]) T(other[i]);
+  }
+  return *this;
+}
+
+template <typename T, int BASE_CAPACITY>
 template <int BASE_CAPACITY_2>
 vector<T, BASE_CAPACITY>& vector<T, BASE_CAPACITY>::operator=(
     const vector<T, BASE_CAPACITY_2>& other) {
@@ -171,6 +242,18 @@
 }
 
 template <typename T, int BASE_CAPACITY>
+const T& vector<T, BASE_CAPACITY>::front() const {
+  MARL_ASSERT(count > 0, "front() called on empty vector");
+  return reinterpret_cast<T*>(elements)[0];
+}
+
+template <typename T, int BASE_CAPACITY>
+const T& vector<T, BASE_CAPACITY>::back() const {
+  MARL_ASSERT(count > 0, "back() called on empty vector");
+  return reinterpret_cast<T*>(elements)[count - 1];
+}
+
+template <typename T, int BASE_CAPACITY>
 T* vector<T, BASE_CAPACITY>::begin() {
   return reinterpret_cast<T*>(elements);
 }
@@ -181,6 +264,16 @@
 }
 
 template <typename T, int BASE_CAPACITY>
+const T* vector<T, BASE_CAPACITY>::begin() const {
+  return reinterpret_cast<T*>(elements);
+}
+
+template <typename T, int BASE_CAPACITY>
+const T* vector<T, BASE_CAPACITY>::end() const {
+  return reinterpret_cast<T*>(elements) + count;
+}
+
+template <typename T, int BASE_CAPACITY>
 T& vector<T, BASE_CAPACITY>::operator[](size_t i) {
   MARL_ASSERT(i < count, "index %d exceeds vector size %d", int(i), int(count));
   return reinterpret_cast<T*>(elements)[i];
@@ -231,6 +324,16 @@
 }
 
 template <typename T, int BASE_CAPACITY>
+T* vector<T, BASE_CAPACITY>::data() {
+  return reinterpret_cast<T*>(elements);
+}
+
+template <typename T, int BASE_CAPACITY>
+const T* vector<T, BASE_CAPACITY>::data() const {
+  return reinterpret_cast<T*>(elements);
+}
+
+template <typename T, int BASE_CAPACITY>
 void vector<T, BASE_CAPACITY>::free() {
   for (size_t i = 0; i < count; i++) {
     reinterpret_cast<T*>(elements)[i].~T();
@@ -238,6 +341,7 @@
 
   if (allocation.ptr != nullptr) {
     allocator->free(allocation);
+    allocation = {};
     elements = nullptr;
   }
 }
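
The allocator-backed aliases and `containers::take()` above can be used as
below - a minimal sketch (an `Allocator*` converts implicitly to the
`marl::StlAllocator` that the aliases expect):

```c++
#include "marl/containers.h"

void containersSketch() {
  marl::Allocator* alloc = marl::Allocator::Default;

  // A deque whose storage is routed through alloc.
  marl::containers::deque<int> queue(alloc);
  queue.push_back(1);
  queue.push_back(2);
  int first = marl::containers::take(queue);  // Removes and returns 1.

  // The first 8 elements live inside the vector itself, avoiding heap
  // allocations until the vector grows beyond BASE_CAPACITY.
  marl::containers::vector<int, 8> v(alloc);
  v.resize(2);
  v[0] = first;
  v[1] = v.front() + 1;
}
```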
diff --git a/include/marl/deprecated.h b/include/marl/deprecated.h
index 1f5caed..430e9c7 100644
--- a/include/marl/deprecated.h
+++ b/include/marl/deprecated.h
@@ -24,13 +24,23 @@
 #endif  // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
 
 #ifndef MARL_WARN_DEPRECATED
-#define MARL_WARN_DEPRECATED 0
+#define MARL_WARN_DEPRECATED 1
 #endif  // MARL_WARN_DEPRECATED
 
 #if MARL_WARN_DEPRECATED
-#define MARL_DEPRECATED(message) __attribute__((deprecated(message)))
+#if defined(_WIN32)
+#define MARL_DEPRECATED(issue_num, message)                              \
+  __declspec(deprecated(                                                 \
+      message "\nSee: https://github.com/google/marl/issues/" #issue_num \
+              " for more information"))
 #else
-#define MARL_DEPRECATED(message)
+#define MARL_DEPRECATED(issue_num, message)                              \
+  __attribute__((deprecated(                                             \
+      message "\nSee: https://github.com/google/marl/issues/" #issue_num \
+              " for more information")))
+#endif
+#else
+#define MARL_DEPRECATED(issue_num, message)
 #endif
 
 #endif  // marl_deprecated_h
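
Usage of the updated two-argument macro, as adopted in `scheduler.h` later in
this diff - the issue number is stitched into the warning text:

```c++
// Emits "use Scheduler::Scheduler(const Config&)" plus a link to
// https://github.com/google/marl/issues/139 when the function is used.
MARL_DEPRECATED(139, "use Scheduler::Scheduler(const Config&)")
void legacyConstruct();
```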
diff --git a/include/marl/memory.h b/include/marl/memory.h
index 8608851..d7992ae 100644
--- a/include/marl/memory.h
+++ b/include/marl/memory.h
@@ -27,6 +27,9 @@
 
 namespace marl {
 
+template <typename T>
+struct StlAllocator;
+
 // pageSize() returns the size in bytes of a virtual memory page for the host
 // system.
 size_t pageSize();
@@ -36,6 +39,19 @@
   return alignment * ((val + alignment - 1) / alignment);
 }
 
+// aligned_storage() is a replacement for std::aligned_storage that isn't busted
+// on older versions of MSVC.
+template <size_t SIZE, size_t ALIGNMENT>
+struct aligned_storage {
+  struct alignas(ALIGNMENT) type {
+    unsigned char data[SIZE];
+  };
+};
+
+///////////////////////////////////////////////////////////////////////////////
+// Allocation
+///////////////////////////////////////////////////////////////////////////////
+
 // Allocation holds the result of a memory allocation from an Allocator.
 struct Allocation {
   // Intended usage of the allocation. Used for allocation trackers.
@@ -45,6 +61,7 @@
     Create,  // Allocator::create(), make_unique(), make_shared()
     Vector,  // marl::containers::vector<T>
     List,    // marl::containers::list<T>
+    Stl,     // marl::StlAllocator
     Count,   // Not intended to be used as a usage type - used for upper bound.
   };
 
@@ -60,6 +77,10 @@
   Request request;      // Request used for the allocation.
 };
 
+///////////////////////////////////////////////////////////////////////////////
+// Allocator
+///////////////////////////////////////////////////////////////////////////////
+
 // Allocator is an interface to a memory allocator.
 // Marl provides a default implementation with Allocator::Default.
 class Allocator {
@@ -183,14 +204,9 @@
   return std::shared_ptr<T>(reinterpret_cast<T*>(alloc.ptr), Deleter{this});
 }
 
-// aligned_storage() is a replacement for std::aligned_storage that isn't busted
-// on older versions of MSVC.
-template <size_t SIZE, size_t ALIGNMENT>
-struct aligned_storage {
-  struct alignas(ALIGNMENT) type {
-    unsigned char data[SIZE];
-  };
-};
+///////////////////////////////////////////////////////////////////////////////
+// TrackedAllocator
+///////////////////////////////////////////////////////////////////////////////
 
 // TrackedAllocator wraps an Allocator to track the allocations made.
 class TrackedAllocator : public Allocator {
@@ -280,6 +296,141 @@
   return allocator->free(allocation);
 }
 
+///////////////////////////////////////////////////////////////////////////////
+// StlAllocator
+///////////////////////////////////////////////////////////////////////////////
+
+// StlAllocator exposes an STL-compatible allocator wrapping a marl::Allocator.
+template <typename T>
+struct StlAllocator {
+  using value_type = T;
+  using pointer = T*;
+  using const_pointer = const T*;
+  using reference = T&;
+  using const_reference = const T&;
+  using size_type = size_t;
+  using difference_type = size_t;
+
+  // An equivalent STL allocator for a different type.
+  template <class U>
+  struct rebind {
+    typedef StlAllocator<U> other;
+  };
+
+  // Constructs an StlAllocator that will allocate using allocator.
+  // allocator must remain valid until this StlAllocator has been destroyed.
+  inline StlAllocator(Allocator* allocator);
+
+  template <typename U>
+  inline StlAllocator(const StlAllocator<U>& other);
+
+  // Returns the actual address of x even in presence of overloaded operator&.
+  inline pointer address(reference x) const;
+  inline const_pointer address(const_reference x) const;
+
+  // Allocates the memory for n objects of type T.
+  // Does not actually construct the objects.
+  inline T* allocate(std::size_t n);
+
+  // Deallocates the memory for n objects of type T.
+  inline void deallocate(T* p, std::size_t n);
+
+  // Returns the maximum theoretically possible number of T stored in this
+  // allocator.
+  inline size_type max_size() const;
+
+  // Copy constructs an object of type T at the address p.
+  inline void construct(pointer p, const_reference val);
+
+  // Constructs an object of type U at the address p, forwarding all other
+  // arguments to the constructor.
+  template <typename U, typename... Args>
+  inline void construct(U* p, Args&&... args);
+
+  // Deconstructs the object at p. It does not free the memory.
+  inline void destroy(pointer p);
+
+  // Deconstructs the object at p. It does not free the memory.
+  template <typename U>
+  inline void destroy(U* p);
+
+ private:
+  inline Allocation::Request request(size_t n) const;
+
+  template <typename U>
+  friend struct StlAllocator;
+  Allocator* allocator;
+};
+
+template <typename T>
+StlAllocator<T>::StlAllocator(Allocator* allocator) : allocator(allocator) {}
+
+template <typename T>
+template <typename U>
+StlAllocator<T>::StlAllocator(const StlAllocator<U>& other) {
+  allocator = other.allocator;
+}
+
+template <typename T>
+typename StlAllocator<T>::pointer StlAllocator<T>::address(reference x) const {
+  return &x;
+}
+template <typename T>
+typename StlAllocator<T>::const_pointer StlAllocator<T>::address(
+    const_reference x) const {
+  return &x;
+}
+
+template <typename T>
+T* StlAllocator<T>::allocate(std::size_t n) {
+  auto alloc = allocator->allocate(request(n));
+  return reinterpret_cast<T*>(alloc.ptr);
+}
+
+template <typename T>
+void StlAllocator<T>::deallocate(T* p, std::size_t n) {
+  Allocation alloc;
+  alloc.ptr = p;
+  alloc.request = request(n);
+  allocator->free(alloc);
+}
+
+template <typename T>
+typename StlAllocator<T>::size_type StlAllocator<T>::max_size() const {
+  return std::numeric_limits<size_type>::max() / sizeof(value_type);
+}
+
+template <typename T>
+void StlAllocator<T>::construct(pointer p, const_reference val) {
+  new (p) T(val);
+}
+
+template <typename T>
+template <typename U, typename... Args>
+void StlAllocator<T>::construct(U* p, Args&&... args) {
+  ::new ((void*)p) U(std::forward<Args>(args)...);
+}
+
+template <typename T>
+void StlAllocator<T>::destroy(pointer p) {
+  ((T*)p)->~T();
+}
+
+template <typename T>
+template <typename U>
+void StlAllocator<T>::destroy(U* p) {
+  p->~U();
+}
+
+template <typename T>
+Allocation::Request StlAllocator<T>::request(size_t n) const {
+  Allocation::Request req = {};
+  req.size = sizeof(T) * n;
+  req.alignment = alignof(T);
+  req.usage = Allocation::Usage::Stl;
+  return req;
+}
+
 }  // namespace marl
 
 #endif  // marl_memory_h
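
A sketch of using `StlAllocator` directly with a standard container
(hypothetical usage; the `containers.h` aliases above wrap the same pattern):

```c++
#include "marl/memory.h"

#include <vector>

void stlAllocatorSketch(marl::Allocator* alloc) {
  // Allocations made by the vector are routed through alloc and tagged with
  // Allocation::Usage::Stl, so a TrackedAllocator can attribute them.
  marl::StlAllocator<int> stlAlloc(alloc);
  std::vector<int, marl::StlAllocator<int>> ints(stlAlloc);
  ints.push_back(42);
}
```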
diff --git a/include/marl/pool.h b/include/marl/pool.h
index 70d53c9..41a37c8 100644
--- a/include/marl/pool.h
+++ b/include/marl/pool.h
@@ -363,7 +363,7 @@
 
     Allocator* allocator;
     marl::mutex mutex;
-    std::vector<Item*> items;
+    containers::vector<Item*, 4> items;
     Item* free = nullptr;
   };
 
@@ -373,7 +373,7 @@
 
 template <typename T, PoolPolicy POLICY>
 UnboundedPool<T, POLICY>::Storage::Storage(Allocator* allocator)
-    : allocator(allocator) {}
+    : allocator(allocator), items(allocator) {}
 
 template <typename T, PoolPolicy POLICY>
 UnboundedPool<T, POLICY>::Storage::~Storage() {
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index 7dc033f..12cc8c1 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -15,6 +15,7 @@
 #ifndef marl_scheduler_h
 #define marl_scheduler_h
 
+#include "containers.h"
 #include "debug.h"
 #include "deprecated.h"
 #include "memory.h"
@@ -26,14 +27,8 @@
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
-#include <deque>
 #include <functional>
-#include <map>
-#include <set>
 #include <thread>
-#include <unordered_map>
-#include <unordered_set>
-#include <vector>
 
 namespace marl {
 
@@ -60,6 +55,7 @@
     struct WorkerThread {
       int count = 0;
       ThreadInitializer initializer;
+      std::shared_ptr<Thread::Affinity::Policy> affinityPolicy;
     };
     WorkerThread workerThread;
 
@@ -74,6 +70,8 @@
     inline Config& setAllocator(Allocator*);
     inline Config& setWorkerThreadCount(int);
     inline Config& setWorkerThreadInitializer(const ThreadInitializer&);
+    inline Config& setWorkerThreadAffinityPolicy(
+        const std::shared_ptr<Thread::Affinity::Policy>&);
   };
 
   // Constructor.
@@ -103,30 +101,30 @@
   const Config& config() const;
 
 #if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
-  MARL_DEPRECATED("use Scheduler::Scheduler(const Config&)")
+  MARL_DEPRECATED(139, "use Scheduler::Scheduler(const Config&)")
   Scheduler(Allocator* allocator = Allocator::Default);
 
   // setThreadInitializer() sets the worker thread initializer function which
   // will be called for each new worker thread spawned.
   // The initializer will only be called on newly created threads (call
   // setThreadInitializer() before setWorkerThreadCount()).
-  MARL_DEPRECATED("use Config::setWorkerThreadInitializer()")
+  MARL_DEPRECATED(139, "use Config::setWorkerThreadInitializer()")
   void setThreadInitializer(const std::function<void()>& init);
 
   // getThreadInitializer() returns the thread initializer function set by
   // setThreadInitializer().
-  MARL_DEPRECATED("use config().workerThread.initializer")
+  MARL_DEPRECATED(139, "use config().workerThread.initializer")
   std::function<void()> getThreadInitializer();
 
   // setWorkerThreadCount() adjusts the number of dedicated worker threads.
   // A count of 0 puts the scheduler into single-threaded mode.
   // Note: Currently the number of threads cannot be adjusted once tasks
   // have been enqueued. This restriction may be lifted at a later time.
-  MARL_DEPRECATED("use Config::setWorkerThreadCount()")
+  MARL_DEPRECATED(139, "use Config::setWorkerThreadCount()")
   void setWorkerThreadCount(int count);
 
   // getWorkerThreadCount() returns the number of worker threads.
-  MARL_DEPRECATED("use config().workerThread.count")
+  MARL_DEPRECATED(139, "use config().workerThread.count")
   int getWorkerThreadCount();
 #endif  // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
 
@@ -288,6 +286,8 @@
 
   // WaitingFibers holds all the fibers waiting on a timeout.
   struct WaitingFibers {
+    inline WaitingFibers(Allocator*);
+
     // operator bool() returns true iff there are any wait fibers.
     inline operator bool() const;
 
@@ -314,15 +314,15 @@
       Fiber* fiber;
       inline bool operator<(const Timeout&) const;
     };
-    std::set<Timeout> timeouts;
-    std::unordered_map<Fiber*, TimePoint> fibers;
+    containers::set<Timeout, std::less<Timeout>> timeouts;
+    containers::unordered_map<Fiber*, TimePoint> fibers;
   };
 
   // TODO: Implement a queue that recycles elements to reduce number of
   // heap allocations.
-  using TaskQueue = std::deque<Task>;
-  using FiberQueue = std::deque<Fiber*>;
-  using FiberSet = std::unordered_set<Fiber*>;
+  using TaskQueue = containers::deque<Task>;
+  using FiberQueue = containers::deque<Fiber*>;
+  using FiberSet = containers::unordered_set<Fiber*>;
 
   // Workers executes Tasks on a single thread.
   // Once a task is started, it may yield to other tasks on the same Worker.
@@ -434,6 +434,8 @@
 
     // Work holds tasks and fibers that are enqueued on the Worker.
     struct Work {
+      inline Work(Allocator*);
+
       std::atomic<uint64_t> num = {0};  // tasks.size() + fibers.size()
       GUARDED_BY(mutex) uint64_t numBlockedFibers = 0;
       GUARDED_BY(mutex) TaskQueue tasks;
@@ -471,7 +473,7 @@
     Thread thread;
     Work work;
     FiberSet idleFibers;  // Fibers that have completed which can be reused.
-    std::vector<Allocator::unique_ptr<Fiber>>
+    containers::vector<Allocator::unique_ptr<Fiber>, 16>
         workerFibers;  // All fibers created by this worker.
     FastRnd rng;
     bool shutdown = false;
@@ -503,8 +505,11 @@
   std::array<Worker*, MaxWorkerThreads> workerThreads;
 
   struct SingleThreadedWorkers {
+    inline SingleThreadedWorkers(Allocator*);
+
     using WorkerByTid =
-        std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>>;
+        containers::unordered_map<std::thread::id,
+                                  Allocator::unique_ptr<Worker>>;
     marl::mutex mutex;
     GUARDED_BY(mutex) std::condition_variable unbind;
     GUARDED_BY(mutex) WorkerByTid byTid;
@@ -531,6 +536,12 @@
   return *this;
 }
 
+Scheduler::Config& Scheduler::Config::setWorkerThreadAffinityPolicy(
+    const std::shared_ptr<Thread::Affinity::Policy>& policy) {
+  workerThread.affinityPolicy = policy;
+  return *this;
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::Fiber
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/include/marl/thread.h b/include/marl/thread.h
index f18bddd..1046922 100644
--- a/include/marl/thread.h
+++ b/include/marl/thread.h
@@ -17,26 +17,109 @@
 
 #include <functional>
 
+#include "containers.h"
+
 namespace marl {
 
 // Thread provides an OS abstraction for threads of execution.
-// Thread is used by marl instead of std::thread as Windows does not naturally
-// scale beyond 64 logical threads on a single CPU, unless you use the Win32
-// API.
-// Thread alsocontains static methods that abstract OS-specific thread / cpu
-// queries and control.
 class Thread {
  public:
   using Func = std::function<void()>;
 
+  // Core identifies a logical processor unit.
+  // How a core is identified varies by platform.
+  struct Core {
+    struct Windows {
+      uint8_t group;  // Group number
+      uint8_t index;  // Core within the processor group
+    };
+    struct Pthread {
+      uint16_t index;  // Core number
+    };
+    union {
+      Windows windows;
+      Pthread pthread;
+    };
+
+    // Comparison functions
+    inline bool operator==(const Core&) const;
+    inline bool operator<(const Core&) const;
+  };
+
+  // Affinity holds the affinity mask for a thread - a description of what cores
+  // the thread is allowed to run on.
+  struct Affinity {
+    // supported is true if marl supports controlling thread affinity for this
+    // platform.
+#if defined(_WIN32) || defined(__linux__) || defined(__FreeBSD__)
+    static constexpr bool supported = true;
+#else
+    static constexpr bool supported = false;
+#endif
+
+    // Policy is an interface that provides a get() method for returning an
+    // Affinity for the given thread by id.
+    class Policy {
+     public:
+      virtual ~Policy(){};
+
+      // anyOf() returns a Policy that returns an Affinity containing all of
+      // the available cores in affinity.
+      //
+      // Windows requires that each thread is only associated with a
+      // single affinity group, so the Policy's returned affinity will contain
+      // cores all from the same group.
+      static std::shared_ptr<Policy> anyOf(
+          Affinity&& affinity,
+          Allocator* allocator = Allocator::Default);
+
+      // oneOf() returns a Policy that returns an affinity with a single enabled
+      // core from affinity. The single enabled core in the Policy's returned
+      // affinity is:
+      //      affinity[threadId % affinity.count()]
+      static std::shared_ptr<Policy> oneOf(
+          Affinity&& affinity,
+          Allocator* allocator = Allocator::Default);
+
+      // get() returns the thread Affinity for the given thread by id.
+      virtual Affinity get(uint32_t threadId, Allocator* allocator) const = 0;
+    };
+
+    Affinity(Allocator*);
+    Affinity(Affinity&&);
+    Affinity(const Affinity&, Allocator* allocator);
+
+    // all() returns an Affinity with all the cores available to the process.
+    static Affinity all(Allocator* allocator = Allocator::Default);
+
+    Affinity(std::initializer_list<Core>, Allocator* allocator);
+
+    // count() returns the number of enabled cores in the affinity.
+    size_t count() const;
+
+    // operator[] returns the i'th enabled core from this affinity.
+    Core operator[](size_t index) const;
+
+    // add() adds the cores from the given affinity to this affinity.
+    // This affinity is returned to allow for fluent calls.
+    Affinity& add(const Affinity&);
+
+    // remove() removes the cores from the given affinity from this affinity.
+    // This affinity is returned to allow for fluent calls.
+    Affinity& remove(const Affinity&);
+
+   private:
+    Affinity(const Affinity&) = delete;
+
+    containers::vector<Core, 32> cores;
+  };
+
   Thread() = default;
   Thread(Thread&&);
   Thread& operator=(Thread&&);
 
-  // Start a new thread that calls func.
-  // logicalCpuHint is a hint to run the thread on the specified logical CPU.
-  // logicalCpuHint may be entirely ignored.
-  Thread(unsigned int logicalCpuHint, const Func& func);
+  // Start a new thread using the given affinity that calls func.
+  Thread(Affinity&& affinity, Func&& func);
 
   ~Thread();
 
@@ -59,6 +142,18 @@
   Impl* impl = nullptr;
 };
 
+////////////////////////////////////////////////////////////////////////////////
+// Thread::Core
+////////////////////////////////////////////////////////////////////////////////
+// Comparison functions
+bool Thread::Core::operator==(const Core& other) const {
+  return pthread.index == other.pthread.index;
+}
+
+bool Thread::Core::operator<(const Core& other) const {
+  return pthread.index < other.pthread.index;
+}
+
 }  // namespace marl
 
 #endif  // marl_thread_h
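
A sketch combining the new `Affinity` set operations with the scheduler
config (which core to reserve is an assumption for illustration):

```c++
#include "marl/scheduler.h"
#include "marl/thread.h"

void affinitySketch() {
  // Start from every core available to the process, then (illustratively)
  // reserve core 0 by removing it from the set.
  auto affinity = marl::Thread::Affinity::all();
  if (affinity.count() > 1) {
    affinity.remove(
        marl::Thread::Affinity({affinity[0]}, marl::Allocator::Default));
  }

  // oneOf() hands each worker a single core from the set:
  // affinity[threadId % affinity.count()].
  marl::Scheduler::Config cfg;
  cfg.setWorkerThreadCount(static_cast<int>(affinity.count()));
  cfg.setWorkerThreadAffinityPolicy(
      marl::Thread::Affinity::Policy::oneOf(std::move(affinity)));

  marl::Scheduler scheduler(cfg);
  scheduler.bind();
  scheduler.unbind();
}
```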
diff --git a/kokoro/macos/presubmit.sh b/kokoro/macos/presubmit.sh
index f2009c8..60300c4 100755
--- a/kokoro/macos/presubmit.sh
+++ b/kokoro/macos/presubmit.sh
@@ -13,7 +13,12 @@
     mkdir build
     cd build
 
-    cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_BUILD_BENCHMARKS=1 -DMARL_WARNINGS_AS_ERRORS=1
+    cmake .. -DMARL_BUILD_EXAMPLES=1 \
+             -DMARL_BUILD_TESTS=1 \
+             -DMARL_BUILD_BENCHMARKS=1 \
+             -DMARL_WARNINGS_AS_ERRORS=1 \
+             -DMARL_DEBUG_ENABLED=1
+
     make -j$(sysctl -n hw.logicalcpu)
 
     ./marl-unittests
diff --git a/kokoro/ubuntu/presubmit-docker.sh b/kokoro/ubuntu/presubmit-docker.sh
index 262ec5f..1439bd4 100755
--- a/kokoro/ubuntu/presubmit-docker.sh
+++ b/kokoro/ubuntu/presubmit-docker.sh
@@ -39,7 +39,8 @@
             -DMARL_BUILD_EXAMPLES=1 \
             -DMARL_BUILD_TESTS=1 \
             -DMARL_BUILD_BENCHMARKS=1 \
-            -DMARL_WARNINGS_AS_ERRORS=1
+            -DMARL_WARNINGS_AS_ERRORS=1 \
+            -DMARL_DEBUG_ENABLED=1
 
     make --jobs=$(nproc)
 
diff --git a/kokoro/windows/presubmit.bat b/kokoro/windows/presubmit.bat
index 1cb9caa..a73aa7c 100644
--- a/kokoro/windows/presubmit.bat
+++ b/kokoro/windows/presubmit.bat
@@ -20,7 +20,7 @@
 if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
 
 IF /I "%BUILD_SYSTEM%"=="cmake" (
-    cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_BUILD_BENCHMARKS=1" "-DMARL_WARNINGS_AS_ERRORS=1"
+    cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_BUILD_BENCHMARKS=1" "-DMARL_WARNINGS_AS_ERRORS=1" "-DMARL_DEBUG_ENABLED=1"
     if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
     %MSBUILD% /p:Configuration=%CONFIG% Marl.sln
     if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
diff --git a/src/containers_test.cpp b/src/containers_test.cpp
index 2bc981c..77723af 100644
--- a/src/containers_test.cpp
+++ b/src/containers_test.cpp
@@ -132,6 +132,21 @@
   vectorA[1] = "B";
   vectorA[2] = "C";
 
+  marl::containers::vector<std::string, 4> vectorB(vectorA, allocator);
+  ASSERT_EQ(vectorB.size(), size_t(3));
+  ASSERT_EQ(vectorB[0], "A");
+  ASSERT_EQ(vectorB[1], "B");
+  ASSERT_EQ(vectorB[2], "C");
+}
+
+TEST_F(ContainersVectorTest, CopyConstructDifferentBaseCapacity) {
+  marl::containers::vector<std::string, 4> vectorA(allocator);
+
+  vectorA.resize(3);
+  vectorA[0] = "A";
+  vectorA[1] = "B";
+  vectorA[2] = "C";
+
   marl::containers::vector<std::string, 2> vectorB(vectorA, allocator);
   ASSERT_EQ(vectorB.size(), size_t(3));
   ASSERT_EQ(vectorB[0], "A");
@@ -139,6 +154,38 @@
   ASSERT_EQ(vectorB[2], "C");
 }
 
+TEST_F(ContainersVectorTest, CopyAssignment) {
+  marl::containers::vector<std::string, 4> vectorA(allocator);
+
+  vectorA.resize(3);
+  vectorA[0] = "A";
+  vectorA[1] = "B";
+  vectorA[2] = "C";
+
+  marl::containers::vector<std::string, 4> vectorB(allocator);
+  vectorB = vectorA;
+  ASSERT_EQ(vectorB.size(), size_t(3));
+  ASSERT_EQ(vectorB[0], "A");
+  ASSERT_EQ(vectorB[1], "B");
+  ASSERT_EQ(vectorB[2], "C");
+}
+
+TEST_F(ContainersVectorTest, CopyAssignmentDifferentBaseCapacity) {
+  marl::containers::vector<std::string, 4> vectorA(allocator);
+
+  vectorA.resize(3);
+  vectorA[0] = "A";
+  vectorA[1] = "B";
+  vectorA[2] = "C";
+
+  marl::containers::vector<std::string, 2> vectorB(allocator);
+  vectorB = vectorA;
+  ASSERT_EQ(vectorB.size(), size_t(3));
+  ASSERT_EQ(vectorB[0], "A");
+  ASSERT_EQ(vectorB[1], "B");
+  ASSERT_EQ(vectorB[2], "C");
+}
+
 TEST_F(ContainersVectorTest, MoveConstruct) {
   marl::containers::vector<std::string, 4> vectorA(allocator);
 
diff --git a/src/event_bench.cpp b/src/event_bench.cpp
index dc8f569..50d958c 100644
--- a/src/event_bench.cpp
+++ b/src/event_bench.cpp
@@ -14,20 +14,22 @@
 
 #include "marl_bench.h"
 
+#include "marl/containers.h"
 #include "marl/event.h"
 
 #include "benchmark/benchmark.h"
 
-#include <vector>
-
 BENCHMARK_DEFINE_F(Schedule, Event)(benchmark::State& state) {
   run(state, [&](int numTasks) {
     for (auto _ : state) {
-      std::vector<marl::Event> events(numTasks + 1);
+      marl::containers::vector<marl::Event, 1> events;
+      events.resize(numTasks + 1);
       for (auto i = 0; i < numTasks; i++) {
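+        // Copy the two events by value so the task lambda captures the
+        // events themselves rather than the (non-copyable) events vector.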
+        marl::Event prev = events[i];
+        marl::Event next = events[i + 1];
         marl::schedule([=] {
-          events[i].wait();
-          events[i + 1].signal();
+          prev.wait();
+          next.signal();
         });
       }
       events.front().signal();
@@ -69,4 +71,4 @@
     }
   });
 }
-BENCHMARK_REGISTER_F(Schedule, EventBaton)->Apply(Schedule::args<1000000>);
+BENCHMARK_REGISTER_F(Schedule, EventBaton)->Apply(Schedule::args<262144>);
diff --git a/src/event_test.cpp b/src/event_test.cpp
index 0250329..bbc9102 100644
--- a/src/event_test.cpp
+++ b/src/event_test.cpp
@@ -18,6 +18,8 @@
 
 #include "marl_test.h"
 
+#include <array>
+
 namespace std {
 namespace chrono {
 template <typename Rep, typename Period>
@@ -28,9 +30,7 @@
 }  // namespace std
 
 TEST_P(WithBoundScheduler, EventIsSignalled) {
-  std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
-                                          marl::Event::Mode::Auto};
-  for (auto mode : modes) {
+  for (auto mode : {marl::Event::Mode::Manual, marl::Event::Mode::Auto}) {
     auto event = marl::Event(mode);
     ASSERT_EQ(event.isSignalled(), false);
     event.signal();
@@ -99,9 +99,7 @@
 }
 
 TEST_P(WithBoundScheduler, EventSequence) {
-  std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
-                                          marl::Event::Mode::Auto};
-  for (auto mode : modes) {
+  for (auto mode : {marl::Event::Mode::Manual, marl::Event::Mode::Auto}) {
     std::string sequence;
     auto eventA = marl::Event(mode);
     auto eventB = marl::Event(mode);
@@ -216,7 +214,7 @@
 
 TEST_P(WithBoundScheduler, EventAny) {
   for (int i = 0; i < 3; i++) {
-    std::vector<marl::Event> events = {
+    std::array<marl::Event, 3> events = {
         marl::Event(marl::Event::Mode::Auto),
         marl::Event(marl::Event::Mode::Auto),
         marl::Event(marl::Event::Mode::Auto),
diff --git a/src/marl_bench.cpp b/src/marl_bench.cpp
index f258daf..f3ea565 100644
--- a/src/marl_bench.cpp
+++ b/src/marl_bench.cpp
@@ -14,7 +14,30 @@
 
 #include "marl_bench.h"
 
-BENCHMARK_MAIN();
+#include "marl/sanitizers.h"
+
+int main(int argc, char** argv) {
+#if ADDRESS_SANITIZER_ENABLED
+  printf(
+      "***WARNING*** Marl built with address sanitizer enabled. "
+      "Timings will be affected\n");
+#endif
+#if MEMORY_SANITIZER_ENABLED
+  printf(
+      "***WARNING*** Marl built with memory sanitizer enabled. "
+      "Timings will be affected\n");
+#endif
+#if THREAD_SANITIZER_ENABLED
+  printf(
+      "***WARNING*** Marl built with thread sanitizer enabled. "
+      "Timings will be affected\n");
+#endif
+  ::benchmark::Initialize(&argc, argv);
+  if (::benchmark::ReportUnrecognizedArguments(argc, argv))
+    return 1;
+  ::benchmark::RunSpecifiedBenchmarks();
+  return 0;
+}
 
 uint32_t Schedule::doSomeWork(uint32_t x) {
   uint32_t q = x;
diff --git a/src/marl_bench.h b/src/marl_bench.h
index e4c2cee..a3ed216 100644
--- a/src/marl_bench.h
+++ b/src/marl_bench.h
@@ -17,35 +17,66 @@
 
 #include "benchmark/benchmark.h"
 
+// Define MARL_FULL_BENCHMARK to 1 to run benchmarks for every worker-thread
+// count from 0 to the number of logical CPUs.
+#ifndef MARL_FULL_BENCHMARK
+#define MARL_FULL_BENCHMARK 0
+#endif
+
 class Schedule : public benchmark::Fixture {
  public:
   void SetUp(const ::benchmark::State&) {}
 
   void TearDown(const ::benchmark::State&) {}
 
-  // run() creates a scheduler, sets the number of worker threads from the
-  // benchmark arguments, calls f, then unbinds and destructs the scheduler.
+  // run() creates a scheduler using the config cfg, sets the number of worker
+  // threads from the benchmark arguments, calls f, then unbinds and destructs
+  // the scheduler.
   // F must be a function of the signature: void(int numTasks)
   template <typename F>
-  void run(const ::benchmark::State& state, F&& f) {
-    marl::Scheduler scheduler;
-    scheduler.setWorkerThreadCount(numThreads(state));
+  void run(const ::benchmark::State& state,
+           marl::Scheduler::Config cfg,
+           F&& f) {
+    cfg.setWorkerThreadCount(numThreads(state));
+
+    marl::Scheduler scheduler(cfg);
     scheduler.bind();
     f(numTasks(state));
     scheduler.unbind();
   }
 
-  // args() sets up the benchmark to run from [1 .. NumTasks] tasks (in 8^n
-  // steps) across 0 worker threads to numLogicalCPUs.
+  // run() creates a scheduler, sets the number of worker threads from the
+  // benchmark arguments, calls f, then unbinds and destructs the scheduler.
+  // F must be a function of the signature: void(int numTasks)
+  template <typename F>
+  void run(const ::benchmark::State& state, F&& f) {
+    run(state, marl::Scheduler::Config{}, f);
+  }
+
+  // args() sets up the benchmark to run a number of tasks over a number of
+  // threads.
+  // If MARL_FULL_BENCHMARK is enabled, then NumTasks tasks will be run
+  // across every worker-thread count from 0 to numLogicalCPUs.
+  // If MARL_FULL_BENCHMARK is not enabled, then NumTasks tasks will be run
+  // across [0 .. numLogicalCPUs] worker threads in power-of-two steps (e.g.
+  // with 6 logical CPUs: 0, 1, 2, 4 and 6 threads).
   template <int NumTasks = 0x40000>
   static void args(benchmark::internal::Benchmark* b) {
     b->ArgNames({"tasks", "threads"});
-    for (unsigned int tasks = 1U; tasks <= NumTasks; tasks *= 8) {
-      for (unsigned int threads = 0U; threads <= marl::Thread::numLogicalCPUs();
-           ++threads) {
-        b->Args({tasks, threads});
-      }
+    b->Args({NumTasks, 0});
+    auto numLogicalCPUs = marl::Thread::numLogicalCPUs();
+#if MARL_FULL_BENCHMARK
+    for (unsigned int threads = 1U; threads <= numLogicalCPUs; threads++) {
+      b->Args({NumTasks, threads});
     }
+#else
+    for (unsigned int threads = 1U; threads <= numLogicalCPUs; threads *= 2) {
+      b->Args({NumTasks, threads});
+    }
+    if ((numLogicalCPUs & (numLogicalCPUs - 1)) != 0) {
+      // numLogicalCPUs is not a power-of-two. Also test with numLogicalCPUs.
+      b->Args({NumTasks, numLogicalCPUs});
+    }
+#endif
   }
 
   // numThreads() return the number of threads in the benchmark run from the
diff --git a/src/marl_test.h b/src/marl_test.h
index b636b61..5c71868 100644
--- a/src/marl_test.h
+++ b/src/marl_test.h
@@ -54,9 +54,12 @@
 
     auto& params = GetParam();
 
-    auto scheduler = new marl::Scheduler(allocator);
+    marl::Scheduler::Config cfg;
+    cfg.setAllocator(allocator);
+    cfg.setWorkerThreadCount(params.numWorkerThreads);
+
+    auto scheduler = new marl::Scheduler(cfg);
     scheduler->bind();
-    scheduler->setWorkerThreadCount(params.numWorkerThreads);
   }
 
   void TearDown() override {
diff --git a/src/memory.cpp b/src/memory.cpp
index 142d76b..b8cbaa2 100644
--- a/src/memory.cpp
+++ b/src/memory.cpp
@@ -19,7 +19,7 @@
 
 #include <cstring>
 
-#if defined(__linux__) || defined(__FreeBSD__) ||  defined(__APPLE__)
+#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
 #include <sys/mman.h>
 #include <unistd.h>
 namespace {
diff --git a/src/memory_test.cpp b/src/memory_test.cpp
index 8aa6334..f760934 100644
--- a/src/memory_test.cpp
+++ b/src/memory_test.cpp
@@ -22,14 +22,11 @@
 };
 
 TEST_F(AllocatorTest, AlignedAllocate) {
-  std::vector<bool> guards = {false, true};
-  std::vector<size_t> sizes = {1,   2,   3,   4,   5,   7,   8,   14,  16,  17,
-                               31,  34,  50,  63,  64,  65,  100, 127, 128, 129,
-                               200, 255, 256, 257, 500, 511, 512, 513};
-  std::vector<size_t> alignments = {1, 2, 4, 8, 16, 32, 64, 128};
-  for (auto useGuards : guards) {
-    for (auto alignment : alignments) {
-      for (auto size : sizes) {
+  for (auto useGuards : {false, true}) {
+    for (auto alignment : {1, 2, 4, 8, 16, 32, 64, 128}) {
+      for (auto size : {1,   2,   3,   4,   5,   7,   8,   14,  16,  17,
+                        31,  34,  50,  63,  64,  65,  100, 127, 128, 129,
+                        200, 255, 256, 257, 500, 511, 512, 513}) {
         marl::Allocation::Request request;
         request.alignment = alignment;
         request.size = size;
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 384f67d..66440eb 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -59,21 +59,6 @@
 }
 #endif
 
-template <typename T>
-inline T take(std::deque<T>& queue) {
-  auto out = std::move(queue.front());
-  queue.pop_front();
-  return out;
-}
-
-template <typename T>
-inline T take(std::unordered_set<T>& set) {
-  auto it = set.begin();
-  auto out = std::move(*it);
-  set.erase(it);
-  return out;
-}
-
 inline void nop() {
 #if defined(_WIN32)
   __nop();
@@ -127,7 +112,12 @@
   bound = nullptr;
 }
 
-Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
+Scheduler::Scheduler(const Config& config)
+    : cfg(config), workerThreads{}, singleThreadedWorkers(config.allocator) {
+  if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
+    cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf(
+        Thread::Affinity::all(cfg.allocator), cfg.allocator);
+  }
   for (size_t i = 0; i < spinningWorkers.size(); i++) {
     spinningWorkers[i] = -1;
   }
@@ -162,7 +152,7 @@
 
 #if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
 Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
-    : workerThreads{} {
+    : workerThreads{}, singleThreadedWorkers(allocator) {
   cfg.allocator = allocator;
   for (size_t i = 0; i < spinningWorkers.size(); i++) {
     spinningWorkers[i] = -1;
@@ -338,6 +328,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::WaitingFibers
 ////////////////////////////////////////////////////////////////////////////////
+Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
+    : timeouts(allocator), fibers(allocator) {}
+
 Scheduler::WaitingFibers::operator bool() const {
   return !fibers.empty();
 }
@@ -399,12 +392,19 @@
 thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
 
 Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
-    : id(id), mode(mode), scheduler(scheduler) {}
+    : id(id),
+      mode(mode),
+      scheduler(scheduler),
+      work(scheduler->cfg.allocator),
+      idleFibers(scheduler->cfg.allocator) {}
 
 void Scheduler::Worker::start() {
   switch (mode) {
-    case Mode::MultiThreaded:
-      thread = Thread(id, [=] {
+    case Mode::MultiThreaded: {
+      auto allocator = scheduler->cfg.allocator;
+      auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
+      auto affinity = affinityPolicy->get(id, allocator);
+      thread = Thread(std::move(affinity), [=] {
         Thread::setName("Thread<%.2d>", int(id));
 
         if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
@@ -423,13 +423,13 @@
         Worker::current = nullptr;
       });
       break;
-
-    case Mode::SingleThreaded:
+    }
+    case Mode::SingleThreaded: {
       Worker::current = this;
       mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
       currentFiber = mainFiber.get();
       break;
-
+    }
     default:
       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
   }
@@ -517,12 +517,12 @@
   if (!work.fibers.empty()) {
     // There's another fiber that has become unblocked, resume that.
     work.num--;
-    auto to = take(work.fibers);
+    auto to = containers::take(work.fibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
     switchToFiber(to);
   } else if (!idleFibers.empty()) {
     // There's an old fiber we can reuse, resume that.
-    auto to = take(idleFibers);
+    auto to = containers::take(idleFibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
     switchToFiber(to);
   } else {
@@ -597,7 +597,7 @@
     return false;
   }
   work.num--;
-  out = take(work.tasks);
+  out = containers::take(work.tasks);
   work.mutex.unlock();
   return true;
 }
@@ -714,7 +714,7 @@
 
     while (!work.fibers.empty()) {
       work.num--;
-      auto fiber = take(work.fibers);
+      auto fiber = containers::take(work.fibers);
       // Sanity checks,
       MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
       MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
@@ -731,7 +731,7 @@
 
     if (!work.tasks.empty()) {
       work.num--;
-      auto task = take(work.tasks);
+      auto task = containers::take(work.tasks);
       work.mutex.unlock();
 
       // Run the task.
@@ -752,7 +752,7 @@
   auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
                              [&]() REQUIRES(work.mutex) { run(); });
   auto ptr = fiber.get();
-  workerFibers.push_back(std::move(fiber));
+  workerFibers.emplace_back(std::move(fiber));
   return ptr;
 }
 
@@ -768,6 +768,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::Worker::Work
 ////////////////////////////////////////////////////////////////////////////////
+Scheduler::Worker::Work::Work(Allocator* allocator)
+    : tasks(allocator), fibers(allocator), waiting(allocator) {}
+
 template <typename F>
 void Scheduler::Worker::Work::wait(F&& f) {
   notifyAdded = true;
@@ -779,4 +782,10 @@
   notifyAdded = false;
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::SingleThreadedWorkers
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
+    : byTid(allocator) {}
+
 }  // namespace marl
diff --git a/src/scheduler_bench.cpp b/src/scheduler_bench.cpp
index 3548e54..d009713 100644
--- a/src/scheduler_bench.cpp
+++ b/src/scheduler_bench.cpp
@@ -46,3 +46,25 @@
   });
 }
 BENCHMARK_REGISTER_F(Schedule, SomeWork)->Apply(Schedule::args);
+
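+// Same as Schedule.SomeWork, but with every worker thread pinned to a single
+// logical CPU via the oneOf() affinity policy, so the benchmark captures any
+// overhead of running with thread affinities.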
+BENCHMARK_DEFINE_F(Schedule, SomeWorkWorkerAffinityOneOf)
+(benchmark::State& state) {
+  marl::Scheduler::Config cfg;
+  cfg.setWorkerThreadAffinityPolicy(
+      marl::Thread::Affinity::Policy::oneOf(marl::Thread::Affinity::all()));
+  run(state, cfg, [&](int numTasks) {
+    for (auto _ : state) {
+      marl::WaitGroup wg;
+      wg.add(numTasks);
+      for (auto i = 0; i < numTasks; i++) {
+        marl::schedule([=] {
+          benchmark::DoNotOptimize(doSomeWork(i));
+          wg.done();
+        });
+      }
+      wg.wait();
+    }
+  });
+}
+BENCHMARK_REGISTER_F(Schedule, SomeWorkWorkerAffinityOneOf)
+    ->Apply(Schedule::args);
diff --git a/src/scheduler_test.cpp b/src/scheduler_test.cpp
index b07417f..3bf8849 100644
--- a/src/scheduler_test.cpp
+++ b/src/scheduler_test.cpp
@@ -14,32 +14,38 @@
 
 #include "marl_test.h"
 
+#include "marl/containers.h"
 #include "marl/defer.h"
 #include "marl/event.h"
 #include "marl/waitgroup.h"
 
 #include <atomic>
-#include <unordered_set>
 
 TEST_F(WithoutBoundScheduler, SchedulerConstructAndDestruct) {
-  auto scheduler = new marl::Scheduler();
-  delete scheduler;
+  auto scheduler = std::unique_ptr<marl::Scheduler>(
+      new marl::Scheduler(marl::Scheduler::Config()));
 }
 
 TEST_F(WithoutBoundScheduler, SchedulerBindGetUnbind) {
-  auto scheduler = new marl::Scheduler();
+  auto scheduler = std::unique_ptr<marl::Scheduler>(
+      new marl::Scheduler(marl::Scheduler::Config()));
   scheduler->bind();
   auto got = marl::Scheduler::get();
-  ASSERT_EQ(scheduler, got);
+  ASSERT_EQ(scheduler.get(), got);
   scheduler->unbind();
   got = marl::Scheduler::get();
   ASSERT_EQ(got, nullptr);
-  delete scheduler;
 }
 
-TEST_P(WithBoundScheduler, SetAndGetWorkerThreadCount) {
-  ASSERT_EQ(marl::Scheduler::get()->getWorkerThreadCount(),
-            GetParam().numWorkerThreads);
+TEST_F(WithoutBoundScheduler, CheckConfig) {
+  marl::Scheduler::Config cfg;
+  cfg.setAllocator(allocator).setWorkerThreadCount(10);
+
+  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
+
+  auto gotCfg = scheduler->config();
+  ASSERT_EQ(gotCfg.allocator, allocator);
+  ASSERT_EQ(gotCfg.workerThread.count, 10);
 }
 
 TEST_P(WithBoundScheduler, DestructWithPendingTasks) {
@@ -56,7 +62,7 @@
   ASSERT_EQ(counter.load(), 1000);
 
   // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
-  (new marl::Scheduler())->bind();
+  (new marl::Scheduler(marl::Scheduler::Config()))->bind();
 }
 
 TEST_P(WithBoundScheduler, DestructWithPendingFibers) {
@@ -85,7 +91,7 @@
   ASSERT_EQ(counter.load(), 1000);
 
   // Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
-  (new marl::Scheduler())->bind();
+  (new marl::Scheduler(marl::Scheduler::Config()))->bind();
 }
 
 TEST_P(WithBoundScheduler, FibersResumeOnSameThread) {
@@ -114,9 +120,9 @@
   marl::WaitGroup fence(1);
   marl::WaitGroup wg(num_threads);
 
-  std::vector<std::thread> threads;
+  marl::containers::vector<std::thread, 32> threads;
   for (int i = 0; i < num_threads; i++) {
-    threads.push_back(std::thread([=] {
+    threads.emplace_back(std::thread([=] {
       scheduler->bind();
       defer(scheduler->unbind());
 
@@ -137,12 +143,15 @@
 }
 
 TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
-  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
+  marl::Scheduler::Config cfg;
+  cfg.setWorkerThreadCount(8);
+
+  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
   scheduler->bind();
   defer(scheduler->unbind());
-  scheduler->setWorkerThreadCount(8);
+
   std::mutex mutex;
-  std::unordered_set<std::thread::id> threads;
+  marl::containers::unordered_set<std::thread::id> threads(allocator);
   marl::WaitGroup wg;
   for (int i = 0; i < 10000; i++) {
     wg.add(1);
@@ -161,8 +170,9 @@
 // Test that a marl::Scheduler *with dedicated worker threads* can be used
 // without first binding to the scheduling thread.
 TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
-  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
-  scheduler->setWorkerThreadCount(8);
+  marl::Scheduler::Config cfg;
+  cfg.setWorkerThreadCount(8);
+  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
 
   marl::WaitGroup wg;
   for (int i = 0; i < 100; i++) {
@@ -191,7 +201,8 @@
 // Test that a marl::Scheduler *without dedicated worker threads* cannot be used
 // without first binding to the scheduling thread.
 TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) {
-  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
+  marl::Scheduler::Config cfg;
+  auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
 
 #if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
   EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})),
diff --git a/src/thread.cpp b/src/thread.cpp
index c2ac40d..8d46963 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -18,14 +18,18 @@
 #include "marl/defer.h"
 #include "marl/trace.h"
 
+#include <algorithm>  // std::sort
+
 #include <cstdarg>
 #include <cstdio>
 
 #if defined(_WIN32)
 #define WIN32_LEAN_AND_MEAN 1
 #include <windows.h>
+#include <array>
 #include <cstdlib>  // mbstowcs
-#include <vector>
+#include <limits>   // std::numeric_limits
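+// windows.h defines a max() macro that would break the
+// std::numeric_limits<T>::max() calls below.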
+#undef max
 #elif defined(__APPLE__)
 #include <mach/thread_act.h>
 #include <pthread.h>
@@ -42,10 +46,27 @@
 #include <thread>
 #endif
 
+namespace {
+
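+// Hash functor for marl::Thread::Core, used by the unordered_sets in
+// Thread::Affinity::add() / Thread::Affinity::remove().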
+struct CoreHasher {
+  inline uint64_t operator()(const marl::Thread::Core& core) const {
+    return core.pthread.index;
+  }
+};
+
+}  // anonymous namespace
+
 namespace marl {
 
 #if defined(_WIN32)
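+// Upper bounds implied by the Thread::Core::windows field types. The
+// static_assert below verifies that every bit index of a KAFFINITY mask is
+// representable in Core::windows.index.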
+static constexpr size_t MaxCoreCount =
+    std::numeric_limits<decltype(Thread::Core::windows.index)>::max() + 1ULL;
+static constexpr size_t MaxGroupCount =
+    std::numeric_limits<decltype(Thread::Core::windows.group)>::max() + 1ULL;
+static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount,
+              "Thread::Core::windows.index is too small");
 
+namespace {
 #define CHECK_WIN32(expr)                                    \
   do {                                                       \
     auto res = expr;                                         \
@@ -54,16 +75,19 @@
                 (int)GetLastError());                        \
   } while (false)
 
-namespace {
-
 struct ProcessorGroup {
   unsigned int count;  // number of logical processors in this group.
   KAFFINITY affinity;  // affinity mask.
 };
 
-const std::vector<ProcessorGroup>& getProcessorGroups() {
-  static std::vector<ProcessorGroup> groups = [] {
-    std::vector<ProcessorGroup> out;
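+// All processor groups on the system, stored in a fixed-size array so that
+// the function-local static below never needs a heap allocation.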
+struct ProcessorGroups {
+  std::array<ProcessorGroup, MaxGroupCount> groups;
+  size_t count;
+};
+
+const ProcessorGroups& getProcessorGroups() {
+  static ProcessorGroups groups = [] {
+    ProcessorGroups out = {};
     SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
     DWORD size = sizeof(info);
     CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
@@ -73,8 +97,9 @@
         auto groupCount = info[i].Group.ActiveGroupCount;
         for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
           auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
-          out.emplace_back(ProcessorGroup{groupInfo.ActiveProcessorCount,
-                                          groupInfo.ActiveProcessorMask});
+          MARL_ASSERT(out.count < MaxGroupCount, "Group index overflow");
+          out.groups[out.count++] = ProcessorGroup{
+              groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask};
         }
       }
     }
@@ -82,38 +107,169 @@
   }();
   return groups;
 }
+}  // namespace
+#endif  // defined(_WIN32)
 
-bool getGroupAffinity(unsigned int index, GROUP_AFFINITY* groupAffinity) {
-  auto& groups = getProcessorGroups();
-  for (size_t groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
-    auto& group = groups[groupIdx];
-    if (index < group.count) {
-      for (int i = 0; i < sizeof(group.affinity) * 8; i++) {
-        if (group.affinity & (1ULL << i)) {
-          if (index == 0) {
-            groupAffinity->Group = static_cast<WORD>(groupIdx);
-            // Use the whole group's affinity, as the OS is then able to shuffle
-            // threads around based on external demands. Pinning these to a
-            // single core can cause up to 20% performance loss in benchmarking.
-            groupAffinity->Mask = group.affinity;
-            return true;
-          }
-          index--;
-        }
-      }
-      return false;
-    } else {
-      index -= group.count;
-    }
+////////////////////////////////////////////////////////////////////////////////
+// Thread::Affinity
+////////////////////////////////////////////////////////////////////////////////
+
+Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {}
+Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {}
+Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator)
+    : cores(other.cores, allocator) {}
+
+Thread::Affinity::Affinity(std::initializer_list<Core> list,
+                           Allocator* allocator)
+    : cores(allocator) {
+  cores.reserve(list.size());
+  for (auto core : list) {
+    cores.push_back(core);
   }
-  return false;
 }
 
-}  // namespace
+Thread::Affinity Thread::Affinity::all(
+    Allocator* allocator /* = Allocator::Default */) {
+  Thread::Affinity affinity(allocator);
+
+#if defined(_WIN32)
+  const auto& groups = getProcessorGroups();
+  for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
+    const auto& group = groups.groups[groupIdx];
+    Core core;
+    core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx);
+    for (unsigned int coreIdx = 0; coreIdx < group.count; coreIdx++) {
+      if ((group.affinity >> coreIdx) & 1) {
+        core.windows.index = static_cast<decltype(core.windows.index)>(coreIdx);
+        affinity.cores.emplace_back(std::move(core));
+      }
+    }
+  }
+#elif defined(__linux__)
+  auto thread = pthread_self();
+  cpu_set_t cpuset;
+  CPU_ZERO(&cpuset);
+  if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) {
+    // Enumerate the actual members of the set rather than assuming CPUs
+    // 0..CPU_COUNT-1, so sparse affinity masks keep their real indices.
+    for (int i = 0; i < CPU_SETSIZE; i++) {
+      if (!CPU_ISSET(i, &cpuset)) {
+        continue;
+      }
+      Core core;
+      core.pthread.index = static_cast<uint16_t>(i);
+      affinity.cores.emplace_back(std::move(core));
+    }
+  }
+#elif defined(__FreeBSD__)
+  auto thread = pthread_self();
+  cpuset_t cpuset;
+  CPU_ZERO(&cpuset);
+  if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) {
+    // As on Linux: enumerate set members so sparse masks keep their indices.
+    for (int i = 0; i < CPU_SETSIZE; i++) {
+      if (!CPU_ISSET(i, &cpuset)) {
+        continue;
+      }
+      Core core;
+      core.pthread.index = static_cast<uint16_t>(i);
+      affinity.cores.emplace_back(std::move(core));
+    }
+  }
+#else
+  static_assert(!supported,
+                "marl::Thread::Affinity::supported is true, but "
+                "Thread::Affinity::all() is not implemented for this platform");
+#endif
+
+  return affinity;
+}
+
+std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::anyOf(
+    Affinity&& affinity,
+    Allocator* allocator /* = Allocator::Default */) {
+  struct Policy : public Thread::Affinity::Policy {
+    Affinity affinity;
+    Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
+
+    Affinity get(uint32_t threadId, Allocator* allocator) const override {
+#if defined(_WIN32)
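+      // A thread can only be scheduled within a single Windows processor
+      // group, so trim the affinity down to the group holding the
+      // round-robin pick for this threadId.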
+      auto count = affinity.count();
+      if (count == 0) {
+        return Affinity(affinity, allocator);
+      }
+      auto group = affinity[threadId % count].windows.group;
+      Affinity out(allocator);
+      out.cores.reserve(count);
+      for (auto core : affinity.cores) {
+        if (core.windows.group == group) {
+          out.cores.push_back(core);
+        }
+      }
+      return out;
+#else
+      return Affinity(affinity, allocator);
+#endif
+    }
+  };
+
+  return allocator->make_shared<Policy>(std::move(affinity));
+}
+
+std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::oneOf(
+    Affinity&& affinity,
+    Allocator* allocator /* = Allocator::Default */) {
+  struct Policy : public Thread::Affinity::Policy {
+    Affinity affinity;
+    Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
+
+    Affinity get(uint32_t threadId, Allocator* allocator) const override {
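+      // Pin each thread to exactly one core, chosen round-robin by threadId.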
+      auto count = affinity.count();
+      if (count == 0) {
+        return Affinity(affinity, allocator);
+      }
+      return Affinity({affinity[threadId % count]}, allocator);
+    }
+  };
+
+  return allocator->make_shared<Policy>(std::move(affinity));
+}
+
+size_t Thread::Affinity::count() const {
+  return cores.size();
+}
+
+Thread::Core Thread::Affinity::operator[](size_t index) const {
+  return cores[index];
+}
+
+Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) {
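+  // Collect the cores already present so duplicates from |other| are
+  // skipped, then keep the list sorted for deterministic ordering.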
+  containers::unordered_set<Core, CoreHasher> set(cores.allocator);
+  for (auto core : cores) {
+    set.emplace(core);
+  }
+  for (auto core : other.cores) {
+    if (set.count(core) == 0) {
+      cores.push_back(core);
+    }
+  }
+  std::sort(cores.begin(), cores.end());
+  return *this;
+}
+
+Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) {
+  containers::unordered_set<Core, CoreHasher> set(cores.allocator);
+  for (auto core : other.cores) {
+    set.emplace(core);
+  }
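+  // Swap-remove matching cores: overwrite with the last element and shrink,
+  // re-sorting once afterwards to restore ordering.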
+  for (size_t i = 0; i < cores.size(); i++) {
+    if (set.count(cores[i]) != 0) {
+      cores[i] = cores.back();
+      cores.resize(cores.size() - 1);
+      i--;  // re-examine the element just swapped into slot i
+    }
+  }
+  std::sort(cores.begin(), cores.end());
+  return *this;
+}
+
+#if defined(_WIN32)
 
 class Thread::Impl {
  public:
-  Impl(const Func& func) : func(func) {}
+  Impl(Func&& func) : func(std::move(func)) {}
   static DWORD WINAPI run(void* self) {
     reinterpret_cast<Impl*>(self)->func();
     return 0;
@@ -123,7 +279,7 @@
   HANDLE handle;
 };
 
-Thread::Thread(unsigned int logicalCpu, const Func& func) {
+Thread::Thread(Affinity&& affinity, Func&& func) {
   SIZE_T size = 0;
   InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
   MARL_ASSERT(size > 0,
@@ -136,13 +292,22 @@
   defer(DeleteProcThreadAttributeList(attributes));
 
   GROUP_AFFINITY groupAffinity = {};
-  if (getGroupAffinity(logicalCpu, &groupAffinity)) {
+
+  auto count = affinity.count();
+  if (count > 0) {
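+    // Fold all cores into a single GROUP_AFFINITY mask; they must all belong
+    // to the same processor group (asserted below).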
+    groupAffinity.Group = affinity[0].windows.group;
+    for (size_t i = 0; i < count; i++) {
+      auto core = affinity[i];
+      MARL_ASSERT(groupAffinity.Group == core.windows.group,
+                  "Cannot create thread that uses multiple affinity groups");
+      groupAffinity.Mask |= (1ULL << core.windows.index);
+    }
     CHECK_WIN32(UpdateProcThreadAttribute(
         attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
         sizeof(groupAffinity), nullptr, nullptr));
   }
 
-  impl = new Impl(func);
+  impl = new Impl(std::move(func));
   impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
                                       &Impl::run, impl, 0, attributes, nullptr);
 }
@@ -181,7 +346,9 @@
 
 unsigned int Thread::numLogicalCPUs() {
   unsigned int count = 0;
-  for (auto& group : getProcessorGroups()) {
+  const auto& groups = getProcessorGroups();
+  for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
+    const auto& group = groups.groups[groupIdx];
     count += group.count;
   }
   return count;
@@ -191,13 +358,47 @@
 
 class Thread::Impl {
  public:
-  template <typename F>
-  Impl(F&& func) : thread(func) {}
+  Impl(Affinity&& affinity, Thread::Func&& f)
+      : affinity(std::move(affinity)), func(std::move(f)), thread([this] {
+          setAffinity();
+          func();
+        }) {}
+
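+  // Declaration order matters: |thread| must come last so that |affinity|
+  // and |func| are fully constructed before the thread body runs.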
+  Affinity affinity;
+  Func func;
   std::thread thread;
+
+  void setAffinity() {
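+    // Runs on the newly spawned thread, before |func| is invoked.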
+    auto count = affinity.count();
+    if (count == 0) {
+      return;
+    }
+
+#if defined(__linux__)
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    for (size_t i = 0; i < count; i++) {
+      CPU_SET(affinity[i].pthread.index, &cpuset);
+    }
+    auto thread = pthread_self();
+    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
+#elif defined(__FreeBSD__)
+    cpuset_t cpuset;
+    CPU_ZERO(&cpuset);
+    for (size_t i = 0; i < count; i++) {
+      CPU_SET(affinity[i].pthread.index, &cpuset);
+    }
+    auto thread = pthread_self();
+    pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset);
+#else
+    MARL_ASSERT(!marl::Thread::Affinity::supported,
+                "Attempting to use thread affinity on a unsupported platform");
+#endif
+  }
 };
 
-Thread::Thread(unsigned int /* logicalCpu */, const Func& func)
-    : impl(new Thread::Impl(func)) {}
+Thread::Thread(Affinity&& affinity, Func&& func)
+    : impl(new Thread::Impl(std::move(affinity), std::move(func))) {}
 
 Thread::~Thread() {
   delete impl;
diff --git a/src/thread_test.cpp b/src/thread_test.cpp
new file mode 100644
index 0000000..69370f9
--- /dev/null
+++ b/src/thread_test.cpp
@@ -0,0 +1,123 @@
+// Copyright 2019 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "marl_test.h"
+
+#include "marl/thread.h"
+
+namespace {
+
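+// Builds a Thread::Core from a raw pthread index.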
+marl::Thread::Core core(int idx) {
+  marl::Thread::Core c;
+  c.pthread.index = static_cast<uint16_t>(idx);
+  return c;
+}
+
+}  // anonymous namespace
+
+TEST_F(WithoutBoundScheduler, ThreadAffinityCount) {
+  auto affinity = marl::Thread::Affinity(
+      {
+          core(10),
+          core(20),
+          core(30),
+          core(40),
+      },
+      allocator);
+  EXPECT_EQ(affinity.count(), 4U);
+}
+
+TEST_F(WithoutBoundScheduler, ThreadAdd) {
+  auto affinity = marl::Thread::Affinity(
+      {
+          core(10),
+          core(20),
+          core(30),
+          core(40),
+      },
+      allocator);
+
+  affinity
+      .add(marl::Thread::Affinity(
+          {
+              core(25),
+              core(15),
+          },
+          allocator))
+      .add(marl::Thread::Affinity({core(35)}, allocator));
+
+  EXPECT_EQ(affinity.count(), 7U);
+  EXPECT_EQ(affinity[0], core(10));
+  EXPECT_EQ(affinity[1], core(15));
+  EXPECT_EQ(affinity[2], core(20));
+  EXPECT_EQ(affinity[3], core(25));
+  EXPECT_EQ(affinity[4], core(30));
+  EXPECT_EQ(affinity[5], core(35));
+  EXPECT_EQ(affinity[6], core(40));
+}
+
+TEST_F(WithoutBoundScheduler, ThreadRemove) {
+  auto affinity = marl::Thread::Affinity(
+      {
+          core(10),
+          core(20),
+          core(30),
+          core(40),
+      },
+      allocator);
+
+  affinity
+      .remove(marl::Thread::Affinity(
+          {
+              core(25),
+              core(20),
+          },
+          allocator))
+      .remove(marl::Thread::Affinity({core(40)}, allocator));
+
+  EXPECT_EQ(affinity.count(), 2U);
+  EXPECT_EQ(affinity[0], core(10));
+  EXPECT_EQ(affinity[1], core(30));
+}
+
+TEST_F(WithoutBoundScheduler, ThreadAffinityAllCountNonzero) {
+  auto affinity = marl::Thread::Affinity::all(allocator);
+  if (marl::Thread::Affinity::supported) {
+    EXPECT_NE(affinity.count(), 0U);
+  } else {
+    EXPECT_EQ(affinity.count(), 0U);
+  }
+}
+
+TEST_F(WithoutBoundScheduler, ThreadAffinityPolicyOneOf) {
+  auto all = marl::Thread::Affinity(
+      {
+          core(10),
+          core(20),
+          core(30),
+          core(40),
+      },
+      allocator);
+
+  auto policy =
+      marl::Thread::Affinity::Policy::oneOf(std::move(all), allocator);
+  EXPECT_EQ(policy->get(0, allocator).count(), 1U);
+  EXPECT_EQ(policy->get(0, allocator)[0].pthread.index, 10);
+  EXPECT_EQ(policy->get(1, allocator).count(), 1U);
+  EXPECT_EQ(policy->get(1, allocator)[0].pthread.index, 20);
+  EXPECT_EQ(policy->get(2, allocator).count(), 1U);
+  EXPECT_EQ(policy->get(2, allocator)[0].pthread.index, 30);
+  EXPECT_EQ(policy->get(3, allocator).count(), 1U);
+  EXPECT_EQ(policy->get(3, allocator)[0].pthread.index, 40);
+}
diff --git a/src/trace.cpp b/src/trace.cpp
index 702aa74..e238523 100644
--- a/src/trace.cpp
+++ b/src/trace.cpp
@@ -28,7 +28,6 @@
 
 #include <atomic>
 #include <fstream>
-#include <unordered_set>
 
 namespace {