| // Copyright 2019 The Marl Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "marl/thread.h" |
| |
| #include "marl/debug.h" |
| #include "marl/defer.h" |
| #include "marl/trace.h" |
| |
| #include <algorithm> // std::sort |
| |
| #include <cstdarg> |
| #include <cstdio> |
| |
| #if defined(_WIN32) |
| #define WIN32_LEAN_AND_MEAN 1 |
| #include <windows.h> |
| #include <array> |
| #include <cstdlib> // mbstowcs |
| #include <limits> // std::numeric_limits |
| #include <vector> |
| #undef max |
| #elif defined(__APPLE__) |
| #include <mach/thread_act.h> |
| #include <pthread.h> |
| #include <unistd.h> |
| #include <thread> |
| #elif defined(__FreeBSD__) |
| #include <pthread.h> |
| #include <pthread_np.h> |
| #include <unistd.h> |
| #include <thread> |
| #else |
| #include <pthread.h> |
| #include <unistd.h> |
| #include <thread> |
| #endif |
| |
| namespace { |
| |
| struct CoreHasher { |
| inline uint64_t operator()(const marl::Thread::Core& core) const { |
| return core.pthread.index; |
| } |
| }; |
| |
| } // anonymous namespace |
| |
| namespace marl { |
| |
| #if defined(_WIN32) |
| static constexpr size_t MaxCoreCount = |
| std::numeric_limits<decltype(Thread::Core::windows.index)>::max() + 1ULL; |
| static constexpr size_t MaxGroupCount = |
| std::numeric_limits<decltype(Thread::Core::windows.group)>::max() + 1ULL; |
| static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount, |
| "Thread::Core::windows.index is too small"); |
| |
| namespace { |
| #define CHECK_WIN32(expr) \ |
| do { \ |
| auto res = expr; \ |
| (void)res; \ |
| MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \ |
| (int)GetLastError()); \ |
| } while (false) |
| |
| struct ProcessorGroup { |
| unsigned int count; // number of logical processors in this group. |
| KAFFINITY affinity; // affinity mask. |
| }; |
| |
| struct ProcessorGroups { |
| std::array<ProcessorGroup, MaxGroupCount> groups; |
| size_t count; |
| }; |
| |
| const ProcessorGroups& getProcessorGroups() { |
| static ProcessorGroups groups = [] { |
| ProcessorGroups out = {}; |
| SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {}; |
| DWORD size = sizeof(info); |
| CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size)); |
| DWORD count = size / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX); |
| for (DWORD i = 0; i < count; i++) { |
| if (info[i].Relationship == RelationGroup) { |
| auto groupCount = info[i].Group.ActiveGroupCount; |
| for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) { |
| auto const& groupInfo = info[i].Group.GroupInfo[groupIdx]; |
| out.groups[out.count++] = ProcessorGroup{ |
| groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask}; |
| MARL_ASSERT(out.count <= MaxGroupCount, "Group index overflow"); |
| } |
| } |
| } |
| return out; |
| }(); |
| return groups; |
| } |
| } // namespace |
| #endif // defined(_WIN32) |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Thread::Affinty |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {} |
| Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {} |
| Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator) |
| : cores(other.cores, allocator) {} |
| |
| Thread::Affinity::Affinity(std::initializer_list<Core> list, |
| Allocator* allocator) |
| : cores(allocator) { |
| cores.reserve(list.size()); |
| for (auto core : list) { |
| cores.push_back(core); |
| } |
| } |
| |
| Thread::Affinity::Affinity(const containers::vector<Core, 32>& coreList, |
| Allocator* allocator) |
| : cores(coreList, allocator) {} |
| |
| Thread::Affinity Thread::Affinity::all( |
| Allocator* allocator /* = Allocator::Default */) { |
| Thread::Affinity affinity(allocator); |
| |
| #if defined(_WIN32) |
| const auto& groups = getProcessorGroups(); |
| for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) { |
| const auto& group = groups.groups[groupIdx]; |
| Core core; |
| core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx); |
| for (unsigned int coreIdx = 0; coreIdx < group.count; coreIdx++) { |
| if ((group.affinity >> coreIdx) & 1) { |
| core.windows.index = static_cast<decltype(core.windows.index)>(coreIdx); |
| affinity.cores.emplace_back(std::move(core)); |
| } |
| } |
| } |
| #elif defined(__linux__) && !defined(__ANDROID__) |
| auto thread = pthread_self(); |
| cpu_set_t cpuset; |
| CPU_ZERO(&cpuset); |
| if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) { |
| int count = CPU_COUNT(&cpuset); |
| for (int i = 0; i < count; i++) { |
| Core core; |
| core.pthread.index = static_cast<uint16_t>(i); |
| affinity.cores.emplace_back(std::move(core)); |
| } |
| } |
| #elif defined(__FreeBSD__) |
| auto thread = pthread_self(); |
| cpuset_t cpuset; |
| CPU_ZERO(&cpuset); |
| if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) { |
| int count = CPU_COUNT(&cpuset); |
| for (int i = 0; i < count; i++) { |
| Core core; |
| core.pthread.index = static_cast<uint16_t>(i); |
| affinity.cores.emplace_back(std::move(core)); |
| } |
| } |
| #else |
| static_assert(!supported, |
| "marl::Thread::Affinity::supported is true, but " |
| "Thread::Affinity::all() is not implemented for this platform"); |
| #endif |
| |
| return affinity; |
| } |
| |
| std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::anyOf( |
| Affinity&& affinity, |
| Allocator* allocator /* = Allocator::Default */) { |
| struct Policy : public Thread::Affinity::Policy { |
| Affinity affinity; |
| Policy(Affinity&& affinity) : affinity(std::move(affinity)) {} |
| |
| Affinity get(uint32_t threadId, Allocator* allocator) const override { |
| #if defined(_WIN32) |
| auto count = affinity.count(); |
| if (count == 0) { |
| return Affinity(affinity, allocator); |
| } |
| auto group = affinity[threadId % affinity.count()].windows.group; |
| Affinity out(allocator); |
| out.cores.reserve(count); |
| for (auto core : affinity.cores) { |
| if (core.windows.group == group) { |
| out.cores.push_back(core); |
| } |
| } |
| return out; |
| #else |
| return Affinity(affinity, allocator); |
| #endif |
| } |
| }; |
| |
| return allocator->make_shared<Policy>(std::move(affinity)); |
| } |
| |
| std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::oneOf( |
| Affinity&& affinity, |
| Allocator* allocator /* = Allocator::Default */) { |
| struct Policy : public Thread::Affinity::Policy { |
| Affinity affinity; |
| Policy(Affinity&& affinity) : affinity(std::move(affinity)) {} |
| |
| Affinity get(uint32_t threadId, Allocator* allocator) const override { |
| auto count = affinity.count(); |
| if (count == 0) { |
| return Affinity(affinity, allocator); |
| } |
| return Affinity({affinity[threadId % affinity.count()]}, allocator); |
| } |
| }; |
| |
| return allocator->make_shared<Policy>(std::move(affinity)); |
| } |
| |
| size_t Thread::Affinity::count() const { |
| return cores.size(); |
| } |
| |
| Thread::Core Thread::Affinity::operator[](size_t index) const { |
| return cores[index]; |
| } |
| |
| Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) { |
| containers::unordered_set<Core, CoreHasher> set(cores.allocator); |
| for (auto core : cores) { |
| set.emplace(core); |
| } |
| for (auto core : other.cores) { |
| if (set.count(core) == 0) { |
| cores.push_back(core); |
| } |
| } |
| std::sort(cores.begin(), cores.end()); |
| return *this; |
| } |
| |
| Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) { |
| containers::unordered_set<Core, CoreHasher> set(cores.allocator); |
| for (auto core : other.cores) { |
| set.emplace(core); |
| } |
| for (size_t i = 0; i < cores.size(); i++) { |
| if (set.count(cores[i]) != 0) { |
| cores[i] = cores.back(); |
| cores.resize(cores.size() - 1); |
| } |
| } |
| std::sort(cores.begin(), cores.end()); |
| return *this; |
| } |
| |
| #if defined(_WIN32) |
| |
| class Thread::Impl { |
| public: |
| Impl(Func&& func) : func(std::move(func)) {} |
| static DWORD WINAPI run(void* self) { |
| reinterpret_cast<Impl*>(self)->func(); |
| return 0; |
| } |
| |
| Func func; |
| HANDLE handle; |
| }; |
| |
| Thread::Thread(Affinity&& affinity, Func&& func) { |
| SIZE_T size = 0; |
| InitializeProcThreadAttributeList(nullptr, 1, 0, &size); |
| MARL_ASSERT(size > 0, |
| "InitializeProcThreadAttributeList() did not give a size"); |
| |
| std::vector<uint8_t> buffer(size); |
| LPPROC_THREAD_ATTRIBUTE_LIST attributes = |
| reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(buffer.data()); |
| CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size)); |
| defer(DeleteProcThreadAttributeList(attributes)); |
| |
| GROUP_AFFINITY groupAffinity = {}; |
| |
| auto count = affinity.count(); |
| if (count > 0) { |
| groupAffinity.Group = affinity[0].windows.group; |
| for (size_t i = 0; i < count; i++) { |
| auto core = affinity[i]; |
| MARL_ASSERT(groupAffinity.Group == core.windows.group, |
| "Cannot create thread that uses multiple affinity groups"); |
| groupAffinity.Mask |= (1ULL << core.windows.index); |
| } |
| CHECK_WIN32(UpdateProcThreadAttribute( |
| attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity, |
| sizeof(groupAffinity), nullptr, nullptr)); |
| } |
| |
| impl = new Impl(std::move(func)); |
| impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0, |
| &Impl::run, impl, 0, attributes, nullptr); |
| } |
| |
| Thread::~Thread() { |
| if (impl) { |
| CloseHandle(impl->handle); |
| delete impl; |
| } |
| } |
| |
| void Thread::join() { |
| MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread"); |
| WaitForSingleObject(impl->handle, INFINITE); |
| } |
| |
| void Thread::setName(const char* fmt, ...) { |
| static auto setThreadDescription = |
| reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress( |
| GetModuleHandleA("kernelbase.dll"), "SetThreadDescription")); |
| if (setThreadDescription == nullptr) { |
| return; |
| } |
| |
| char name[1024]; |
| va_list vararg; |
| va_start(vararg, fmt); |
| vsnprintf(name, sizeof(name), fmt, vararg); |
| va_end(vararg); |
| |
| wchar_t wname[1024]; |
| mbstowcs(wname, name, 1024); |
| setThreadDescription(GetCurrentThread(), wname); |
| MARL_NAME_THREAD("%s", name); |
| } |
| |
| unsigned int Thread::numLogicalCPUs() { |
| unsigned int count = 0; |
| const auto& groups = getProcessorGroups(); |
| for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) { |
| const auto& group = groups.groups[groupIdx]; |
| count += group.count; |
| } |
| return count; |
| } |
| |
| #else |
| |
| class Thread::Impl { |
| public: |
| Impl(Affinity&& affinity, Thread::Func&& f) |
| : affinity(std::move(affinity)), func(std::move(f)), thread([this] { |
| setAffinity(); |
| func(); |
| }) {} |
| |
| Affinity affinity; |
| Func func; |
| std::thread thread; |
| |
| void setAffinity() { |
| auto count = affinity.count(); |
| if (count == 0) { |
| return; |
| } |
| |
| #if defined(__linux__) && !defined(__ANDROID__) |
| cpu_set_t cpuset; |
| CPU_ZERO(&cpuset); |
| for (size_t i = 0; i < count; i++) { |
| CPU_SET(affinity[i].pthread.index, &cpuset); |
| } |
| auto thread = pthread_self(); |
| pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); |
| #elif defined(__FreeBSD__) |
| cpuset_t cpuset; |
| CPU_ZERO(&cpuset); |
| for (size_t i = 0; i < count; i++) { |
| CPU_SET(affinity[i].pthread.index, &cpuset); |
| } |
| auto thread = pthread_self(); |
| pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset); |
| #else |
| MARL_ASSERT(!marl::Thread::Affinity::supported, |
| "Attempting to use thread affinity on a unsupported platform"); |
| #endif |
| } |
| }; |
| |
| Thread::Thread(Affinity&& affinity, Func&& func) |
| : impl(new Thread::Impl(std::move(affinity), std::move(func))) {} |
| |
| Thread::~Thread() { |
| MARL_ASSERT(!impl, "Thread::join() was not called before destruction"); |
| } |
| |
| void Thread::join() { |
| impl->thread.join(); |
| delete impl; |
| impl = nullptr; |
| } |
| |
| void Thread::setName(const char* fmt, ...) { |
| char name[1024]; |
| va_list vararg; |
| va_start(vararg, fmt); |
| vsnprintf(name, sizeof(name), fmt, vararg); |
| va_end(vararg); |
| |
| #if defined(__APPLE__) |
| pthread_setname_np(name); |
| #elif defined(__FreeBSD__) |
| pthread_set_name_np(pthread_self(), name); |
| #elif !defined(__Fuchsia__) |
| pthread_setname_np(pthread_self(), name); |
| #endif |
| |
| MARL_NAME_THREAD("%s", name); |
| } |
| |
| unsigned int Thread::numLogicalCPUs() { |
| return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN)); |
| } |
| |
| #endif // OS |
| |
| Thread::Thread(Thread&& rhs) : impl(rhs.impl) { |
| rhs.impl = nullptr; |
| } |
| |
| Thread& Thread::operator=(Thread&& rhs) { |
| if (impl) { |
| delete impl; |
| impl = nullptr; |
| } |
| impl = rhs.impl; |
| rhs.impl = nullptr; |
| return *this; |
| } |
| |
| } // namespace marl |