From 4c97b55adfe2f517a5ef7b36a2dfafddec85c8ea Mon Sep 17 00:00:00 2001 From: Alexander Shabalin Date: Fri, 28 Jan 2022 12:11:44 +0000 Subject: [PATCH] [K/N] Simplify SingleThreadExecutor Merge-request: KT-MR-5591 Merged-by: Alexander Shabalin --- .../gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp | 26 ++-- .../src/gc/common/cpp/GCSchedulerTest.cpp | 9 +- .../stms/cpp/SameThreadMarkAndSweepTest.cpp | 26 ++-- .../src/main/cpp/SingleThreadExecutor.hpp | 138 ++++-------------- .../src/main/cpp/SingleThreadExecutorTest.cpp | 68 ++++----- 5 files changed, 92 insertions(+), 175 deletions(-) diff --git a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp index 03b15336934..03ef0e734a4 100644 --- a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp +++ b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp @@ -648,17 +648,15 @@ namespace { class Mutator : private Pinned { public: - Mutator() : executor_(MakeSingleThreadExecutorWithContext()) {} - template [[nodiscard]] std::future Execute(F&& f) { - return executor_.Execute( - [this, f = std::forward(f)] { f(*executor_.thread().context().memory_->memoryState()->GetThreadData(), *this); }); + return executor_.execute( + [this, f = std::forward(f)] { f(*executor_.context().memory_->memoryState()->GetThreadData(), *this); }); } StackObjectHolder& AddStackRoot() { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; context.stackRoots_.push_back(std::move(holder)); @@ -666,8 +664,8 @@ public: } StackObjectHolder& AddStackRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(object); auto& holderRef = *holder; context.stackRoots_.push_back(std::move(holder)); @@ -675,8 +673,8 @@ public: } GlobalObjectHolder& AddGlobalRoot() { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; context.globalRoots_.push_back(std::move(holder)); @@ -684,15 +682,15 @@ public: } GlobalObjectHolder& AddGlobalRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(*context.memory_->memoryState()->GetThreadData(), object); auto& holderRef = *holder; context.globalRoots_.push_back(std::move(holder)); return holderRef; } - KStdVector Alive() { return ::Alive(*executor_.thread().context().memory_->memoryState()->GetThreadData()); } + KStdVector Alive() { return ::Alive(*executor_.context().memory_->memoryState()->GetThreadData()); } private: struct Context { @@ -706,7 +704,7 @@ private: } }; - SingleThreadExecutor> executor_; + SingleThreadExecutor executor_; }; } // namespace diff --git a/kotlin-native/runtime/src/gc/common/cpp/GCSchedulerTest.cpp b/kotlin-native/runtime/src/gc/common/cpp/GCSchedulerTest.cpp index fae2994726b..32abf876fd3 100644 --- a/kotlin-native/runtime/src/gc/common/cpp/GCSchedulerTest.cpp +++ b/kotlin-native/runtime/src/gc/common/cpp/GCSchedulerTest.cpp @@ -286,12 +286,11 @@ using TimePoint = std::chrono::time_point; class MutatorThread : private Pinned { public: MutatorThread(GCSchedulerConfig& config, std::function slowPath) : - executor_(MakeSingleThreadExecutorWithContext( - [&config, slowPath = std::move(slowPath)] { return Context(config, std::move(slowPath)); })) {} + executor_([&config, slowPath = std::move(slowPath)] { return Context(config, std::move(slowPath)); }) {} std::future Allocate(size_t bytes) { - return executor_.Execute([&, bytes] { - auto& context = executor_.thread().context(); + return executor_.execute([&, bytes] { + auto& context = executor_.context(); context.threadDataTestApi.SetAllocatedBytes(bytes); context.slowPath(context.threadData); }); @@ -307,7 +306,7 @@ private: threadData(config, [](GCSchedulerThreadData&) {}), threadDataTestApi(threadData), slowPath(slowPath) {} }; - SingleThreadExecutor> executor_; + SingleThreadExecutor executor_; }; template diff --git a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp index 9b21127b5fb..9892f368df4 100644 --- a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp +++ b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp @@ -647,17 +647,15 @@ namespace { class Mutator : private Pinned { public: - Mutator() : executor_(MakeSingleThreadExecutorWithContext()) {} - template [[nodiscard]] std::future Execute(F&& f) { - return executor_.Execute( - [this, f = std::forward(f)] { f(*executor_.thread().context().memory_->memoryState()->GetThreadData(), *this); }); + return executor_.execute( + [this, f = std::forward(f)] { f(*executor_.context().memory_->memoryState()->GetThreadData(), *this); }); } StackObjectHolder& AddStackRoot() { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; context.stackRoots_.push_back(std::move(holder)); @@ -665,8 +663,8 @@ public: } StackObjectHolder& AddStackRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(object); auto& holderRef = *holder; context.stackRoots_.push_back(std::move(holder)); @@ -674,8 +672,8 @@ public: } GlobalObjectHolder& AddGlobalRoot() { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; context.globalRoots_.push_back(std::move(holder)); @@ -683,15 +681,15 @@ public: } GlobalObjectHolder& AddGlobalRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto& context = executor_.thread().context(); + RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.context(); auto holder = make_unique(*context.memory_->memoryState()->GetThreadData(), object); auto& holderRef = *holder; context.globalRoots_.push_back(std::move(holder)); return holderRef; } - KStdVector Alive() { return ::Alive(*executor_.thread().context().memory_->memoryState()->GetThreadData()); } + KStdVector Alive() { return ::Alive(*executor_.context().memory_->memoryState()->GetThreadData()); } private: struct Context { @@ -705,7 +703,7 @@ private: } }; - SingleThreadExecutor> executor_; + SingleThreadExecutor executor_; }; } // namespace diff --git a/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp index cfc54f41f4c..cb88a0854ee 100644 --- a/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp +++ b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "Types.h" @@ -20,99 +21,19 @@ namespace kotlin { // Requirements: avoid heap allocations as much as possible. // TODO: Try to generalize so enough, that Worker.cpp can be written on top of this. // Requirements: delayed tasks. +// TODO: Makes sense to specialize Context to void when context is unneeded. -// Thread that has a context attached to it. Context is created and destroyed on the thread. -// Thread cannot join before the destructor, because otherwise `context()` will dangle. +// Execute tasks on a single worker thread. `Context` is created and destroyed on the worker thread. template -class ThreadWithContext : private Pinned { -public: - ThreadWithContext() = default; - - template - explicit ThreadWithContext(ContextFactory&& contextFactory, Function&& f, Args&&... args) : - thread_( - [this, contextFactory = std::forward(contextFactory), f = std::forward(f)]( - Args&&... args) mutable { - auto context = contextFactory(); - { - std::unique_lock guard(startMutex_); - context_ = &context; - } - startCV_.notify_one(); - std::invoke(std::forward(f), std::forward(args)...); - std::unique_lock guard(stopMutex_); - stopCV_.wait(guard, [this] { return needsShutdown_; }); - }, - std::forward(args)...) {} - - ~ThreadWithContext() { - { - std::unique_lock guard(stopMutex_); - needsShutdown_ = true; - } - stopCV_.notify_one(); - thread_.join(); - } - - // Wait until thread is fully initialized and `context()` is created. - void waitInitialized() noexcept { - std::unique_lock guard(startMutex_); - startCV_.wait(guard, [this] { return context_ != nullptr; }); - } - - // May only be called after the thread has fully initialized. Use `WaitInitialized()` to be sure. - Context& context() const noexcept { - RuntimeAssert(context_ != nullptr, "context must be set"); - return *context_; - } - - std::thread::id get_id() const noexcept { return thread_.get_id(); } - -private: - std::condition_variable startCV_; - std::mutex startMutex_; - Context* context_ = nullptr; - - // Need to keep thread alive for the entire lifetime of this object, because `context_` lifetime - // is bound to the thread. - std::condition_variable stopCV_; - std::mutex stopMutex_; - bool needsShutdown_ = false; - - std::thread thread_; -}; - -// TODO: Replace with `std::jthread`. -// A thread that always joins in the destructor -class JoiningThread : private MoveOnly { -public: - JoiningThread() = default; - - template - explicit JoiningThread(Function&& f, Args&&... args) : thread_(std::forward(f), std::forward(args)...) {} - - ~JoiningThread() { thread_.join(); } - - std::thread::id get_id() const noexcept { return thread_.get_id(); } - -private: - std::thread thread_; -}; - -// Execute tasks on a single worker thread. -// `Thread` must join in the destructor. -template class SingleThreadExecutor : private Pinned { public: // Starts the worker thread immediately. - template - explicit SingleThreadExecutor(ThreadFactory&& threadFactory) noexcept : - thread_(std::forward(threadFactory)(&SingleThreadExecutor::RunLoop, this)) {} + template + explicit SingleThreadExecutor(ContextFactory&& contextFactory) noexcept : + thread_(&SingleThreadExecutor::runLoop, this, std::forward(contextFactory)) {} - SingleThreadExecutor() noexcept : - SingleThreadExecutor([](auto&& function, auto&&... args) { - return Thread(std::forward(function), std::forward(args)...); - }) {} + // Starts the worker thread immediately. + SingleThreadExecutor() noexcept : SingleThreadExecutor([] { return Context(); }) {} ~SingleThreadExecutor() { { @@ -122,15 +43,24 @@ public: shutdownRequested_ = true; } workCV_.notify_one(); + thread_.join(); } - Thread& thread() noexcept { return thread_; } + // May lock until the context is created by the worker thread. + Context& context() const noexcept { + std::shared_lock guard(contextMutex_); + contextCV_.wait(guard, [this] { return context_ != nullptr; }); + return *context_; + } + + // Id of the worker thread. + std::thread::id threadId() const noexcept { return thread_.get_id(); } // Schedule task execution on the worker thread. The returned future is resolved when the task has completed. // If `this` is destroyed before the task manages to complete, the returned future will fail with exception upon `.get()`. // If the task moves the runtime into a runnable state, it should move it back into the native state. template - [[nodiscard]] std::future Execute(Task&& f) noexcept { + [[nodiscard]] std::future execute(Task&& f) noexcept { std::packaged_task task(std::forward(f)); auto future = task.get_future(); { @@ -142,7 +72,14 @@ public: } private: - void RunLoop() noexcept { + template + void runLoop(ContextFactory&& contextFactory) noexcept { + auto context = contextFactory(); + { + std::unique_lock guard(contextMutex_); + context_ = &context; + } + contextCV_.notify_all(); while (true) { std::packaged_task task; { @@ -158,27 +95,16 @@ private: } } + mutable std::condition_variable_any contextCV_; + mutable std::shared_mutex contextMutex_; + Context* context_ = nullptr; + std::condition_variable workCV_; std::mutex workMutex_; KStdDeque> queue_; bool shutdownRequested_ = false; - Thread thread_; + std::thread thread_; }; -template -SingleThreadExecutor> MakeSingleThreadExecutorWithContext(ContextFactory&& contextFactory) noexcept { - return SingleThreadExecutor>( - [contextFactory = std::forward(contextFactory)](auto&& function, auto&&... args) mutable { - return ThreadWithContext( - std::forward(contextFactory), std::forward(function), - std::forward(args)...); - }); -} - -template -SingleThreadExecutor> MakeSingleThreadExecutorWithContext() noexcept { - return MakeSingleThreadExecutorWithContext([] { return Context(); }); -} - } // namespace kotlin diff --git a/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp index 0363c1b28ce..c8c2af2e9c7 100644 --- a/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp +++ b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp @@ -52,32 +52,34 @@ testing::MockFunction* PinnedContext::dtorMock = nullptr; } // namespace -TEST(ThreadWithContextTest, ContextThreadBound) { +TEST(SingleThreadExecutorTest, ContextThreadBound) { PinnedContext::ScopedMocks mocks; PinnedContext* createdContext = nullptr; std::thread::id createdThread; - testing::StrictMock> function; EXPECT_CALL(mocks.ctorMock, Call(_)).WillOnce([&](PinnedContext& context) { createdContext = &context; createdThread = std::this_thread::get_id(); }); - EXPECT_CALL(function, Call()).WillOnce([&] { EXPECT_THAT(std::this_thread::get_id(), createdThread); }); - auto thread = ::make_unique>([] { return PinnedContext(); }, function.AsStdFunction()); - thread->waitInitialized(); + auto executor = ::make_unique>(); + // Make sure context is created. + executor->context(); testing::Mock::VerifyAndClearExpectations(&mocks.ctorMock); - EXPECT_THAT(createdThread, thread->get_id()); - EXPECT_THAT(thread->context(), testing::Ref(*createdContext)); + EXPECT_THAT(createdThread, executor->threadId()); + EXPECT_THAT(executor->context(), testing::Ref(*createdContext)); + + testing::StrictMock> task; + EXPECT_CALL(task, Call()).WillOnce([&] { EXPECT_THAT(std::this_thread::get_id(), createdThread); }); + executor->execute(task.AsStdFunction()).get(); + testing::Mock::VerifyAndClearExpectations(&task); EXPECT_CALL(mocks.dtorMock, Call(testing::Ref(*createdContext))).WillOnce([&] { EXPECT_THAT(std::this_thread::get_id(), createdThread); }); - thread.reset(); - // The function is expected to be called at some point between `waitInitialized` and the thread exit. - testing::Mock::VerifyAndClearExpectations(&function); + executor.reset(); testing::Mock::VerifyAndClearExpectations(&mocks.dtorMock); } -TEST(ThreadWithContextTest, WaitInitialized) { +TEST(SingleThreadExecutorTest, WaitContext) { PinnedContext::ScopedMocks mocks; PinnedContext* createdContext = nullptr; std::mutex ctorMutex; @@ -86,41 +88,34 @@ TEST(ThreadWithContextTest, WaitInitialized) { createdContext = &context; }); - testing::StrictMock> function; - EXPECT_CALL(function, Call()).Times(0); ctorMutex.lock(); - auto thread = ::make_unique>([] { return PinnedContext(); }, function.AsStdFunction()); + SingleThreadExecutor executor; - std::atomic_bool initialized = false; - std::thread initializedWaiter([&] { - thread->waitInitialized(); - initialized = true; - }); + auto future = executor.execute([] {}); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - EXPECT_THAT(initialized.load(), false); - testing::Mock::VerifyAndClearExpectations(&function); + auto futureStatus = future.wait_for(std::chrono::milliseconds(10)); + EXPECT_THAT(futureStatus, std::future_status::timeout); - EXPECT_CALL(function, Call()); ctorMutex.unlock(); - initializedWaiter.join(); + // Wait for `thread` to initialize. + executor.context(); + future.get(); testing::Mock::VerifyAndClearExpectations(&mocks.ctorMock); - testing::Mock::VerifyAndClearExpectations(&function); - EXPECT_THAT(initialized.load(), true); - EXPECT_THAT(thread->context(), testing::Ref(*createdContext)); + EXPECT_THAT(executor.context(), testing::Ref(*createdContext)); EXPECT_CALL(mocks.dtorMock, Call(testing::Ref(*createdContext))); } -TEST(SingleThreadExecutorTest, Execute) { - SingleThreadExecutor executor; +TEST(SingleThreadExecutorTest, execute) { + struct Context {}; + SingleThreadExecutor executor; std::mutex taskMutex; testing::StrictMock> task; EXPECT_CALL(task, Call()).WillOnce([&] { std::unique_lock guard(taskMutex); }); taskMutex.lock(); - auto future = executor.Execute(task.AsStdFunction()); + auto future = executor.execute(task.AsStdFunction()); auto futureStatus = future.wait_for(std::chrono::milliseconds(10)); EXPECT_THAT(futureStatus, std::future_status::timeout); @@ -131,7 +126,8 @@ TEST(SingleThreadExecutorTest, Execute) { } TEST(SingleThreadExecutorTest, DropExecutorWithTasks) { - auto executor = make_unique>(); + struct Context {}; + auto executor = make_unique>(); std::mutex taskMutex; testing::StrictMock> task; @@ -142,13 +138,13 @@ TEST(SingleThreadExecutorTest, DropExecutorWithTasks) { std::unique_lock guard(taskMutex); }); taskMutex.lock(); - auto future = executor->Execute(task.AsStdFunction()); + auto future = executor->execute(task.AsStdFunction()); while (!taskStarted) {} KStdVector, bool>> newTasks; constexpr size_t tasksCount = 100; for (size_t i = 0; i < tasksCount; ++i) { - newTasks.push_back(std::make_pair(executor->Execute([&newTasks, i] { newTasks[i].second = true; }), false)); + newTasks.push_back(std::make_pair(executor->execute([&newTasks, i] { newTasks[i].second = true; }), false)); } taskMutex.unlock(); @@ -171,7 +167,7 @@ TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) { struct Context { KStdVector result; }; - auto executor = MakeSingleThreadExecutorWithContext(); + SingleThreadExecutor executor; std::atomic_bool canStart = false; @@ -182,7 +178,7 @@ TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) { threads.emplace_back([&, i] { while (!canStart) { } - executor.Execute([&] { executor.thread().context().result.push_back(i); }).get(); + executor.execute([&] { executor.context().result.push_back(i); }).get(); }); } @@ -192,5 +188,5 @@ TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) { thread.join(); } - EXPECT_THAT(executor.thread().context().result, testing::UnorderedElementsAreArray(expected)); + EXPECT_THAT(executor.context().result, testing::UnorderedElementsAreArray(expected)); }