Update Marl to 94a361cf0

Contains a fix for an ARM linker error (chromium:1058107) and support for
scaling beyond 64 threads on Windows.

Changes:
    94a361cf0 thread.cpp Fix minor bug in getProcessorGroups()
    0249a2624 Thread: Use WaitForObject for Thread::join()
    62f209bbb arm: Annotate marl_fiber_swap as type %function
    00f091e08 Update README.md
    773d9f475 Add new example 'tasks_in_tasks'
    3f69e73ce Scheduler: Replace use of std::thread with marl::Thread

Commands:
    git subtree pull --prefix third_party/marl https://github.com/google/marl master --squash

Bug: b/140546382
Bug: chromium:1058107
Change-Id: Ic5a96ad5f054a19f6a5a77e1f106c73ba60c0a78
diff --git a/third_party/marl/CMakeLists.txt b/third_party/marl/CMakeLists.txt
index 1119719..54690f9 100644
--- a/third_party/marl/CMakeLists.txt
+++ b/third_party/marl/CMakeLists.txt
@@ -264,4 +264,5 @@
     build_example(fractal)
     build_example(hello_task)
     build_example(primes)
+    build_example(tasks_in_tasks)
 endif(MARL_BUILD_EXAMPLES)
diff --git a/third_party/marl/README.md b/third_party/marl/README.md
index 3fd10b4..cea4522 100644
--- a/third_party/marl/README.md
+++ b/third_party/marl/README.md
@@ -65,6 +65,12 @@
 }
 ```
 
+
+## Benchmarks
+
+Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
+
+
 ## Building
 
 Marl contains many unit tests and examples that can be built using CMake.
@@ -117,10 +123,6 @@
 add_subdirectory(${MARL_DIR})
 ```
 
-## Benchmarks
-
-Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
-
 ---
 
 Note: This is not an officially supported Google product
diff --git a/third_party/marl/examples/tasks_in_tasks.cpp b/third_party/marl/examples/tasks_in_tasks.cpp
new file mode 100644
index 0000000..f34c476
--- /dev/null
+++ b/third_party/marl/examples/tasks_in_tasks.cpp
@@ -0,0 +1,81 @@
+// Copyright 2020 The Marl Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Example of a task that creates and waits on sub tasks.
+
+#include "marl/defer.h"
+#include "marl/scheduler.h"
+#include "marl/waitgroup.h"
+
+#include <cstdio>
+
+int main() {
+  // Create a marl scheduler using 4 worker threads.
+  // Bind this scheduler to the main thread so we can call marl::schedule().
+  marl::Scheduler scheduler;
+  scheduler.bind();
+  scheduler.setWorkerThreadCount(4);
+  defer(scheduler.unbind());  // Automatically unbind before returning.
+
+  // marl::schedule() requires the scheduler to be bound to the current thread
+  // (see above). The scheduler ensures that tasks are run on a thread with the
+  // same scheduler automatically bound, so we don't need to call
+  // marl::Scheduler::bind() again below.
+
+  // Sequence of task events:
+  //   __________________________________________________________
+  //  |                                                          |
+  //  |               ---> [task B] ----                         |
+  //  |             /                    \                       |
+  //  |  [task A] -----> [task A: wait] -----> [task A: resume]  |
+  //  |             \                    /                       |
+  //  |               ---> [task C] ----                         |
+  //  |__________________________________________________________|
+
+  // Create a WaitGroup for waiting for task A to finish.
+  // This has an initial count of 1 (A).
+  marl::WaitGroup a_wg(1);
+
+  // Schedule task A
+  marl::schedule([=] {
+    defer(a_wg.done());  // Decrement a_wg when task A is done
+
+    printf("Hello from task A\n");
+    printf("Starting tasks B and C...\n");
+
+    // Create a WaitGroup for waiting on tasks B and C to finish.
+    // This has an initial count of 2 (B + C).
+    marl::WaitGroup bc_wg(2);
+
+    // Schedule task B
+    marl::schedule([=] {
+      defer(bc_wg.done());  // Decrement bc_wg when task B is done
+      printf("Hello from task B\n");
+    });
+
+    // Schedule task C
+    marl::schedule([=] {
+      defer(bc_wg.done());  // Decrement bc_wg when task C is done
+      printf("Hello from task C\n");
+    });
+
+    // Wait for tasks B and C to finish.
+    bc_wg.wait();
+  });
+
+  // Wait for task A (and so B and C) to finish.
+  a_wg.wait();
+
+  printf("Task A has finished\n");
+}
diff --git a/third_party/marl/include/marl/scheduler.h b/third_party/marl/include/marl/scheduler.h
index 03d121c..e94d035 100644
--- a/third_party/marl/include/marl/scheduler.h
+++ b/third_party/marl/include/marl/scheduler.h
@@ -18,6 +18,7 @@
 #include "debug.h"
 #include "memory.h"
 #include "sal.h"
+#include "thread.h"
 
 #include <array>
 #include <atomic>
@@ -411,7 +412,7 @@
     Scheduler* const scheduler;
     Allocator::unique_ptr<Fiber> mainFiber;
     Fiber* currentFiber = nullptr;
-    std::thread thread;
+    Thread thread;
     Work work;
     FiberSet idleFibers;  // Fibers that have completed which can be reused.
     std::vector<Allocator::unique_ptr<Fiber>>
diff --git a/third_party/marl/include/marl/thread.h b/third_party/marl/include/marl/thread.h
index 7ce20b3..f18bddd 100644
--- a/third_party/marl/include/marl/thread.h
+++ b/third_party/marl/include/marl/thread.h
@@ -15,12 +15,34 @@
 #ifndef marl_thread_h
 #define marl_thread_h
 
+#include <functional>
+
 namespace marl {
 
-// Thread contains static methods that abstract OS-specific thread / cpu
+// Thread provides an OS abstraction for threads of execution.
+// Thread is used by marl instead of std::thread as Windows does not naturally
+// scale beyond 64 logical threads (a single processor group) unless you use
+// the Win32 API.
+// Thread also contains static methods that abstract OS-specific thread / cpu
 // queries and control.
 class Thread {
  public:
+  using Func = std::function<void()>;
+
+  Thread() = default;
+  Thread(Thread&&);
+  Thread& operator=(Thread&&);
+
+  // Start a new thread that calls func.
+  // logicalCpuHint is a hint for the logical CPU to run on; it may be ignored.
+  Thread(unsigned int logicalCpuHint, const Func& func);
+
+  ~Thread();
+
+  // join() blocks until the thread completes. A started thread must be joined
+  // before it is destroyed or assigned over.
+  void join();
+
   // setName() sets the name of the currently executing thread for displaying
   // in a debugger.
   static void setName(const char* fmt, ...);
@@ -28,6 +50,13 @@
   // numLogicalCPUs() returns the number of available logical CPU cores for
   // the system.
   static unsigned int numLogicalCPUs();
+
+ private:
+  Thread(const Thread&) = delete;  // Not copyable; Thread is move-only.
+  Thread& operator=(const Thread&) = delete;
+
+  class Impl;  // Platform-specific state, defined in thread.cpp.
+  Impl* impl = nullptr;  // Owned; null when no thread is held.
 };
 
 }  // namespace marl
diff --git a/third_party/marl/src/osfiber_asm_arm.S b/third_party/marl/src/osfiber_asm_arm.S
index d276ac8..c7103dc 100644
--- a/third_party/marl/src/osfiber_asm_arm.S
+++ b/third_party/marl/src/osfiber_asm_arm.S
@@ -23,6 +23,7 @@
 .text
 .global marl_fiber_swap
 .align 4
+.type marl_fiber_swap, %function
 marl_fiber_swap:
 
     // Save context 'from'
diff --git a/third_party/marl/src/scheduler.cpp b/third_party/marl/src/scheduler.cpp
index 197dbb9..d8de740 100644
--- a/third_party/marl/src/scheduler.cpp
+++ b/third_party/marl/src/scheduler.cpp
@@ -361,7 +361,7 @@
 void Scheduler::Worker::start() {
   switch (mode) {
     case Mode::MultiThreaded:
-      thread = std::thread([=] {
+      thread = Thread(id, [=] {
         Thread::setName("Thread<%.2d>", int(id));
 
         if (auto const& initFunc = scheduler->getThreadInitializer()) {
diff --git a/third_party/marl/src/thread.cpp b/third_party/marl/src/thread.cpp
index 6c505d7..fa69bdf 100644
--- a/third_party/marl/src/thread.cpp
+++ b/third_party/marl/src/thread.cpp
@@ -14,6 +14,8 @@
 
 #include "marl/thread.h"
 
+#include "marl/debug.h"
+#include "marl/defer.h"
 #include "marl/trace.h"
 
 #include <cstdarg>
@@ -22,20 +24,166 @@
 #if defined(_WIN32)
 #define WIN32_LEAN_AND_MEAN 1
 #include <windows.h>
-#include <cstdlib> // mbstowcs
+#include <cstdlib>  // mbstowcs
+#include <vector>
 #elif defined(__APPLE__)
 #include <mach/thread_act.h>
 #include <pthread.h>
 #include <unistd.h>
+#include <thread>
 #else
 #include <pthread.h>
 #include <unistd.h>
+#include <thread>
 #endif

 namespace marl {

 #if defined(_WIN32)

+// CHECK_WIN32() asserts that the Win32 call expr returned TRUE, reporting
+// GetLastError() on failure. expr is evaluated outside of MARL_ASSERT, so the
+// call is always made.
+#define CHECK_WIN32(expr)                                    \
+  do {                                                       \
+    auto res = expr;                                         \
+    (void)res;                                               \
+    MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
+                (int)GetLastError());                        \
+  } while (false)
+
+namespace {
+
+// ProcessorGroup describes a single Windows processor group.
+struct ProcessorGroup {
+  unsigned int count;  // number of logical processors in this group.
+  KAFFINITY affinity;  // affinity mask.
+};
+
+// getProcessorGroups() returns the system's active processor groups, as
+// reported by GetLogicalProcessorInformationEx(RelationGroup). The list is
+// built on the first call and cached.
+const std::vector<ProcessorGroup>& getProcessorGroups() {
+  static std::vector<ProcessorGroup> groups = [] {
+    std::vector<ProcessorGroup> out;
+    SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
+    DWORD size = sizeof(info);
+    CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
+    // Records are variable-sized (GroupInfo[] can extend past the declared
+    // struct), so walk the buffer by each record's Size field rather than
+    // indexing it as an array of fixed-size structs.
+    const char* const bytes = reinterpret_cast<const char*>(info);
+    for (DWORD offset = 0; offset < size;) {
+      auto const& record =
+          *reinterpret_cast<const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX*>(
+              bytes + offset);
+      if (record.Size == 0) {
+        break;  // Malformed record; avoid looping forever.
+      }
+      if (record.Relationship == RelationGroup) {
+        auto groupCount = record.Group.ActiveGroupCount;
+        for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
+          auto const& groupInfo = record.Group.GroupInfo[groupIdx];
+          out.emplace_back(ProcessorGroup{groupInfo.ActiveProcessorCount,
+                                          groupInfo.ActiveProcessorMask});
+        }
+      }
+      offset += record.Size;
+    }
+    return out;
+  }();
+  return groups;
+}
+
+// getGroupAffinity() finds the processor group containing the index'th
+// active logical processor, counting through the groups in order. On success
+// it stores that group and its whole affinity mask in groupAffinity and
+// returns true; otherwise it returns false.
+bool getGroupAffinity(unsigned int index, GROUP_AFFINITY* groupAffinity) {
+  auto& groups = getProcessorGroups();
+  for (size_t groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
+    auto& group = groups[groupIdx];
+    if (index < group.count) {
+      for (size_t i = 0; i < sizeof(group.affinity) * 8; i++) {
+        if (group.affinity & (1ULL << i)) {
+          if (index == 0) {
+            groupAffinity->Group = static_cast<WORD>(groupIdx);
+            // Use the whole group's affinity, as the OS is then able to shuffle
+            // threads around based on external demands. Pinning these to a
+            // single core can cause up to 20% performance loss in benchmarking.
+            groupAffinity->Mask = group.affinity;
+            return true;
+          }
+          index--;
+        }
+      }
+      return false;
+    } else {
+      index -= group.count;
+    }
+  }
+  return false;
+}
+
+}  // namespace
+
+class Thread::Impl {
+ public:
+  explicit Impl(const Func& func) : func(func) {}
+
+  // Entry point passed to CreateRemoteThreadEx(); self is the owning Impl.
+  static DWORD WINAPI run(void* self) {
+    reinterpret_cast<Impl*>(self)->func();
+    return 0;
+  }
+
+  Func func;
+  HANDLE handle = nullptr;  // Remains null if thread creation fails.
+};
+
+Thread::Thread(unsigned int logicalCpu, const Func& func) {
+  SIZE_T size = 0;
+  InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
+  MARL_ASSERT(size > 0,
+              "InitializeProcThreadAttributeList() did not give a size");
+
+  // Heap storage for the attribute list, rather than non-standard alloca().
+  std::vector<char> attributeStorage(size);
+  LPPROC_THREAD_ATTRIBUTE_LIST attributes =
+      reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(attributeStorage.data());
+  CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
+  defer(DeleteProcThreadAttributeList(attributes));
+
+  GROUP_AFFINITY groupAffinity = {};
+  if (getGroupAffinity(logicalCpu, &groupAffinity)) {
+    CHECK_WIN32(UpdateProcThreadAttribute(
+        attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
+        sizeof(groupAffinity), nullptr, nullptr));
+  }
+
+  impl = new Impl(func);
+  impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
+                                      &Impl::run, impl, 0, attributes, nullptr);
+  MARL_ASSERT(impl->handle != nullptr,
+              "CreateRemoteThreadEx() failed with error: %d",
+              (int)GetLastError());
+}
+
+Thread::~Thread() {
+  if (impl) {
+    // Impl::run() dereferences impl, so the thread must have been joined.
+    if (impl->handle != nullptr) {
+      CloseHandle(impl->handle);
+    }
+    delete impl;
+  }
+}
+
+void Thread::join() {
+  MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
+  WaitForSingleObject(impl->handle, INFINITE);
+}
+
 void Thread::setName(const char* fmt, ...) {
   static auto setThreadDescription =
       reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
@@ -57,25 +174,33 @@
 }
 
 unsigned int Thread::numLogicalCPUs() {
-  DWORD_PTR processAffinityMask = 1;
-  DWORD_PTR systemAffinityMask = 1;
-
-  GetProcessAffinityMask(GetCurrentProcess(), &processAffinityMask,
-                         &systemAffinityMask);
-
-  auto count = 0;
-  while (processAffinityMask > 0) {
-    if (processAffinityMask & 1) {
-      count++;
-    }
-
-    processAffinityMask >>= 1;
+  unsigned int count = 0;  // Sum of logical processors over all groups.
+  for (const auto& group : getProcessorGroups()) {
+    count += group.count;
   }
   return count;
 }
 
 #else
 
+class Thread::Impl {
+ public:
+  explicit Impl(const Func& func) : thread(func) {}
+  std::thread thread;
+};
+
+Thread::Thread(unsigned int /* logicalCpu */, const Func& func)
+    : impl(new Thread::Impl(func)) {}
+
+Thread::~Thread() {
+  delete impl;  // std::thread calls std::terminate() if still joinable.
+}
+
+void Thread::join() {
+  MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
+  impl->thread.join();
+}
+
 void Thread::setName(const char* fmt, ...) {
   char name[1024];
   va_list vararg;
@@ -96,6 +221,20 @@
   return sysconf(_SC_NPROCESSORS_ONLN);
 }
 
-#endif
+#endif  // OS
+
+Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
+  rhs.impl = nullptr;  // rhs no longer owns the thread.
+}
+
+Thread& Thread::operator=(Thread&& rhs) {
+  if (this != &rhs) {  // Self-move must not destroy the held thread.
+    Thread old;        // Adopts the current impl so ~Thread() releases it,
+    old.impl = impl;   // which (unlike a bare delete) closes the Win32 handle.
+    impl = rhs.impl;
+    rhs.impl = nullptr;
+  }
+  return *this;
+}
 
 }  // namespace marl