Merge changes I2b7adc3c,I5873dfa8
* changes:
Update Marl to ca8408f68
Squashed 'third_party/marl/' changes from 64d123947..ca8408f68
diff --git a/third_party/marl/include/marl/conditionvariable.h b/third_party/marl/include/marl/conditionvariable.h
index 2eb7094..8dfb9c7 100644
--- a/third_party/marl/include/marl/conditionvariable.h
+++ b/third_party/marl/include/marl/conditionvariable.h
@@ -15,14 +15,15 @@
#ifndef marl_condition_variable_h
#define marl_condition_variable_h
+#include "containers.h"
#include "debug.h"
#include "defer.h"
+#include "memory.h"
#include "scheduler.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
-#include <unordered_set>
namespace marl {
@@ -34,7 +35,7 @@
// thread will work on other tasks until the ConditionVariable is unblocked.
class ConditionVariable {
public:
- inline ConditionVariable();
+ inline ConditionVariable(Allocator* allocator = Allocator::Default);
// notify_one() notifies and potentially unblocks one waiting fiber or thread.
inline void notify_one();
@@ -73,13 +74,15 @@
ConditionVariable& operator=(ConditionVariable&&) = delete;
std::mutex mutex;
- std::unordered_set<Scheduler::Fiber*> waiting;
+ containers::list<Scheduler::Fiber*> waiting;
std::condition_variable condition;
std::atomic<int> numWaiting = {0};
std::atomic<int> numWaitingOnCondition = {0};
};
-ConditionVariable::ConditionVariable() {}
+ConditionVariable::ConditionVariable(
+ Allocator* allocator /* = Allocator::Default */)
+ : waiting(allocator) {}  // The list of waiting fibers takes its storage from allocator.
void ConditionVariable::notify_one() {
if (numWaiting == 0) {
@@ -122,13 +125,13 @@
// Currently executing on a scheduler fiber.
// Yield to let other tasks run that can unblock this fiber.
mutex.lock();
- waiting.emplace(fiber);
+ auto it = waiting.emplace_front(fiber);
mutex.unlock();
fiber->wait(lock, pred);
mutex.lock();
- waiting.erase(fiber);
+ waiting.erase(it);
mutex.unlock();
} else {
// Currently running outside of the scheduler.
@@ -163,13 +166,13 @@
// Currently executing on a scheduler fiber.
// Yield to let other tasks run that can unblock this fiber.
mutex.lock();
- waiting.emplace(fiber);
+ auto it = waiting.emplace_front(fiber);
mutex.unlock();
auto res = fiber->wait(lock, timeout, pred);
mutex.lock();
- waiting.erase(fiber);
+ waiting.erase(it);
mutex.unlock();
return res;
diff --git a/third_party/marl/include/marl/containers.h b/third_party/marl/include/marl/containers.h
index ef8c5c0..fcac46e 100644
--- a/third_party/marl/include/marl/containers.h
+++ b/third_party/marl/include/marl/containers.h
@@ -19,10 +19,9 @@
#include "memory.h"
#include <algorithm> // std::max
+#include <cstddef> // size_t
#include <utility> // std::move
-#include <cstddef> // size_t
-
namespace marl {
namespace containers {
@@ -243,6 +242,207 @@
}
}
+////////////////////////////////////////////////////////////////////////////////
+// list<T>
+////////////////////////////////////////////////////////////////////////////////
+
+// list is a minimal std::list-like container that supports constant-time
+// insertion and removal of elements.
+// list keeps hold of allocations (it only releases allocations on destruction),
+// to avoid repeated heap allocations and frees when frequently inserting and
+// removing elements.
+template <typename T>
+class list {
+ struct Entry {  // One node of a doubly-linked list, with storage for one element.
+ T data;  // Constructed by emplace_front(); destroyed by erase() or ~list().
+ Entry* next;
+ Entry* prev;
+ };
+
+ public:
+ class iterator {  // Forward iterator over the elements currently in the list.
+ public:
+ inline iterator(Entry*);  // Implicit, so that an Entry* converts to an iterator.
+ inline T* operator->();
+ inline T& operator*();
+ inline iterator& operator++();  // Advances to the next entry.
+ inline bool operator==(const iterator&) const;
+ inline bool operator!=(const iterator&) const;
+
+ private:
+ friend list;
+ Entry* entry;  // The entry referred to; nullptr for the end() iterator.
+ };
+
+ inline list(Allocator* allocator = Allocator::Default);
+ inline ~list();
+
+ inline iterator begin();  // Iterator to the most recently emplaced element.
+ inline iterator end();
+ inline size_t size() const;  // Number of elements currently in the list.
+
+ template <typename... Args>
+ iterator emplace_front(Args&&... args);  // Constructs an element at the front and returns its iterator.
+ inline void erase(iterator);  // Destroys the element and recycles its entry for reuse.
+
+ private:
+ // copy / move is currently unsupported.
+ list(const list&) = delete;
+ list(list&&) = delete;
+ list& operator=(const list&) = delete;
+ list& operator=(list&&) = delete;
+
+ void grow(size_t count);  // Allocates count more entries and adds them to the free list.
+
+ static void unlink(Entry* entry, Entry*& list);  // Removes entry from the list headed by list.
+ static void link(Entry* entry, Entry*& list);  // Makes entry the new head of list.
+
+ Allocator* const allocator;  // Source of all entry storage.
+ size_t size_ = 0;  // Number of live elements.
+ size_t capacity = 0;  // Total number of entries allocated so far.
+ vector<Allocation, 8> allocations;  // Every block obtained by grow(); released in ~list().
+ Entry* free = nullptr;  // Head of the list of unused entries.
+ Entry* head = nullptr;  // Head of the list of live elements.
+};
+
+// Wraps the given entry; a null entry represents the end of the list.
+template <typename T>
+list<T>::iterator::iterator(Entry* e) : entry(e) {}
+
+// Member access to the element held by the current entry.
+template <typename T>
+T* list<T>::iterator::operator->() {
+  T& element = entry->data;
+  return &element;
+}
+
+// Reference to the element held by the current entry.
+template <typename T>
+T& list<T>::iterator::operator*() {
+  return (*entry).data;
+}
+
+// Steps to the entry that follows the current one.
+template <typename T>
+typename list<T>::iterator& list<T>::iterator::operator++() {
+  Entry* const following = entry->next;
+  entry = following;
+  return *this;
+}
+
+// Two iterators are equal when they refer to the same entry.
+template <typename T>
+bool list<T>::iterator::operator==(const iterator& rhs) const {
+  return rhs.entry == entry;
+}
+
+template <typename T>
+bool list<T>::iterator::operator!=(const iterator& rhs) const {
+  return !(rhs.entry == entry);
+}
+
+// Constructs an empty list that obtains all of its storage from allocator.
+// A first batch of entries is allocated up front.
+template <typename T>
+list<T>::list(Allocator* allocator /* = Allocator::Default */)
+    : allocator(allocator), allocations(allocator) {
+  const size_t initialEntries = 8;
+  grow(initialEntries);
+}
+
+// Destroys the elements still held by the list, then returns every block of
+// entries to the allocator.
+template <typename T>
+list<T>::~list() {
+  Entry* live = head;
+  while (live != nullptr) {
+    live->data.~T();
+    live = live->next;
+  }
+
+  for (auto block : allocations) {
+    allocator->free(block);
+  }
+}
+
+// begin() returns an iterator to the head (most recently emplaced) element.
+template <typename T>
+typename list<T>::iterator list<T>::begin() {
+  return iterator(head);
+}
+
+// end() returns the past-the-end iterator, which wraps a null entry.
+template <typename T>
+typename list<T>::iterator list<T>::end() {
+  return iterator(nullptr);
+}
+
+// size() returns the number of elements currently held by the list.
+template <typename T>
+size_t list<T>::size() const {
+  const size_t count = size_;
+  return count;
+}
+
+// emplace_front() constructs a new element from args at the front of the
+// list, and returns an iterator to it.
+// Entries are never relocated, so iterators to other elements stay valid.
+template <typename T>
+template <typename... Args>
+typename list<T>::iterator list<T>::emplace_front(Args&&... args) {
+  if (free == nullptr) {
+    // No unused entries left: allocate as many again as have been allocated
+    // so far.
+    grow(capacity);
+  }
+
+  auto entry = free;
+
+  unlink(entry, free);
+  link(entry, head);
+
+  // Forward each argument with its own deduced type. Forwarding as T would
+  // force every argument to be converted to T before construction, which
+  // breaks constructors that take several arguments or non-T arguments.
+  new (&entry->data) T(std::forward<Args>(args)...);
+  size_++;
+
+  return entry;
+}
+
+// erase() removes the element referred to by it from the list.
+// The element is destroyed, but its entry is kept for reuse by a later
+// emplace_front().
+template <typename T>
+void list<T>::erase(iterator it) {
+  Entry* const victim = it.entry;
+
+  // Move the entry from the live list over to the free list.
+  unlink(victim, head);
+  link(victim, free);
+
+  victim->data.~T();
+  --size_;
+}
+
+// grow() allocates a single block holding count new entries, and pushes each
+// of them onto the front of the free list. The block is remembered so that
+// it can be released when the list is destroyed.
+template <typename T>
+void list<T>::grow(size_t count) {
+  Allocation::Request request;
+  request.usage = Allocation::Usage::List;
+  request.alignment = alignof(Entry);
+  request.size = sizeof(Entry) * count;
+  auto alloc = allocator->allocate(request);
+
+  Entry* const block = reinterpret_cast<Entry*>(alloc.ptr);
+  for (size_t index = 0; index != count; ++index) {
+    Entry* const fresh = block + index;
+    fresh->prev = nullptr;
+    fresh->next = free;
+    if (free != nullptr) {
+      free->prev = fresh;
+    }
+    free = fresh;
+  }
+
+  allocations.emplace_back(std::move(alloc));
+  capacity += count;
+}
+
+// unlink() detaches entry from the doubly-linked list whose head is list.
+// If entry was the head, the head moves on to the following entry.
+// On return entry has no neighbours.
+template <typename T>
+void list<T>::unlink(Entry* entry, Entry*& list) {
+  Entry* const before = entry->prev;
+  Entry* const after = entry->next;
+
+  if (list == entry) {
+    list = after;
+  }
+  if (before != nullptr) {
+    before->next = after;
+  }
+  if (after != nullptr) {
+    after->prev = before;
+  }
+
+  entry->next = nullptr;
+  entry->prev = nullptr;
+}
+
+// link() pushes entry onto the front of the doubly-linked list whose head is
+// list. entry must not currently be linked to any neighbours.
+template <typename T>
+void list<T>::link(Entry* entry, Entry*& list) {
+  MARL_ASSERT(entry->next == nullptr, "link() called on entry already linked");
+  MARL_ASSERT(entry->prev == nullptr, "link() called on entry already linked");
+  Entry* const oldHead = list;
+  if (oldHead != nullptr) {
+    oldHead->prev = entry;
+    entry->next = oldHead;
+  }
+  list = entry;
+}
+
} // namespace containers
} // namespace marl
diff --git a/third_party/marl/include/marl/event.h b/third_party/marl/include/marl/event.h
index 2c8078e..f7b3023 100644
--- a/third_party/marl/include/marl/event.h
+++ b/third_party/marl/include/marl/event.h
@@ -102,7 +102,7 @@
private:
struct Shared {
- inline Shared(Mode mode, bool initialState);
+ inline Shared(Allocator* allocator, Mode mode, bool initialState);
inline void signal();
inline void wait();
@@ -123,8 +123,8 @@
const std::shared_ptr<Shared> shared;
};
-Event::Shared::Shared(Mode mode, bool initialState)
- : mode(mode), signalled(initialState) {}
+Event::Shared::Shared(Allocator* allocator, Mode mode, bool initialState)
+ : cv(allocator), mode(mode), signalled(initialState) {}
void Event::Shared::signal() {
std::unique_lock<std::mutex> lock(mutex);
@@ -179,7 +179,7 @@
Event::Event(Mode mode /* = Mode::Auto */,
bool initialState /* = false */,
Allocator* allocator /* = Allocator::Default */)
- : shared(allocator->make_shared<Shared>(mode, initialState)) {}
+ : shared(allocator->make_shared<Shared>(allocator, mode, initialState)) {}
void Event::signal() const {
shared->signal();
diff --git a/third_party/marl/include/marl/memory.h b/third_party/marl/include/marl/memory.h
index d150fdf..8c35e01 100644
--- a/third_party/marl/include/marl/memory.h
+++ b/third_party/marl/include/marl/memory.h
@@ -38,7 +38,8 @@
Undefined = 0,
Stack, // Fiber stack
Create, // Allocator::create(), make_unique(), make_shared()
- Vector, // marl::vector<T>
+ Vector, // marl::containers::vector<T>
+ List, // marl::containers::list<T>
Count, // Not intended to be used as a usage type - used for upper bound.
};
diff --git a/third_party/marl/include/marl/pool.h b/third_party/marl/include/marl/pool.h
index 693ef3f..934fa40 100644
--- a/third_party/marl/include/marl/pool.h
+++ b/third_party/marl/include/marl/pool.h
@@ -232,7 +232,7 @@
private:
class Storage : public Pool<T>::Storage {
public:
- inline Storage();
+ inline Storage(Allocator* allocator);
inline ~Storage();
inline void return_(Item*) override;
@@ -245,7 +245,8 @@
};
template <typename T, int N, PoolPolicy POLICY>
-BoundedPool<T, N, POLICY>::Storage::Storage() {
+BoundedPool<T, N, POLICY>::Storage::Storage(Allocator* allocator)
+ : returned(allocator) {
for (int i = 0; i < N; i++) {
if (POLICY == PoolPolicy::Preserve) {
items[i].construct();
@@ -267,7 +268,7 @@
template <typename T, int N, PoolPolicy POLICY>
BoundedPool<T, N, POLICY>::BoundedPool(
Allocator* allocator /* = Allocator::Default */)
- : storage(allocator->make_shared<Storage>()) {}
+ : storage(allocator->make_shared<Storage>(allocator)) {}
template <typename T, int N, PoolPolicy POLICY>
typename BoundedPool<T, N, POLICY>::Loan BoundedPool<T, N, POLICY>::borrow()
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index e94d035..92acbde 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -18,28 +18,27 @@
#include "debug.h"
#include "memory.h"
#include "sal.h"
+#include "task.h"
#include "thread.h"
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
+#include <deque>
#include <functional>
#include <map>
#include <mutex>
-#include <queue>
#include <set>
#include <thread>
#include <unordered_map>
#include <unordered_set>
+#include <vector>
namespace marl {
class OSFiber;
-// Task is a unit of work for the scheduler.
-using Task = std::function<void()>;
-
// Scheduler asynchronously processes Tasks.
// A scheduler can be bound to one or more threads using the bind() method.
// Once bound to a thread, that thread can call marl::schedule() to enqueue
@@ -257,8 +256,8 @@
// TODO: Implement a queue that recycles elements to reduce number of
// heap allocations.
- using TaskQueue = std::queue<Task>;
- using FiberQueue = std::queue<Fiber*>;
+ using TaskQueue = std::deque<Task>;
+ using FiberQueue = std::deque<Fiber*>;
using FiberSet = std::unordered_set<Fiber*>;
// Workers executes Tasks on a single thread.
@@ -318,9 +317,9 @@
// flush() processes all pending tasks before returning.
void flush();
- // dequeue() attempts to take a Task from the worker. Returns true if
- // a task was taken and assigned to out, otherwise false.
- bool dequeue(Task& out);
+ // steal() attempts to steal a Task from the worker for another worker.
+ // Returns true if a task was taken and assigned to out, otherwise false.
+ bool steal(Task& out);
// getCurrent() returns the Worker currently bound to the current
// thread.
@@ -338,14 +337,17 @@
// continue to process tasks until stop() is called.
// If the worker was constructed in Mode::SingleThreaded, run() call
// flush() and return.
+ _Requires_lock_held_(work.mutex)
void run();
// createWorkerFiber() creates a new fiber that when executed calls
// run().
+ _Requires_lock_held_(work.mutex)
Fiber* createWorkerFiber();
// switchToFiber() switches execution to the given fiber. The fiber
// must belong to this worker.
+ _Requires_lock_held_(work.mutex)
void switchToFiber(Fiber*);
// runUntilIdle() executes all pending tasks and then returns.
@@ -387,8 +389,13 @@
_Guarded_by_(mutex) TaskQueue tasks;
_Guarded_by_(mutex) FiberQueue fibers;
_Guarded_by_(mutex) WaitingFibers waiting;
+ _Guarded_by_(mutex) bool notifyAdded = true;
std::condition_variable added;
std::mutex mutex;
+
+ _Requires_lock_held_(mutex)
+ template <typename F>
+ inline void wait(F&&);
};
// https://en.wikipedia.org/wiki/Xorshift
@@ -418,7 +425,7 @@
std::vector<Allocator::unique_ptr<Fiber>>
workerFibers; // All fibers created by this worker.
FastRnd rng;
- std::atomic<bool> shutdown = {false};
+ bool shutdown = false;
};
// stealWork() attempts to steal a task from the worker with the given id.
@@ -472,6 +479,14 @@
return currentFiber;
}
+// schedule() hands the task t to the currently bound scheduler, to be called
+// asynchronously.
+inline void schedule(Task&& t) {
+  MARL_ASSERT_HAS_BOUND_SCHEDULER("marl::schedule");
+  Scheduler::get()->enqueue(std::move(t));
+}
+
// schedule() schedules the function f to be asynchronously called with the
// given arguments using the currently bound scheduler.
template <typename Function, typename... Args>
@@ -488,7 +503,7 @@
inline void schedule(Function&& f) {
MARL_ASSERT_HAS_BOUND_SCHEDULER("marl::schedule");
auto scheduler = Scheduler::get();
- scheduler->enqueue(std::forward<Function>(f));
+ scheduler->enqueue(Task(std::forward<Function>(f)));
}
} // namespace marl
diff --git a/third_party/marl/include/marl/task.h b/third_party/marl/include/marl/task.h
new file mode 100644
index 0000000..440615b
--- /dev/null
+++ b/third_party/marl/include/marl/task.h
@@ -0,0 +1,104 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_task_h
+#define marl_task_h
+
+#include <functional>
+
+namespace marl {
+
+// Task is a unit of work for the scheduler: a function taking no arguments,
+// together with the flags the task was created with.
+class Task {
+ public:
+  using Function = std::function<void()>;
+
+  enum class Flags {
+    None = 0,
+
+    // SameThread ensures the task is run on the thread that scheduled it.
+    // This can improve performance when the scheduling thread is about to
+    // block on the new task, as it avoids the cost of waking another thread.
+    SameThread = 1,
+  };
+
+  inline Task();
+  inline Task(const Task&);
+  inline Task(Task&&);
+  inline Task(const Function& function, Flags flags = Flags::None);
+  inline Task(Function&& function, Flags flags = Flags::None);
+  inline Task& operator=(const Task&);
+  inline Task& operator=(Task&&);
+  inline Task& operator=(const Function&);
+  inline Task& operator=(Function&&);
+
+  // operator bool() returns true if the Task holds a callable function.
+  inline operator bool() const;
+
+  // operator()() calls the task's function.
+  inline void operator()() const;
+
+  // is() returns true if every bit of flag is set in the Task's flags.
+  inline bool is(Flags flag) const;
+
+ private:
+  Function function;
+  Flags flags = Flags::None;
+};
+
+Task::Task() {}
+
+Task::Task(const Task& other)
+    : function(other.function), flags(other.flags) {}
+
+Task::Task(Task&& other)
+    : function(std::move(other.function)), flags(other.flags) {}
+
+Task::Task(const Function& fn, Flags taskFlags /* = Flags::None */)
+    : function(fn), flags(taskFlags) {}
+
+Task::Task(Function&& fn, Flags taskFlags /* = Flags::None */)
+    : function(std::move(fn)), flags(taskFlags) {}
+
+Task& Task::operator=(const Task& other) {
+  function = other.function;
+  flags = other.flags;
+  return *this;
+}
+
+Task& Task::operator=(Task&& other) {
+  function = std::move(other.function);
+  flags = other.flags;
+  return *this;
+}
+
+// Assigning a bare function resets the flags to Flags::None.
+Task& Task::operator=(const Function& fn) {
+  function = fn;
+  flags = Flags::None;
+  return *this;
+}
+
+Task& Task::operator=(Function&& fn) {
+  function = std::move(fn);
+  flags = Flags::None;
+  return *this;
+}
+
+Task::operator bool() const {
+  return static_cast<bool>(function);
+}
+
+void Task::operator()() const {
+  function();
+}
+
+bool Task::is(Flags flag) const {
+  const int wanted = static_cast<int>(flag);
+  const int held = static_cast<int>(flags);
+  return (held & wanted) == wanted;
+}
+
+} // namespace marl
+
+#endif // marl_task_h
diff --git a/third_party/marl/include/marl/ticket.h b/third_party/marl/include/marl/ticket.h
index 10c2b77..59a38fc 100644
--- a/third_party/marl/include/marl/ticket.h
+++ b/third_party/marl/include/marl/ticket.h
@@ -62,6 +62,8 @@
struct Record;
public:
+ using OnCall = std::function<void()>;
+
// Queue hands out Tickets.
class Queue {
public:
@@ -93,7 +95,7 @@
// onCall() registers the function f to be invoked when this ticket is
// called. If the ticket is already called prior to calling onCall(), then
// f() will be executed immediately.
- // F must be a function of the signature: void F()
+ // F must be a function of the OnCall signature.
template <typename F>
inline void onCall(F&& f) const;
@@ -111,7 +113,7 @@
Record* next = nullptr; // guarded by shared->mutex
Record* prev = nullptr; // guarded by shared->mutex
inline void unlink(); // guarded by shared->mutex
- Task onCall; // guarded by shared->mutex
+ OnCall onCall; // guarded by shared->mutex
bool isCalled = false; // guarded by shared->mutex
std::atomic<bool> isDone = {false};
};
@@ -155,7 +157,7 @@
a();
b();
}
- Task a, b;
+ OnCall a, b;
};
record->onCall = std::move(Joined{std::move(record->onCall), std::move(f)});
} else {
@@ -228,13 +230,13 @@
return;
}
isCalled = true;
- Task task;
- std::swap(task, onCall);
+ OnCall callback;
+ std::swap(callback, onCall);
isCalledCondVar.notify_all();
lock.unlock();
- if (task) {
- marl::schedule(std::move(task));
+ if (callback) {
+ marl::schedule(std::move(callback));
}
}
diff --git a/third_party/marl/include/marl/waitgroup.h b/third_party/marl/include/marl/waitgroup.h
index df7c349..7adbb19 100644
--- a/third_party/marl/include/marl/waitgroup.h
+++ b/third_party/marl/include/marl/waitgroup.h
@@ -51,7 +51,8 @@
class WaitGroup {
public:
// Constructs the WaitGroup with the specified initial count.
- inline WaitGroup(unsigned int initialCount = 0);
+ inline WaitGroup(unsigned int initialCount = 0,
+ Allocator* allocator = Allocator::Default);
// add() increments the internal counter by count.
inline void add(unsigned int count = 1) const;
@@ -65,14 +66,20 @@
private:
struct Data {
+ inline Data(Allocator* allocator);
+
std::atomic<unsigned int> count = {0};
- ConditionVariable condition;
+ ConditionVariable cv;
std::mutex mutex;
};
- const std::shared_ptr<Data> data = std::make_shared<Data>();
+ const std::shared_ptr<Data> data;
};
-inline WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */) {
+WaitGroup::Data::Data(Allocator* allocator) : cv(allocator) {}
+
+// Constructs the WaitGroup with the specified initial count.
+// The shared state is created with allocator->make_shared(), in the same way
+// that Event creates its shared state, so that the allocator passed by the
+// caller is used for the Data object itself and not only for the condition
+// variable inside it.
+WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */,
+                     Allocator* allocator /* = Allocator::Default */)
+    : data(allocator->make_shared<Data>(allocator)) {
data->count = initialCount;
}
@@ -85,7 +92,7 @@
auto count = --data->count;
if (count == 0) {
std::unique_lock<std::mutex> lock(data->mutex);
- data->condition.notify_all();
+ data->cv.notify_all();
return true;
}
return false;
@@ -93,7 +100,7 @@
void WaitGroup::wait() const {
std::unique_lock<std::mutex> lock(data->mutex);
- data->condition.wait(lock, [this] { return data->count == 0; });
+ data->cv.wait(lock, [this] { return data->count == 0; });
}
} // namespace marl
diff --git a/third_party/marl/src/containers_test.cpp b/third_party/marl/src/containers_test.cpp
index bb5c9b0..2bc981c 100644
--- a/third_party/marl/src/containers_test.cpp
+++ b/third_party/marl/src/containers_test.cpp
@@ -193,3 +193,93 @@
ASSERT_EQ(vectorB[1], "B");
ASSERT_EQ(vectorB[2], "C");
}
+
+class ContainersListTest : public WithoutBoundScheduler {};  // Fixture for the marl::containers::list tests.
+
+TEST_F(ContainersListTest, Empty) {  // A newly constructed list reports a size of zero.
+ marl::containers::list<std::string> list(allocator);
+ ASSERT_EQ(list.size(), size_t(0));
+}
+
+TEST_F(ContainersListTest, EmplaceOne) {  // A single emplaced element is the only one iterated.
+ marl::containers::list<std::string> list(allocator);
+ auto itEntry = list.emplace_front("hello world");
+ ASSERT_EQ(*itEntry, "hello world");
+ ASSERT_EQ(list.size(), size_t(1));
+ auto it = list.begin();
+ ASSERT_EQ(it, itEntry);  // begin() refers to the element just emplaced.
+ ++it;
+ ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EmplaceThree) {  // Elements iterate most-recently-emplaced first.
+ marl::containers::list<std::string> list(allocator);
+ auto itA = list.emplace_front("a");
+ auto itB = list.emplace_front("b");
+ auto itC = list.emplace_front("c");
+ ASSERT_EQ(*itA, "a");  // Earlier iterators still refer to their own elements.
+ ASSERT_EQ(*itB, "b");
+ ASSERT_EQ(*itC, "c");
+ ASSERT_EQ(list.size(), size_t(3));
+ auto it = list.begin();
+ ASSERT_EQ(it, itC);  // Expected order: c, b, a.
+ ++it;
+ ASSERT_EQ(it, itB);
+ ++it;
+ ASSERT_EQ(it, itA);
+ ++it;
+ ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EraseFront) {  // Erasing the head element leaves the rest in order.
+ marl::containers::list<std::string> list(allocator);
+ auto itA = list.emplace_front("a");
+ auto itB = list.emplace_front("b");
+ auto itC = list.emplace_front("c");
+ list.erase(itC);  // "c" was emplaced last, so it is the head.
+ ASSERT_EQ(list.size(), size_t(2));
+ auto it = list.begin();
+ ASSERT_EQ(it, itB);  // Expected order: b, a.
+ ++it;
+ ASSERT_EQ(it, itA);
+ ++it;
+ ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EraseBack) {  // Erasing the last element leaves the rest in order.
+ marl::containers::list<std::string> list(allocator);
+ auto itA = list.emplace_front("a");
+ auto itB = list.emplace_front("b");
+ auto itC = list.emplace_front("c");
+ list.erase(itA);  // "a" was emplaced first, so it is the last element.
+ ASSERT_EQ(list.size(), size_t(2));
+ auto it = list.begin();
+ ASSERT_EQ(it, itC);  // Expected order: c, b.
+ ++it;
+ ASSERT_EQ(it, itB);
+ ++it;
+ ASSERT_EQ(it, list.end());
+}
+
+TEST_F(ContainersListTest, EraseMid) {  // Erasing a middle element joins its two neighbours.
+ marl::containers::list<std::string> list(allocator);
+ auto itA = list.emplace_front("a");
+ auto itB = list.emplace_front("b");
+ auto itC = list.emplace_front("c");
+ list.erase(itB);  // "b" sits between "c" and "a".
+ ASSERT_EQ(list.size(), size_t(2));
+ auto it = list.begin();
+ ASSERT_EQ(it, itC);  // Expected order: c, a.
+ ++it;
+ ASSERT_EQ(it, itA);
+ ++it;
+ ASSERT_EQ(it, list.end());
+}
+
+// Emplaces many more elements than the list allocates entries for on
+// construction, so that the list has to allocate further entries several
+// times. Checks the size, and then that every element can still be reached,
+// with the expected value, in most-recently-emplaced-first order.
+TEST_F(ContainersListTest, Grow) {
+  marl::containers::list<std::string> list(allocator);
+  for (int i = 0; i < 256; i++) {
+    list.emplace_front(std::to_string(i));
+  }
+  ASSERT_EQ(list.size(), size_t(256));
+
+  int expected = 255;
+  for (auto it = list.begin(); it != list.end(); ++it) {
+    ASSERT_EQ(*it, std::to_string(expected));
+    expected--;
+  }
+  // Every one of the 256 elements must have been visited.
+  ASSERT_EQ(expected, -1);
+}
diff --git a/third_party/marl/src/event_bench.cpp b/third_party/marl/src/event_bench.cpp
index 985c718..dc8f569 100644
--- a/third_party/marl/src/event_bench.cpp
+++ b/third_party/marl/src/event_bench.cpp
@@ -36,3 +36,37 @@
});
}
BENCHMARK_REGISTER_F(Schedule, Event)->Apply(Schedule::args<512>);
+
+// EventBaton benchmarks alternating execution of two tasks.
+BENCHMARK_DEFINE_F(Schedule, EventBaton)(benchmark::State& state) {
+ run(state, [&](int numPasses) {
+ for (auto _ : state) {
+ marl::Event passToA(marl::Event::Mode::Auto);  // Signalled when it is the first task's turn.
+ marl::Event passToB(marl::Event::Mode::Auto);  // Signalled when it is the second task's turn.
+ marl::Event done(marl::Event::Mode::Auto);  // Signalled once the second task has finished.
+
+ marl::schedule(marl::Task(  // First task: wait for its turn, then pass the turn on.
+ [=] {
+ for (int i = 0; i < numPasses; i++) {
+ passToA.wait();
+ passToB.signal();
+ }
+ },
+ marl::Task::Flags::SameThread));
+
+ marl::schedule(marl::Task(  // Second task: the mirror image of the first.
+ [=] {
+ for (int i = 0; i < numPasses; i++) {
+ passToB.wait();
+ passToA.signal();  // On the final pass the first task no longer waits for this.
+ }
+ done.signal();
+ },
+ marl::Task::Flags::SameThread));
+
+ passToA.signal();  // Start the exchange by giving the first task its turn.
+ done.wait();  // Wait for the second task to complete all of its passes.
+ }
+ });
+}
+BENCHMARK_REGISTER_F(Schedule, EventBaton)->Apply(Schedule::args<1000000>);
diff --git a/third_party/marl/src/marl_bench.h b/third_party/marl/src/marl_bench.h
index 4932b40..e4c2cee 100644
--- a/third_party/marl/src/marl_bench.h
+++ b/third_party/marl/src/marl_bench.h
@@ -48,13 +48,13 @@
}
}
- // numThreads return the number of threads in the benchmark run from the
+ // numThreads() returns the number of threads in the benchmark run from the
// state.
static int numThreads(const ::benchmark::State& state) {
return static_cast<int>(state.range(1));
}
- // numTasks return the number of tasks in the benchmark run from the state.
+ // numTasks() returns the number of tasks in the benchmark run from the state.
static int numTasks(const ::benchmark::State& state) {
return static_cast<int>(state.range(0));
}
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index d8de740..1f10f3a 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -60,9 +60,9 @@
#endif
template <typename T>
-inline T take(std::queue<T>& queue) {
+inline T take(std::deque<T>& queue) {
auto out = std::move(queue.front());
- queue.pop();
+ queue.pop_front();
return out;
}
@@ -189,6 +189,10 @@
}
void Scheduler::enqueue(Task&& task) {
+ if (task.is(Task::Flags::SameThread)) {
+ Scheduler::Worker::getCurrent()->enqueue(std::move(task));
+ return;
+ }
if (numWorkerThreads > 0) {
while (true) {
// Prioritize workers that have recently started spinning.
@@ -220,7 +224,7 @@
if (numWorkerThreads > 0) {
auto thread = workerThreads[from % numWorkerThreads];
if (thread != thief) {
- if (thread->dequeue(out)) {
+ if (thread->steal(out)) {
return true;
}
}
@@ -372,7 +376,10 @@
Worker::current = this;
mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0);
currentFiber = mainFiber.get();
- run();
+ {
+ std::unique_lock<std::mutex> lock(work.mutex);
+ run();
+ }
mainFiber.reset();
Worker::current = nullptr;
});
@@ -392,8 +399,7 @@
void Scheduler::Worker::stop() {
switch (mode) {
case Mode::MultiThreaded:
- shutdown = true;
- enqueue([] {}); // Ensure the worker is woken up to notice the shutdown.
+ enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
thread.join();
break;
@@ -462,22 +468,16 @@
work.num--;
auto to = take(work.fibers);
ASSERT_FIBER_STATE(to, Fiber::State::Queued);
- work.mutex.unlock();
switchToFiber(to);
- work.mutex.lock();
} else if (idleFibers.size() > 0) {
// There's an old fiber we can reuse, resume that.
auto to = take(idleFibers);
ASSERT_FIBER_STATE(to, Fiber::State::Idle);
- work.mutex.unlock();
switchToFiber(to);
- work.mutex.lock();
} else {
// Tasks to process and no existing fibers to resume.
// Spawn a new fiber.
- work.mutex.unlock();
switchToFiber(createWorkerFiber());
- work.mutex.lock();
}
setFiberState(currentFiber, Fiber::State::Running);
@@ -503,15 +503,15 @@
case Fiber::State::Yielded:
break;
}
- bool wasIdle = work.num == 0;
- work.fibers.push(std::move(fiber));
+ bool notify = work.notifyAdded;
+ work.fibers.push_back(std::move(fiber));
MARL_ASSERT(!work.waiting.contains(fiber),
"fiber is unexpectedly in the waiting list");
setFiberState(fiber, Fiber::State::Queued);
work.num++;
lock.unlock();
- if (wasIdle) {
+ if (notify) {
work.added.notify_one();
}
}
@@ -524,23 +524,24 @@
_Requires_lock_held_(work.mutex)
_Releases_lock_(work.mutex)
void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
- auto wasIdle = work.num == 0;
- work.tasks.push(std::move(task));
+ auto notify = work.notifyAdded;
+ work.tasks.push_back(std::move(task));
work.num++;
work.mutex.unlock();
- if (wasIdle) {
+ if (notify) {
work.added.notify_one();
}
}
-bool Scheduler::Worker::dequeue(Task& out) {
+bool Scheduler::Worker::steal(Task& out) {
if (work.num.load() == 0) {
return false;
}
if (!work.mutex.try_lock()) {
return false;
}
- if (work.tasks.size() == 0) {
+ if (work.tasks.size() == 0 ||
+ work.tasks.front().is(Task::Flags::SameThread)) {
work.mutex.unlock();
return false;
}
@@ -550,6 +551,7 @@
return true;
}
+_Requires_lock_held_(work.mutex)
void Scheduler::Worker::flush() {
MARL_ASSERT(mode == Mode::SingleThreaded,
"flush() can only be used on a single-threaded worker");
@@ -557,33 +559,30 @@
runUntilIdle();
}
+_Requires_lock_held_(work.mutex)
void Scheduler::Worker::run() {
switch (mode) {
case Mode::MultiThreaded: {
MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id),
Fiber::current()->id);
- {
- std::unique_lock<std::mutex> lock(work.mutex);
- work.added.wait(
- lock, [this] { return work.num > 0 || work.waiting || shutdown; });
- while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
- waitForWork();
- runUntilIdle();
- }
- Worker::current = nullptr;
+ work.wait([this] { return work.num > 0 || work.waiting || shutdown; });
+ while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
+ waitForWork();
+ runUntilIdle();
}
+ Worker::current = nullptr;
switchToFiber(mainFiber.get());
break;
}
- case Mode::SingleThreaded:
+ case Mode::SingleThreaded: {
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
while (!shutdown) {
- flush();
+ runUntilIdle();
idleFibers.emplace(currentFiber);
switchToFiber(mainFiber.get());
}
break;
-
+ }
default:
MARL_ASSERT(false, "Unknown mode: %d", int(mode));
}
@@ -593,26 +592,22 @@
void Scheduler::Worker::waitForWork() {
MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
"work.num out of sync");
- if (work.num == 0 && mode == Mode::MultiThreaded) {
+ if (work.num > 0) {
+ return;
+ }
+
+ if (mode == Mode::MultiThreaded) {
scheduler->onBeginSpinning(id);
work.mutex.unlock();
spinForWork();
work.mutex.lock();
}
+ work.wait([this] {
+ return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
+ });
if (work.waiting) {
- std::unique_lock<std::mutex> lock(work.mutex, std::adopt_lock);
- work.added.wait_until(lock, work.waiting.next(), [this] {
- return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
- });
- lock.release(); // Keep the lock held.
enqueueFiberTimeouts();
- } else {
- std::unique_lock<std::mutex> lock(work.mutex, std::adopt_lock);
- work.added.wait(lock, [this] {
- return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
- });
- lock.release(); // Keep the lock held.
}
}
@@ -622,7 +617,7 @@
while (auto fiber = work.waiting.take(now)) {
changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued);
DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id);
- work.fibers.push(fiber);
+ work.fibers.push_back(fiber);
work.num++;
}
}
@@ -667,7 +662,7 @@
if (scheduler->stealWork(this, rng(), stolen)) {
std::unique_lock<std::mutex> lock(work.mutex);
- work.tasks.emplace(std::move(stolen));
+ work.tasks.emplace_back(std::move(stolen));
work.num++;
return;
}
@@ -695,15 +690,11 @@
ASSERT_FIBER_STATE(fiber, Fiber::State::Queued);
changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle);
- work.mutex.unlock();
- { // unlocked
- auto added = idleFibers.emplace(currentFiber).second;
- (void)added;
- MARL_ASSERT(added, "fiber already idle");
+ auto added = idleFibers.emplace(currentFiber).second;
+ (void)added;
+ MARL_ASSERT(added, "fiber already idle");
- switchToFiber(fiber);
- }
- work.mutex.lock();
+ switchToFiber(fiber);
changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
}
@@ -724,6 +715,7 @@
}
}
+_Requires_lock_held_(work.mutex)
Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
@@ -734,6 +726,7 @@
return ptr;
}
+_Requires_lock_held_(work.mutex)
void Scheduler::Worker::switchToFiber(Fiber* to) {
DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
@@ -743,4 +736,21 @@
from->switchTo(to);
}
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Worker::Work
+////////////////////////////////////////////////////////////////////////////////
+_Requires_lock_held_(mutex)
+template <typename F>
+void Scheduler::Worker::Work::wait(F&& f) {  // Waits on 'added' with predicate f; mutex is held on entry and on return.
+ std::unique_lock<std::mutex> lock(mutex, std::adopt_lock);  // Adopt the lock the caller already holds.
+ notifyAdded = true;  // The enqueue paths only call added.notify_one() when they see this set.
+ if (waiting) {
+ added.wait_until(lock, waiting.next(), f);  // Also gives up at waiting.next() (presumably the earliest fiber timeout - verify).
+ } else {
+ added.wait(lock, f);
+ }
+ notifyAdded = false;  // No longer waiting, so no notification is needed.
+ lock.release(); // Keep the lock held.
+}
+
} // namespace marl