Update Marl to 94a361cf0
Contains a fix for an ARM linker error (chromium:1058107) and adds
support for scaling beyond 64 threads on Windows.
Changes:
94a361cf0 thread.cpp Fix minor bug in getProcessorGroups()
0249a2624 Thread: Use WaitForObject for Thread::join()
62f209bbb arm: Annotate marl_fiber_swap as type %function
00f091e08 Update README.md
773d9f475 Add new example 'tasks_in_tasks'
3f69e73ce Scheduler: Replace use of std::thread with marl::Thread
Commands:
git subtree pull --prefix third_party/marl https://github.com/google/marl master --squash
Bug: b/140546382
Bug: chromium:1058107
Change-Id: Ic5a96ad5f054a19f6a5a77e1f106c73ba60c0a78
diff --git a/third_party/marl/CMakeLists.txt b/third_party/marl/CMakeLists.txt
index 1119719..54690f9 100644
--- a/third_party/marl/CMakeLists.txt
+++ b/third_party/marl/CMakeLists.txt
@@ -264,4 +264,5 @@
build_example(fractal)
build_example(hello_task)
build_example(primes)
+ build_example(tasks_in_tasks)
endif(MARL_BUILD_EXAMPLES)
diff --git a/third_party/marl/README.md b/third_party/marl/README.md
index 3fd10b4..cea4522 100644
--- a/third_party/marl/README.md
+++ b/third_party/marl/README.md
@@ -65,6 +65,12 @@
}
```
+
+## Benchmarks
+
+Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
+
+
## Building
Marl contains many unit tests and examples that can be built using CMake.
@@ -117,10 +123,6 @@
add_subdirectory(${MARL_DIR})
```
-## Benchmarks
-
-Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
-
---
Note: This is not an officially supported Google product
diff --git a/third_party/marl/examples/tasks_in_tasks.cpp b/third_party/marl/examples/tasks_in_tasks.cpp
new file mode 100644
index 0000000..f34c476
--- /dev/null
+++ b/third_party/marl/examples/tasks_in_tasks.cpp
@@ -0,0 +1,81 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Example of a task that creates and waits on sub tasks.
+
+#include "marl/defer.h"
+#include "marl/scheduler.h"
+#include "marl/waitgroup.h"
+
+#include <cstdio>
+
+int main() {
+ // Create a marl scheduler that uses 4 worker threads.
+ // Bind this scheduler to the main thread so we can call marl::schedule()
+ marl::Scheduler scheduler;
+ scheduler.bind();
+ scheduler.setWorkerThreadCount(4);
+ defer(scheduler.unbind()); // Automatically unbind before returning.
+
+ // marl::schedule() requires the scheduler to be bound to the current thread
+ // (see above). The scheduler ensures that tasks are run on a thread with the
+ // same scheduler automatically bound, so we don't need to call
+ // marl::Scheduler::bind() again below.
+
+ // Sequence of task events:
+ // __________________________________________________________
+ // | |
+ // | ---> [task B] ---- |
+ // | / \ |
+ // | [task A] -----> [task A: wait] -----> [task A: resume] |
+ // | \ / |
+ // | ---> [task C] ---- |
+ // |__________________________________________________________|
+
+ // Create a WaitGroup for waiting for task A to finish.
+ // This has an initial count of 1 (A)
+ marl::WaitGroup a_wg(1);
+
+ // Schedule task A
+ marl::schedule([=] {
+ defer(a_wg.done()); // Decrement a_wg when task A is done
+
+ printf("Hello from task A\n");
+ printf("Starting tasks B and C...\n");
+
+ // Create a WaitGroup for waiting on task B and C to finish.
+ // This has an initial count of 2 (B + C)
+ marl::WaitGroup bc_wg(2);
+
+ // Schedule task B
+ marl::schedule([=] {
+ defer(bc_wg.done()); // Decrement bc_wg when task B is done
+ printf("Hello from task B\n");
+ });
+
+ // Schedule task C
+ marl::schedule([=] {
+ defer(bc_wg.done()); // Decrement bc_wg when task C is done
+ printf("Hello from task C\n");
+ });
+
+ // Wait for tasks B and C to finish.
+ bc_wg.wait();
+ });
+
+ // Wait for task A (and so B and C) to finish.
+ a_wg.wait();
+
+ printf("Task A has finished\n");
+}
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index 03d121c..e94d035 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -18,6 +18,7 @@
#include "debug.h"
#include "memory.h"
#include "sal.h"
+#include "thread.h"
#include <array>
#include <atomic>
@@ -411,7 +412,7 @@
Scheduler* const scheduler;
Allocator::unique_ptr<Fiber> mainFiber;
Fiber* currentFiber = nullptr;
- std::thread thread;
+ Thread thread;
Work work;
FiberSet idleFibers; // Fibers that have completed which can be reused.
std::vector<Allocator::unique_ptr<Fiber>>
diff --git a/third_party/marl/include/marl/thread.h b/third_party/marl/include/marl/thread.h
index 7ce20b3..f18bddd 100644
--- a/third_party/marl/include/marl/thread.h
+++ b/third_party/marl/include/marl/thread.h
@@ -15,12 +15,34 @@
#ifndef marl_thread_h
#define marl_thread_h
+#include <functional>
+
namespace marl {
-// Thread contains static methods that abstract OS-specific thread / cpu
+// Thread provides an OS abstraction for threads of execution.
+// Thread is used by marl instead of std::thread as Windows does not naturally
+// scale beyond 64 logical threads on a single CPU, unless you use the Win32
+// API.
+// Thread also contains static methods that abstract OS-specific thread / cpu
// queries and control.
class Thread {
public:
+ using Func = std::function<void()>;
+
+ Thread() = default;
+ Thread(Thread&&);
+ Thread& operator=(Thread&&);
+
+ // Start a new thread that calls func.
+ // logicalCpuHint is a hint to run the thread on the specified logical CPU.
+ // logicalCpuHint may be entirely ignored.
+ Thread(unsigned int logicalCpuHint, const Func& func);
+
+ ~Thread();
+
+ // join() blocks until the thread completes.
+ void join();
+
// setName() sets the name of the currently executing thread for displaying
// in a debugger.
static void setName(const char* fmt, ...);
@@ -28,6 +50,13 @@
// numLogicalCPUs() returns the number of available logical CPU cores for
// the system.
static unsigned int numLogicalCPUs();
+
+ private:
+ Thread(const Thread&) = delete;
+ Thread& operator=(const Thread&) = delete;
+
+ class Impl;
+ Impl* impl = nullptr;
};
} // namespace marl
diff --git a/third_party/marl/src/osfiber_asm_arm.S b/third_party/marl/src/osfiber_asm_arm.S
index d276ac8..c7103dc 100644
--- a/third_party/marl/src/osfiber_asm_arm.S
+++ b/third_party/marl/src/osfiber_asm_arm.S
@@ -23,6 +23,7 @@
.text
.global marl_fiber_swap
.align 4
+.type marl_fiber_swap, %function
marl_fiber_swap:
// Save context 'from'
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index 197dbb9..d8de740 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -361,7 +361,7 @@
void Scheduler::Worker::start() {
switch (mode) {
case Mode::MultiThreaded:
- thread = std::thread([=] {
+ thread = Thread(id, [=] {
Thread::setName("Thread<%.2d>", int(id));
if (auto const& initFunc = scheduler->getThreadInitializer()) {
diff --git a/third_party/marl/src/thread.cpp b/third_party/marl/src/thread.cpp
index 6c505d7..fa69bdf 100644
--- a/third_party/marl/src/thread.cpp
+++ b/third_party/marl/src/thread.cpp
@@ -14,6 +14,8 @@
#include "marl/thread.h"
+#include "marl/debug.h"
+#include "marl/defer.h"
#include "marl/trace.h"
#include <cstdarg>
@@ -22,20 +24,135 @@
#if defined(_WIN32)
#define WIN32_LEAN_AND_MEAN 1
#include <windows.h>
-#include <cstdlib> // mbstowcs
+#include <cstdlib> // mbstowcs
+#include <vector>
#elif defined(__APPLE__)
#include <mach/thread_act.h>
#include <pthread.h>
#include <unistd.h>
+#include <thread>
#else
#include <pthread.h>
#include <unistd.h>
+#include <thread>
#endif
namespace marl {
#if defined(_WIN32)
+#define CHECK_WIN32(expr) \
+ do { \
+ auto res = expr; \
+ (void)res; \
+ MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
+ (int)GetLastError()); \
+ } while (false)
+
+namespace {
+
+struct ProcessorGroup {
+ unsigned int count; // number of logical processors in this group.
+ KAFFINITY affinity; // affinity mask.
+};
+
+const std::vector<ProcessorGroup>& getProcessorGroups() {
+ static std::vector<ProcessorGroup> groups = [] {
+ std::vector<ProcessorGroup> out;
+ SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
+ DWORD size = sizeof(info);
+ CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
+ DWORD count = size / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
+ for (DWORD i = 0; i < count; i++) {
+ if (info[i].Relationship == RelationGroup) {
+ auto groupCount = info[i].Group.ActiveGroupCount;
+ for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
+ auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
+ out.emplace_back(ProcessorGroup{groupInfo.ActiveProcessorCount,
+ groupInfo.ActiveProcessorMask});
+ }
+ }
+ }
+ return out;
+ }();
+ return groups;
+}
+
+bool getGroupAffinity(unsigned int index, GROUP_AFFINITY* groupAffinity) {
+ auto& groups = getProcessorGroups();
+ for (size_t groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
+ auto& group = groups[groupIdx];
+ if (index < group.count) {
+ for (int i = 0; i < sizeof(group.affinity) * 8; i++) {
+ if (group.affinity & (1ULL << i)) {
+ if (index == 0) {
+ groupAffinity->Group = static_cast<WORD>(groupIdx);
+ // Use the whole group's affinity, as the OS is then able to shuffle
+ // threads around based on external demands. Pinning these to a
+ // single core can cause up to 20% performance loss in benchmarking.
+ groupAffinity->Mask = group.affinity;
+ return true;
+ }
+ index--;
+ }
+ }
+ return false;
+ } else {
+ index -= group.count;
+ }
+ }
+ return false;
+}
+
+} // namespace
+
+class Thread::Impl {
+ public:
+ Impl(const Func& func) : func(func) {}
+ static DWORD WINAPI run(void* self) {
+ reinterpret_cast<Impl*>(self)->func();
+ return 0;
+ }
+
+ Func func;
+ HANDLE handle;
+};
+
+Thread::Thread(unsigned int logicalCpu, const Func& func) {
+ SIZE_T size = 0;
+ InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
+ MARL_ASSERT(size > 0,
+ "InitializeProcThreadAttributeList() did not give a size");
+
+ LPPROC_THREAD_ATTRIBUTE_LIST attributes =
+ reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(alloca(size));
+ CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
+ defer(DeleteProcThreadAttributeList(attributes));
+
+ GROUP_AFFINITY groupAffinity = {};
+ if (getGroupAffinity(logicalCpu, &groupAffinity)) {
+ CHECK_WIN32(UpdateProcThreadAttribute(
+ attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
+ sizeof(groupAffinity), nullptr, nullptr));
+ }
+
+ impl = new Impl(func);
+ impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
+ &Impl::run, impl, 0, attributes, nullptr);
+}
+
+Thread::~Thread() {
+ if (impl) {
+ CloseHandle(impl->handle);
+ delete impl;
+ }
+}
+
+void Thread::join() {
+ MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
+ WaitForSingleObject(impl->handle, INFINITE);
+}
+
void Thread::setName(const char* fmt, ...) {
static auto setThreadDescription =
reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
@@ -57,25 +174,33 @@
}
unsigned int Thread::numLogicalCPUs() {
- DWORD_PTR processAffinityMask = 1;
- DWORD_PTR systemAffinityMask = 1;
-
- GetProcessAffinityMask(GetCurrentProcess(), &processAffinityMask,
- &systemAffinityMask);
-
- auto count = 0;
- while (processAffinityMask > 0) {
- if (processAffinityMask & 1) {
- count++;
- }
-
- processAffinityMask >>= 1;
+ unsigned int count = 0;
+ for (auto& group : getProcessorGroups()) {
+ count += group.count;
}
return count;
}
#else
+class Thread::Impl {
+ public:
+ template <typename F>
+ Impl(F&& func) : thread(func) {}
+ std::thread thread;
+};
+
+Thread::Thread(unsigned int /* logicalCpu */, const Func& func)
+ : impl(new Thread::Impl(func)) {}
+
+Thread::~Thread() {
+ delete impl;
+}
+
+void Thread::join() {
+ impl->thread.join();
+}
+
void Thread::setName(const char* fmt, ...) {
char name[1024];
va_list vararg;
@@ -96,6 +221,20 @@
return sysconf(_SC_NPROCESSORS_ONLN);
}
-#endif
+#endif // OS
+
+Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
+ rhs.impl = nullptr;
+}
+
+Thread& Thread::operator=(Thread&& rhs) {
+ if (impl) {
+ delete impl;
+ impl = nullptr;
+ }
+ impl = rhs.impl;
+ rhs.impl = nullptr;
+ return *this;
+}
} // namespace marl