Merge changes I5eaceebc,I4d78c011,I7d01a3c8
* changes:
Update Marl to 38c0c7a0f
Squashed 'third_party/marl/' changes from 3c643dd4c..38c0c7a0f
Fix LLVM_VERSION_MAJOR for LLVM 10 on macOS
diff --git a/third_party/llvm-10.0/configs/darwin/include/llvm/Config/llvm-config.h b/third_party/llvm-10.0/configs/darwin/include/llvm/Config/llvm-config.h
index 3bef488..527cb0c 100644
--- a/third_party/llvm-10.0/configs/darwin/include/llvm/Config/llvm-config.h
+++ b/third_party/llvm-10.0/configs/darwin/include/llvm/Config/llvm-config.h
@@ -63,7 +63,7 @@
#define LLVM_USE_PERF 0
/* Major version of the LLVM API */
-#define LLVM_VERSION_MAJOR 11
+#define LLVM_VERSION_MAJOR 10
/* Minor version of the LLVM API */
#define LLVM_VERSION_MINOR 0
diff --git a/third_party/marl/CMakeLists.txt b/third_party/marl/CMakeLists.txt
index 4707a86..eee64fc 100644
--- a/third_party/marl/CMakeLists.txt
+++ b/third_party/marl/CMakeLists.txt
@@ -20,6 +20,12 @@
include(CheckCXXSourceCompiles)
+# MARL_IS_SUBPROJECT is 1 if added via add_subdirectory() from another project.
+get_directory_property(MARL_IS_SUBPROJECT PARENT_DIRECTORY)
+if(MARL_IS_SUBPROJECT)
+ set(MARL_IS_SUBPROJECT 1)
+endif()
+
###########################################################
# Options
###########################################################
@@ -100,8 +106,10 @@
MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS})
-# Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED as this may be useful to parent projects
-set(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED PARENT_SCOPE ${MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED})
+if(MARL_IS_SUBPROJECT)
+ # Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED as this may be useful to parent projects
+ set(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED PARENT_SCOPE ${MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED})
+endif()
###########################################################
# File lists
diff --git a/third_party/marl/examples/BUILD.bazel b/third_party/marl/examples/BUILD.bazel
index d06f4b8..aa67646 100644
--- a/third_party/marl/examples/BUILD.bazel
+++ b/third_party/marl/examples/BUILD.bazel
@@ -41,3 +41,13 @@
"//:marl",
],
)
+
+cc_binary(
+ name = "tasks_in_tasks",
+ srcs = [
+ "tasks_in_tasks.cpp",
+ ],
+ deps = [
+ "//:marl",
+ ],
+)
diff --git a/third_party/marl/include/marl/deprecated.h b/third_party/marl/include/marl/deprecated.h
new file mode 100644
index 0000000..1f5caed
--- /dev/null
+++ b/third_party/marl/include/marl/deprecated.h
@@ -0,0 +1,36 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef marl_deprecated_h
+#define marl_deprecated_h
+
+// Deprecated marl::Scheduler methods:
+// Scheduler(Allocator* allocator = Allocator::Default)
+// getThreadInitializer(), setThreadInitializer()
+// getWorkerThreadCount(), setWorkerThreadCount()
+#ifndef MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
+#define MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS 1
+#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
+
+#ifndef MARL_WARN_DEPRECATED
+#define MARL_WARN_DEPRECATED 0
+#endif // MARL_WARN_DEPRECATED
+
+#if MARL_WARN_DEPRECATED
+#define MARL_DEPRECATED(message) __attribute__((deprecated(message)))
+#else
+#define MARL_DEPRECATED(message)
+#endif
+
+#endif // marl_deprecated_h
diff --git a/third_party/marl/include/marl/parallelize.h b/third_party/marl/include/marl/parallelize.h
index ce75b02..f8ce6df 100644
--- a/third_party/marl/include/marl/parallelize.h
+++ b/third_party/marl/include/marl/parallelize.h
@@ -22,10 +22,10 @@
namespace detail {
-void parallelizeChain(WaitGroup&) {}
+inline void parallelizeChain(WaitGroup&) {}
template <typename F, typename... L>
-void parallelizeChain(WaitGroup& wg, F&& f, L&&... l) {
+inline void parallelizeChain(WaitGroup& wg, F&& f, L&&... l) {
schedule([=] {
f();
wg.done();
@@ -35,13 +35,26 @@
} // namespace detail
-// parallelize() schedules all the function parameters and waits for them to
-// complete. These functions may execute concurrently.
+// parallelize() invokes all the function parameters, potentially concurrently,
+// and waits for them all to complete before returning.
+//
// Each function must take no parameters.
-template <typename... FUNCTIONS>
-inline void parallelize(FUNCTIONS&&... functions) {
- WaitGroup wg(sizeof...(FUNCTIONS));
- detail::parallelizeChain(wg, functions...);
+//
+// parallelize() does the following:
+// (1) Schedules the function parameters in the parameter pack fn.
+// (2) Calls f0 on the current thread.
+// (3) Once f0 returns, waits for the scheduled functions in fn to all
+// complete.
+// As the fn functions are scheduled before running f0, it is recommended to
+// pass the function that'll take the most time as the first argument. That way
+// you'll be more likely to avoid the cost of a fiber switch.
+template <typename F0, typename... FN>
+inline void parallelize(F0&& f0, FN&&... fn) {
+ WaitGroup wg(sizeof...(FN));
+ // Schedule all the functions in fn.
+ detail::parallelizeChain(wg, std::forward<FN>(fn)...);
+ // While we wait for fn to complete, run the first function on this thread.
+ f0();
wg.wait();
}
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index 0acfdb9..7dc033f 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -16,6 +16,7 @@
#define marl_scheduler_h
#include "debug.h"
+#include "deprecated.h"
#include "memory.h"
#include "mutex.h"
#include "task.h"
@@ -50,8 +51,33 @@
public:
using TimePoint = std::chrono::system_clock::time_point;
using Predicate = std::function<bool()>;
+ using ThreadInitializer = std::function<void(int workerId)>;
- Scheduler(Allocator* allocator = Allocator::Default);
+ // Config holds scheduler configuration settings that can be passed to the
+ // Scheduler constructor.
+ struct Config {
+ // Per-worker-thread settings.
+ struct WorkerThread {
+ int count = 0;
+ ThreadInitializer initializer;
+ };
+ WorkerThread workerThread;
+
+ // Memory allocator to use for the scheduler and internal allocations.
+ Allocator* allocator = Allocator::Default;
+
+ // allCores() returns a Config with a worker thread for each of the logical
+ // cpus available to the process.
+ static Config allCores();
+
+ // Fluent setters that return this Config so set calls can be chained.
+ inline Config& setAllocator(Allocator*);
+ inline Config& setWorkerThreadCount(int);
+ inline Config& setWorkerThreadInitializer(const ThreadInitializer&);
+ };
+
+ // Constructor.
+ Scheduler(const Config&);
// Destructor.
// Blocks until the scheduler is unbound from all threads before returning.
@@ -73,24 +99,36 @@
// enqueue() queues the task for asynchronous execution.
void enqueue(Task&& task);
+ // config() returns the Config that was used to build the schededuler.
+ const Config& config() const;
+
+#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
+ MARL_DEPRECATED("use Scheduler::Scheduler(const Config&)")
+ Scheduler(Allocator* allocator = Allocator::Default);
+
// setThreadInitializer() sets the worker thread initializer function which
// will be called for each new worker thread spawned.
// The initializer will only be called on newly created threads (call
// setThreadInitializer() before setWorkerThreadCount()).
+ MARL_DEPRECATED("use Config::setWorkerThreadInitializer()")
void setThreadInitializer(const std::function<void()>& init);
// getThreadInitializer() returns the thread initializer function set by
// setThreadInitializer().
- const std::function<void()>& getThreadInitializer();
+ MARL_DEPRECATED("use config().workerThread.initializer")
+ std::function<void()> getThreadInitializer();
// setWorkerThreadCount() adjusts the number of dedicated worker threads.
// A count of 0 puts the scheduler into single-threaded mode.
// Note: Currently the number of threads cannot be adjusted once tasks
// have been enqueued. This restriction may be lifted at a later time.
+ MARL_DEPRECATED("use Config::setWorkerThreadCount()")
void setWorkerThreadCount(int count);
// getWorkerThreadCount() returns the number of worker threads.
+ MARL_DEPRECATED("use config().workerThread.count")
int getWorkerThreadCount();
+#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
// Fibers expose methods to perform cooperative multitasking and are
// automatically created by the Scheduler.
@@ -451,18 +489,17 @@
// The scheduler currently bound to the current thread.
static thread_local Scheduler* bound;
- Allocator* const allocator;
-
- std::function<void()> threadInitFunc;
+#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
+ Config cfg;
mutex threadInitFuncMutex;
+#else
+ const Config cfg;
+#endif
std::array<std::atomic<int>, 8> spinningWorkers;
std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000};
- // TODO: Make this lot thread-safe so setWorkerThreadCount() can be called
- // during execution of tasks.
std::atomic<unsigned int> nextEnqueueIndex = {0};
- unsigned int numWorkerThreads = 0;
std::array<Worker*, MaxWorkerThreads> workerThreads;
struct SingleThreadedWorkers {
@@ -475,6 +512,28 @@
SingleThreadedWorkers singleThreadedWorkers;
};
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Config
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::Config& Scheduler::Config::setAllocator(Allocator* alloc) {
+ allocator = alloc;
+ return *this;
+}
+
+Scheduler::Config& Scheduler::Config::setWorkerThreadCount(int count) {
+ workerThread.count = count;
+ return *this;
+}
+
+Scheduler::Config& Scheduler::Config::setWorkerThreadInitializer(
+ const ThreadInitializer& initializer) {
+ workerThread.initializer = initializer;
+ return *this;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Fiber
+////////////////////////////////////////////////////////////////////////////////
template <typename Clock, typename Duration>
bool Scheduler::Fiber::wait(
marl::lock& lock,
diff --git a/third_party/marl/kokoro/ubuntu/presubmit-docker.sh b/third_party/marl/kokoro/ubuntu/presubmit-docker.sh
new file mode 100755
index 0000000..262ec5f
--- /dev/null
+++ b/third_party/marl/kokoro/ubuntu/presubmit-docker.sh
@@ -0,0 +1,63 @@
+#!/bin/bash
+
+set -e # Fail on any error.
+
+. /bin/using.sh # Declare the bash `using` function for configuring toolchains.
+
+set -x # Display commands being run.
+
+cd github/marl
+
+git submodule update --init
+
+using gcc-9 # Always update gcc so we get a newer standard library.
+
+if [ "$BUILD_SYSTEM" == "cmake" ]; then
+ using cmake-3.17.2
+
+ mkdir build
+ cd build
+
+ if [ "$BUILD_TOOLCHAIN" == "clang" ]; then
+ using clang-10.0.0
+ fi
+
+ EXTRA_CMAKE_FLAGS=""
+ if [ "$BUILD_TARGET_ARCH" == "x86" ]; then
+ EXTRA_CMAKE_FLAGS="-DCMAKE_CXX_FLAGS=-m32 -DCMAKE_C_FLAGS=-m32 -DCMAKE_ASM_FLAGS=-m32"
+ fi
+
+ if [ "$BUILD_SANITIZER" == "asan" ]; then
+ EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DMARL_ASAN=1"
+ elif [ "$BUILD_SANITIZER" == "msan" ]; then
+ EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DMARL_MSAN=1"
+ elif [ "$BUILD_SANITIZER" == "tsan" ]; then
+ EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DMARL_TSAN=1"
+ fi
+
+ cmake .. ${EXTRA_CMAKE_FLAGS} \
+ -DMARL_BUILD_EXAMPLES=1 \
+ -DMARL_BUILD_TESTS=1 \
+ -DMARL_BUILD_BENCHMARKS=1 \
+ -DMARL_WARNINGS_AS_ERRORS=1
+
+ make --jobs=$(nproc)
+
+ ./marl-unittests
+ ./fractal
+ ./hello_task
+ ./primes > /dev/null
+ ./tasks_in_tasks
+
+elif [ "$BUILD_SYSTEM" == "bazel" ]; then
+ using bazel-3.1.0
+
+ bazel test //:tests --test_output=all
+ bazel run //examples:fractal
+ bazel run //examples:hello_task
+ bazel run //examples:primes > /dev/null
+ bazel run //examples:tasks_in_tasks
+else
+ echo "Unknown build system: $BUILD_SYSTEM"
+ exit 1
+fi
\ No newline at end of file
diff --git a/third_party/marl/kokoro/ubuntu/presubmit.sh b/third_party/marl/kokoro/ubuntu/presubmit.sh
index 2004b4b..20220b6 100755
--- a/third_party/marl/kokoro/ubuntu/presubmit.sh
+++ b/third_party/marl/kokoro/ubuntu/presubmit.sh
@@ -1,93 +1,17 @@
#!/bin/bash
set -e # Fail on any error.
-set -x # Display commands being run.
-BUILD_ROOT=$PWD
+ROOT_DIR=`pwd`
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd )"
-UBUNTU_VERSION=`cat /etc/os-release | grep -oP "Ubuntu \K([0-9]+\.[0-9]+)"`
-cd github/marl
-
-git submodule update --init
-
-# Always update gcc so we get a newer standard library.
-sudo add-apt-repository ppa:ubuntu-toolchain-r/test
-sudo apt-get update
-sudo apt-get install -y gcc-9-multilib g++-9-multilib linux-libc-dev:i386
-sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 100 --slave /usr/bin/g++ g++ /usr/bin/g++-9
-sudo update-alternatives --set gcc "/usr/bin/gcc-9"
-
-if [ "$BUILD_SYSTEM" == "cmake" ]; then
- mkdir build
- cd build
-
- if [ "$BUILD_TOOLCHAIN" == "clang" ]; then
- # Download clang tar
- CLANG_TAR="/tmp/clang-8.tar.xz"
- curl -L "https://releases.llvm.org/8.0.0/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}.tar.xz" > ${CLANG_TAR}
-
- # Verify clang tar
- sudo apt-get install pgpgpg
- gpg --import "${SCRIPT_DIR}/clang-8.pubkey.asc"
- gpg --verify "${SCRIPT_DIR}/clang-8-ubuntu-${UBUNTU_VERSION}.sig" ${CLANG_TAR}
- if [ $? -ne 0 ]; then
- echo "clang download failed PGP check"
- exit 1
- fi
-
- # Untar into tmp
- CLANG_DIR=/tmp/clang-8
- mkdir ${CLANG_DIR}
- tar -xf ${CLANG_TAR} -C ${CLANG_DIR}
-
- # Use clang as compiler
- export CC="${CLANG_DIR}/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}/bin/clang"
- export CXX="${CLANG_DIR}/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}/bin/clang++"
- fi
-
- extra_cmake_flags=""
- if [ "$BUILD_TARGET_ARCH" == "x86" ]; then
- extra_cmake_flags="-DCMAKE_CXX_FLAGS=-m32 -DCMAKE_C_FLAGS=-m32 -DCMAKE_ASM_FLAGS=-m32"
- fi
-
- build_and_run() {
- cmake .. ${extra_cmake_flags} \
- -DMARL_BUILD_EXAMPLES=1 \
- -DMARL_BUILD_TESTS=1 \
- -DMARL_BUILD_BENCHMARKS=1 \
- -DMARL_WARNINGS_AS_ERRORS=1 \
- $1
-
- make --jobs=$(nproc)
-
- ./marl-unittests
- ./fractal
- ./hello_task
- ./primes > /dev/null
- ./tasks_in_tasks
- }
-
- if [ "$BUILD_SANITIZER" == "asan" ]; then
- build_and_run "-DMARL_ASAN=1"
- elif [ "$BUILD_SANITIZER" == "msan" ]; then
- build_and_run "-DMARL_MSAN=1"
- elif [ "$BUILD_SANITIZER" == "tsan" ]; then
- build_and_run "-DMARL_TSAN=1"
- else
- build_and_run
- fi
-elif [ "$BUILD_SYSTEM" == "bazel" ]; then
- # Get bazel
- curl -L -k -O -s https://github.com/bazelbuild/bazel/releases/download/0.29.1/bazel-0.29.1-installer-linux-x86_64.sh
- mkdir $BUILD_ROOT/bazel
- bash bazel-0.29.1-installer-linux-x86_64.sh --prefix=$BUILD_ROOT/bazel
- rm bazel-0.29.1-installer-linux-x86_64.sh
- # Build and run
- $BUILD_ROOT/bazel/bin/bazel test //:tests --test_output=all
- $BUILD_ROOT/bazel/bin/bazel run //examples:fractal
- $BUILD_ROOT/bazel/bin/bazel run //examples:primes > /dev/null
-else
- echo "Unknown build system: $BUILD_SYSTEM"
- exit 1
-fi
\ No newline at end of file
+docker run --rm -i \
+ --volume "${ROOT_DIR}:${ROOT_DIR}" \
+ --volume "${KOKORO_ARTIFACTS_DIR}:/mnt/artifacts" \
+ --workdir "${ROOT_DIR}" \
+ --env BUILD_SYSTEM=$BUILD_SYSTEM \
+ --env BUILD_TOOLCHAIN=$BUILD_TOOLCHAIN \
+ --env BUILD_TARGET_ARCH=$BUILD_TARGET_ARCH \
+ --env BUILD_SANITIZER=$BUILD_SANITIZER \
+ --entrypoint "${SCRIPT_DIR}/presubmit-docker.sh" \
+ "gcr.io/shaderc-build/radial-build:latest"
diff --git a/third_party/marl/src/memory.cpp b/third_party/marl/src/memory.cpp
index 3ccfa3d..142d76b 100644
--- a/third_party/marl/src/memory.cpp
+++ b/third_party/marl/src/memory.cpp
@@ -19,7 +19,7 @@
#include <cstring>
-#if defined(__linux__) || defined(__APPLE__)
+#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#include <sys/mman.h>
#include <unistd.h>
namespace {
diff --git a/third_party/marl/src/memory_test.cpp b/third_party/marl/src/memory_test.cpp
index c15af7e..8aa6334 100644
--- a/third_party/marl/src/memory_test.cpp
+++ b/third_party/marl/src/memory_test.cpp
@@ -78,6 +78,7 @@
allocator->destroy(s16);
}
+#if GTEST_HAS_DEATH_TEST
TEST_F(AllocatorTest, Guards) {
marl::Allocation::Request request;
request.alignment = 16;
@@ -88,3 +89,4 @@
EXPECT_DEATH(ptr[-1] = 1, "");
EXPECT_DEATH(ptr[marl::pageSize()] = 1, "");
}
+#endif
diff --git a/third_party/marl/src/osfiber_asm.h b/third_party/marl/src/osfiber_asm.h
index c515ca6..0afc780 100644
--- a/third_party/marl/src/osfiber_asm.h
+++ b/third_party/marl/src/osfiber_asm.h
@@ -119,9 +119,9 @@
out->context = {};
out->target = func;
out->stack = allocator->allocate(request);
- marl_fiber_set_target(&out->context, out->stack.ptr, stackSize,
- reinterpret_cast<void (*)(void*)>(&OSFiber::run),
- out.get());
+ marl_fiber_set_target(
+ &out->context, out->stack.ptr, static_cast<uint32_t>(stackSize),
+ reinterpret_cast<void (*)(void*)>(&OSFiber::run), out.get());
return out;
}
diff --git a/third_party/marl/src/osfiber_windows.h b/third_party/marl/src/osfiber_windows.h
index 699001f..7a43b08 100644
--- a/third_party/marl/src/osfiber_windows.h
+++ b/third_party/marl/src/osfiber_windows.h
@@ -64,10 +64,10 @@
Allocator::unique_ptr<OSFiber> OSFiber::createFiberFromCurrentThread(
Allocator* allocator) {
auto out = allocator->make_unique<OSFiber>();
- out->fiber = ConvertThreadToFiber(nullptr);
+ out->fiber = ConvertThreadToFiberEx(nullptr,FIBER_FLAG_FLOAT_SWITCH);
out->isFiberFromThread = true;
MARL_ASSERT(out->fiber != nullptr,
- "ConvertThreadToFiber() failed with error 0x%x",
+ "ConvertThreadToFiberEx() failed with error 0x%x",
int(GetLastError()));
return out;
}
@@ -77,9 +77,10 @@
size_t stackSize,
const std::function<void()>& func) {
auto out = allocator->make_unique<OSFiber>();
- out->fiber = CreateFiber(stackSize, &OSFiber::run, out.get());
+ // stackSize is rounded up to the system's allocation granularity (typically 64 KB).
+ out->fiber = CreateFiberEx(stackSize - 1,stackSize,FIBER_FLAG_FLOAT_SWITCH,&OSFiber::run, out.get());
out->target = func;
- MARL_ASSERT(out->fiber != nullptr, "CreateFiber() failed with error 0x%x",
+ MARL_ASSERT(out->fiber != nullptr, "CreateFiberEx() failed with error 0x%x",
int(GetLastError()));
return out;
}
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index b6443d1..384f67d 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -100,8 +100,8 @@
bound = this;
{
marl::lock lock(singleThreadedWorkers.mutex);
- auto worker =
- allocator->make_unique<Worker>(this, Worker::Mode::SingleThreaded, -1);
+ auto worker = cfg.allocator->make_unique<Worker>(
+ this, Worker::Mode::SingleThreaded, -1);
worker->start();
auto tid = std::this_thread::get_id();
singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
@@ -110,7 +110,7 @@
void Scheduler::unbind() {
MARL_ASSERT(bound != nullptr, "No scheduler bound");
- auto worker = Scheduler::Worker::getCurrent();
+ auto worker = Worker::getCurrent();
worker->stop();
{
marl::lock lock(bound->singleThreadedWorkers.mutex);
@@ -127,11 +127,17 @@
bound = nullptr;
}
-Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
- : allocator(allocator), workerThreads{} {
+Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
}
+ for (int i = 0; i < cfg.workerThread.count; i++) {
+ workerThreads[i] =
+ cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
+ }
+ for (int i = 0; i < cfg.workerThread.count; i++) {
+ workerThreads[i]->start();
+ }
}
Scheduler::~Scheduler() {
@@ -146,17 +152,35 @@
// Release all worker threads.
// This will wait for all in-flight tasks to complete before returning.
- setWorkerThreadCount(0);
+ for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
+ workerThreads[i]->stop();
+ }
+ for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
+ cfg.allocator->destroy(workerThreads[i]);
+ }
+}
+
+#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
+Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
+ : workerThreads{} {
+ cfg.allocator = allocator;
+ for (size_t i = 0; i < spinningWorkers.size(); i++) {
+ spinningWorkers[i] = -1;
+ }
}
void Scheduler::setThreadInitializer(const std::function<void()>& init) {
marl::lock lock(threadInitFuncMutex);
- threadInitFunc = init;
+ cfg.workerThread.initializer = [=](int) { init(); };
}
-const std::function<void()>& Scheduler::getThreadInitializer() {
+std::function<void()> Scheduler::getThreadInitializer() {
marl::lock lock(threadInitFuncMutex);
- return threadInitFunc;
+ if (!cfg.workerThread.initializer) {
+ return {};
+ }
+ auto init = cfg.workerThread.initializer;
+ return [=]() { init(0); };
}
void Scheduler::setWorkerThreadCount(int newCount) {
@@ -169,33 +193,34 @@
newCount, int(MaxWorkerThreads), int(MaxWorkerThreads));
newCount = MaxWorkerThreads;
}
- auto oldCount = numWorkerThreads;
+ auto oldCount = cfg.workerThread.count;
for (int idx = oldCount - 1; idx >= newCount; idx--) {
workerThreads[idx]->stop();
}
for (int idx = oldCount - 1; idx >= newCount; idx--) {
- allocator->destroy(workerThreads[idx]);
+ cfg.allocator->destroy(workerThreads[idx]);
}
for (int idx = oldCount; idx < newCount; idx++) {
workerThreads[idx] =
- allocator->create<Worker>(this, Worker::Mode::MultiThreaded, idx);
+ cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, idx);
}
- numWorkerThreads = newCount;
+ cfg.workerThread.count = newCount;
for (int idx = oldCount; idx < newCount; idx++) {
workerThreads[idx]->start();
}
}
int Scheduler::getWorkerThreadCount() {
- return numWorkerThreads;
+ return cfg.workerThread.count;
}
+#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
void Scheduler::enqueue(Task&& task) {
if (task.is(Task::Flags::SameThread)) {
- Scheduler::Worker::getCurrent()->enqueue(std::move(task));
+ Worker::getCurrent()->enqueue(std::move(task));
return;
}
- if (numWorkerThreads > 0) {
+ if (cfg.workerThread.count > 0) {
while (true) {
// Prioritize workers that have recently started spinning.
auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
@@ -203,7 +228,7 @@
if (idx < 0) {
// If a spinning worker couldn't be found, round-robin the
// workers.
- idx = nextEnqueueIndex++ % numWorkerThreads;
+ idx = nextEnqueueIndex++ % cfg.workerThread.count;
}
auto worker = workerThreads[idx];
@@ -213,15 +238,23 @@
}
}
} else {
- auto worker = Worker::getCurrent();
- MARL_ASSERT(worker, "singleThreadedWorker not found");
- worker->enqueue(std::move(task));
+ if (auto worker = Worker::getCurrent()) {
+ worker->enqueue(std::move(task));
+ } else {
+ MARL_FATAL(
+ "singleThreadedWorker not found. Did you forget to call "
+ "marl::Scheduler::bind()?");
+ }
}
}
+const Scheduler::Config& Scheduler::config() const {
+ return cfg;
+}
+
bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) {
- if (numWorkerThreads > 0) {
- auto thread = workerThreads[from % numWorkerThreads];
+ if (cfg.workerThread.count > 0) {
+ auto thread = workerThreads[from % cfg.workerThread.count];
if (thread != thief) {
if (thread->steal(out)) {
return true;
@@ -237,15 +270,22 @@
}
////////////////////////////////////////////////////////////////////////////////
-// Fiber
+// Scheduler::Config
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::Config Scheduler::Config::allCores() {
+ return Config().setWorkerThreadCount(Thread::numLogicalCPUs());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::Fiber
////////////////////////////////////////////////////////////////////////////////
Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id)
- : id(id), impl(std::move(impl)), worker(Scheduler::Worker::getCurrent()) {
+ : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) {
MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound");
}
Scheduler::Fiber* Scheduler::Fiber::current() {
- auto worker = Scheduler::Worker::getCurrent();
+ auto worker = Worker::getCurrent();
return worker != nullptr ? worker->getCurrentFiber() : nullptr;
}
@@ -367,13 +407,13 @@
thread = Thread(id, [=] {
Thread::setName("Thread<%.2d>", int(id));
- if (auto const& initFunc = scheduler->getThreadInitializer()) {
- initFunc();
+ if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
+ initFunc(id);
}
Scheduler::bound = scheduler;
Worker::current = this;
- mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0);
+ mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get();
{
marl::lock lock(work.mutex);
@@ -386,7 +426,7 @@
case Mode::SingleThreaded:
Worker::current = this;
- mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0);
+ mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get();
break;
@@ -709,7 +749,7 @@
Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
- auto fiber = Fiber::create(scheduler->allocator, fiberId, FiberStackSize,
+ auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
[&]() REQUIRES(work.mutex) { run(); });
auto ptr = fiber.get();
workerFibers.push_back(std::move(fiber));
diff --git a/third_party/marl/src/scheduler_test.cpp b/third_party/marl/src/scheduler_test.cpp
index 245c334..b07417f 100644
--- a/third_party/marl/src/scheduler_test.cpp
+++ b/third_party/marl/src/scheduler_test.cpp
@@ -15,6 +15,7 @@
#include "marl_test.h"
#include "marl/defer.h"
+#include "marl/event.h"
#include "marl/waitgroup.h"
#include <atomic>
@@ -117,13 +118,12 @@
for (int i = 0; i < num_threads; i++) {
threads.push_back(std::thread([=] {
scheduler->bind();
+ defer(scheduler->unbind());
auto threadID = std::this_thread::get_id();
fence.wait();
ASSERT_EQ(threadID, std::this_thread::get_id());
wg.done();
-
- scheduler->unbind();
}));
}
// just to try and get some tasks to yield.
@@ -139,6 +139,7 @@
TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
scheduler->bind();
+ defer(scheduler->unbind());
scheduler->setWorkerThreadCount(8);
std::mutex mutex;
std::unordered_set<std::thread::id> threads;
@@ -155,6 +156,47 @@
ASSERT_LE(threads.size(), 8U);
ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U);
+}
- scheduler->unbind();
-}
\ No newline at end of file
+// Test that a marl::Scheduler *with dedicated worker threads* can be used
+// without first binding to the scheduling thread.
+TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
+ auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
+ scheduler->setWorkerThreadCount(8);
+
+ marl::WaitGroup wg;
+ for (int i = 0; i < 100; i++) {
+ wg.add(1);
+
+ marl::Event event;
+ scheduler->enqueue(marl::Task([event, wg] {
+ event.wait(); // Test that tasks can wait on other tasks.
+ wg.done();
+ }));
+
+ scheduler->enqueue(marl::Task([event, &scheduler] {
+ // Despite the main thread never binding the scheduler, the scheduler
+ // should be automatically bound to worker threads.
+ ASSERT_EQ(marl::Scheduler::get(), scheduler.get());
+
+ event.signal();
+ }));
+ }
+
+ // As the scheduler has not been bound to the main thread, the wait() call
+ // here will block **without** fiber yielding.
+ wg.wait();
+}
+
+// Test that a marl::Scheduler *without dedicated worker threads* cannot be used
+// without first binding to the scheduling thread.
+TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) {
+ auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
+
+#if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
+ EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})),
+ "Did you forget to call marl::Scheduler::bind");
+#elif !MARL_DEBUG_ENABLED
+ scheduler->enqueue(marl::Task([] { FAIL() << "Should not be called"; }));
+#endif
+}
diff --git a/third_party/marl/src/thread.cpp b/third_party/marl/src/thread.cpp
index afb4432..c2ac40d 100644
--- a/third_party/marl/src/thread.cpp
+++ b/third_party/marl/src/thread.cpp
@@ -31,6 +31,11 @@
#include <pthread.h>
#include <unistd.h>
#include <thread>
+#elif defined(__FreeBSD__)
+#include <pthread.h>
+#include <pthread_np.h>
+#include <unistd.h>
+#include <thread>
#else
#include <pthread.h>
#include <unistd.h>
@@ -211,6 +216,8 @@
#if defined(__APPLE__)
pthread_setname_np(name);
+#elif defined(__FreeBSD__)
+ pthread_set_name_np(pthread_self(), name);
#elif !defined(__Fuchsia__)
pthread_setname_np(pthread_self(), name);
#endif
@@ -219,7 +226,7 @@
}
unsigned int Thread::numLogicalCPUs() {
- return sysconf(_SC_NPROCESSORS_ONLN);
+ return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN));
}
#endif // OS
diff --git a/third_party/marl/src/waitgroup_test.cpp b/third_party/marl/src/waitgroup_test.cpp
index 3756f2f..e6858ae 100644
--- a/third_party/marl/src/waitgroup_test.cpp
+++ b/third_party/marl/src/waitgroup_test.cpp
@@ -22,14 +22,14 @@
wg.done();
}
-#if MARL_DEBUG_ENABLED
+#if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
TEST_F(WithoutBoundScheduler, WaitGroupDoneTooMany) {
marl::WaitGroup wg(2); // Should not require a scheduler.
wg.done();
wg.done();
EXPECT_DEATH(wg.done(), "done\\(\\) called too many times");
}
-#endif // MARL_DEBUG_ENABLED
+#endif // MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
TEST_P(WithBoundScheduler, WaitGroup_OneTask) {
marl::WaitGroup wg(1);