Yarn: Add Ticket, Ticket::Queue
Ticket is a synchronization primitive used to serially order execution.
Bug: b/139010488
Change-Id: I7f34556609b0206c87f57be9c6bc14b21096f8f4
Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34818
Tested-by: Ben Clayton <bclayton@google.com>
Reviewed-by: Nicolas Capens <nicolascapens@google.com>
Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
diff --git a/src/Yarn/Ticket.hpp b/src/Yarn/Ticket.hpp
new file mode 100644
index 0000000..ebe4d3b
--- /dev/null
+++ b/src/Yarn/Ticket.hpp
@@ -0,0 +1,267 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef yarn_ticket_hpp
+#define yarn_ticket_hpp
+
+#include "Pool.hpp"
+#include "ConditionVariable.hpp"
+#include "Scheduler.hpp"
+
+namespace yarn {
+
+// Ticket is a synchronization primitive used to serially order execution.
+//
+// Tickets exist in 3 mutually exclusive states: Waiting, Called and Finished.
+//
+// Tickets are obtained from a Ticket::Queue, using the Ticket::Queue::take()
+// methods. The order in which tickets are taken from the queue dictates the
+// order in which they are called.
+//
+// The first ticket to be taken from a queue will be in the 'called' state,
+// others will be in the 'waiting' state until the previous ticket has finished.
+//
+// Ticket::wait() will block until the ticket is called.
+// Ticket::done() sets the ticket into the 'finished' state and calls the next
+// taken ticket from the queue.
+//
+// If a ticket is taken from a queue and does not have done() called before
+// its last reference is dropped, it will implicitly call done(), calling the
+// next ticket.
+//
+// Example:
+//
+// void runTasksConcurrentThenSerially(int numConcurrentTasks)
+// {
+// yarn::Ticket::Queue queue;
+// for (int i = 0; i < numConcurrentTasks; i++)
+// {
+// auto ticket = queue.take();
+// yarn::schedule([=] {
+// doConcurrentWork(); // <- function may be called concurrently
+// ticket.wait(); // <- serialize tasks
+// doSerialWork(); // <- function will not be called concurrently
+// ticket.done(); // <- optional, as done() is called implicitly on dropping of last reference
+// });
+// }
+// }
+class Ticket
+{
+ struct Shared;
+ struct Record;
+public:
+
+ // Queue hands out Tickets.
+ class Queue
+ {
+ public:
+ // take() returns a single ticket from the queue.
+ inline Ticket take();
+
+ // take() retrieves count tickets from the queue, calling f() with each
+ // retrieved ticket.
+ // F must be a function of the signature: void(Ticket&&)
+ template <typename F>
+ inline void take(size_t count, const F& f);
+
+ private:
+ std::shared_ptr<Shared> shared = std::make_shared<Shared>();
+ UnboundedPool<Record> pool;
+ };
+
+ inline Ticket() = default;
+ inline Ticket(const Ticket& other) = default;
+ inline Ticket(Ticket&& other) = default;
+ inline Ticket& operator = (const Ticket& other) = default;
+
+ // wait() blocks until the ticket is called.
+ inline void wait() const;
+
+ // done() marks the ticket as finished and calls the next ticket.
+ inline void done() const;
+
+ // onCall() registers the function f to be invoked when this ticket is
+ // called. If the ticket is already called prior to calling onCall(), then
+ // f() will be executed immediately.
+ // F must be a function of the signature: void F()
+ template<typename F>
+ inline void onCall(F&& f) const;
+
+private:
+ // Internal doubly-linked-list data structure. One per ticket instance.
+ struct Record
+ {
+ inline ~Record();
+
+ inline void done();
+ inline void callAndUnlock(std::unique_lock<std::mutex> &lock);
+
+ ConditionVariable isCalledCondVar;
+
+ std::shared_ptr<Shared> shared;
+ Record *next = nullptr; // guarded by shared->mutex
+ Record *prev = nullptr; // guarded by shared->mutex
+ inline void unlink(); // guarded by shared->mutex
+ Task onCall; // guarded by shared->mutex
+ bool isCalled = false; // guarded by shared->mutex
+ std::atomic<bool> isDone = { false };
+ };
+
+ // Data shared between all tickets and the queue.
+ struct Shared
+ {
+ std::mutex mutex;
+ Record tail;
+ };
+
+ inline Ticket(Loan<Record>&& record);
+
+ Loan<Record> record;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+// Ticket
+////////////////////////////////////////////////////////////////////////////////
+
+Ticket::Ticket(Loan<Record>&& record) : record(std::move(record)) {}
+
+void Ticket::wait() const
+{
+ std::unique_lock<std::mutex> lock(record->shared->mutex);
+ record->isCalledCondVar.wait(lock, [this] { return record->isCalled; });
+}
+
+void Ticket::done() const
+{
+ record->done();
+}
+
+template<typename Function>
+void Ticket::onCall(Function&& f) const
+{
+ std::unique_lock<std::mutex> lock(record->shared->mutex);
+ if (record->isCalled)
+ {
+ yarn::schedule(std::move(f));
+ return;
+ }
+ if (record->onCall)
+ {
+ struct Joined
+ {
+ void operator() () const { a(); b(); }
+ Task a, b;
+ };
+ record->onCall = std::move(Joined{ std::move(record->onCall), std::move(f) });
+ }
+ else
+ {
+ record->onCall = std::move(f);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Ticket::Queue
+////////////////////////////////////////////////////////////////////////////////
+
+Ticket Ticket::Queue::take()
+{
+ Ticket out;
+ take(1, [&](Ticket&& ticket) { out = std::move(ticket); });
+ return out;
+}
+
+template <typename F>
+void Ticket::Queue::take(size_t n, const F& f)
+{
+ Loan<Record> first, last;
+ pool.borrow(n, [&] (Loan<Record>&& record) {
+ Loan<Record> rec = std::move(record);
+ rec->shared = shared;
+ if (first.get() == nullptr)
+ {
+ first = rec;
+ }
+ if (last.get() != nullptr)
+ {
+ last->next = rec.get();
+ rec->prev = last.get();
+ }
+ last = rec;
+ f(std::move(Ticket(std::move(rec))));
+ });
+ last->next = &shared->tail;
+ std::unique_lock<std::mutex> lock(shared->mutex);
+ first->prev = shared->tail.prev;
+ shared->tail.prev = last.get();
+ if (first->prev == nullptr)
+ {
+ first->callAndUnlock(lock);
+ }
+ else
+ {
+ first->prev->next = first.get();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Ticket::Record
+////////////////////////////////////////////////////////////////////////////////
+
+Ticket::Record::~Record()
+{
+ if (shared != nullptr)
+ {
+ done();
+ }
+}
+
+void Ticket::Record::done()
+{
+ if (isDone.exchange(true)) { return; }
+ std::unique_lock<std::mutex> lock(shared->mutex);
+ auto callNext = (prev == nullptr && next != nullptr) ? next : nullptr;
+ unlink();
+ if (callNext != nullptr) // lock needs to be held otherwise callNext might be destructed.
+ {
+ callNext->callAndUnlock(lock);
+ }
+}
+
+void Ticket::Record::callAndUnlock(std::unique_lock<std::mutex> &lock)
+{
+ if (isCalled) { return; }
+ isCalled = true;
+ Task task;
+ std::swap(task, onCall);
+ isCalledCondVar.notify_all();
+ lock.unlock();
+
+ if (task)
+ {
+ yarn::schedule(std::move(task));
+ }
+}
+
+void Ticket::Record::unlink()
+{
+ if (prev != nullptr) { prev->next = next; }
+ if (next != nullptr) { next->prev = prev; }
+ prev = nullptr;
+ next = nullptr;
+}
+
+} // namespace yarn
+
+#endif // yarn_ticket_hpp
diff --git a/src/Yarn/Ticket_test.cpp b/src/Yarn/Ticket_test.cpp
new file mode 100644
index 0000000..3bb4b3b
--- /dev/null
+++ b/src/Yarn/Ticket_test.cpp
@@ -0,0 +1,43 @@
+// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "Yarn_test.hpp"
+
+#include "Yarn/Ticket.hpp"
+
+TEST_P(WithBoundScheduler, Ticket)
+{
+ yarn::Ticket::Queue queue;
+
+ constexpr int count = 1000;
+ std::atomic<int> next = { 0 };
+ int result[count] = {};
+
+ for (int i = 0; i < count; i++)
+ {
+ auto ticket = queue.take();
+ yarn::schedule([ticket, i, &result, &next] {
+ ticket.wait();
+ result[next++] = i;
+ ticket.done();
+ });
+ }
+
+ queue.take().wait();
+
+ for (int i = 0; i < count; i++)
+ {
+ ASSERT_EQ(result[i], i);
+ }
+}