diff --git a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp index fae62ac218a..e8f9db9e451 100644 --- a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp +++ b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweepTest.cpp @@ -19,6 +19,7 @@ #include "GlobalData.hpp" #include "ObjectOps.hpp" #include "ObjectTestSupport.hpp" +#include "SingleThreadExecutor.hpp" #include "TestSupport.hpp" #include "ThreadData.hpp" @@ -650,101 +651,65 @@ namespace { class Mutator : private Pinned { public: - Mutator() : thread_(&Mutator::RunLoop, this) {} - - ~Mutator() { - { - std::unique_lock guard(queueMutex_); - shutdownRequested_ = true; - } - queueCV_.notify_one(); - thread_.join(); - RuntimeAssert(queue_.empty(), "The queue must be empty, has size=%zu", queue_.size()); - RuntimeAssert(memory_ == nullptr, "Memory must have been deinitialized"); - RuntimeAssert(stackRoots_.empty(), "Stack roots must be empty, has size=%zu", stackRoots_.size()); - RuntimeAssert(globalRoots_.empty(), "Global roots must be empty, has size=%zu", globalRoots_.size()); - } + Mutator() : executor_(MakeSingleThreadExecutorWithContext()) {} template [[nodiscard]] std::future Execute(F&& f) { - std::packaged_task task([this, f = std::forward(f)]() { f(*memory_->memoryState()->GetThreadData(), *this); }); - auto future = task.get_future(); - { - std::unique_lock guard(queueMutex_); - queue_.push_back(std::move(task)); - } - queueCV_.notify_one(); - return future; + return executor_.Execute( + [this, f = std::forward(f)] { f(*executor_.thread().context().memory_->memoryState()->GetThreadData(), *this); }); } StackObjectHolder& AddStackRoot() { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddStackRoot can only be called in the mutator thread"); - auto holder = make_unique(*memory_->memoryState()->GetThreadData()); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); + auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; - stackRoots_.push_back(std::move(holder)); + context.stackRoots_.push_back(std::move(holder)); return holderRef; } StackObjectHolder& AddStackRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddStackRoot can only be called in the mutator thread"); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); auto holder = make_unique(object); auto& holderRef = *holder; - stackRoots_.push_back(std::move(holder)); + context.stackRoots_.push_back(std::move(holder)); return holderRef; } GlobalObjectHolder& AddGlobalRoot() { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto holder = make_unique(*memory_->memoryState()->GetThreadData()); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); + auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; - globalRoots_.push_back(std::move(holder)); + context.globalRoots_.push_back(std::move(holder)); return holderRef; } GlobalObjectHolder& AddGlobalRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto holder = make_unique(*memory_->memoryState()->GetThreadData(), object); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); + auto holder = make_unique(*context.memory_->memoryState()->GetThreadData(), object); auto& holderRef = *holder; - globalRoots_.push_back(std::move(holder)); + context.globalRoots_.push_back(std::move(holder)); return holderRef; } - KStdVector Alive() { return ::Alive(*memory_->memoryState()->GetThreadData()); } + KStdVector Alive() { return ::Alive(*executor_.thread().context().memory_->memoryState()->GetThreadData()); } private: - void RunLoop() { - memory_ = make_unique(); - AssertThreadState(memory_->memoryState(), ThreadState::kRunnable); + struct Context { + KStdUniquePtr memory_; + KStdVector> stackRoots_; + KStdVector> globalRoots_; - while (true) { - std::packaged_task task; - { - std::unique_lock guard(queueMutex_); - queueCV_.wait(guard, [this]() { return !queue_.empty() || shutdownRequested_; }); - if (shutdownRequested_) { - globalRoots_.clear(); - stackRoots_.clear(); - memory_.reset(); - return; - } - task = std::move(queue_.front()); - queue_.pop_front(); - } - task(); + Context() : memory_(make_unique()) { + // SingleThreadExecutor must work in the runnable state, so that GC does not collect between tasks. + AssertThreadState(memory_->memoryState(), ThreadState::kRunnable); } - } + }; - KStdUniquePtr memory_; - - // TODO: Consider full runtime init instead, and interact with initialized worker - std::condition_variable queueCV_; - std::mutex queueMutex_; - KStdDeque> queue_; - bool shutdownRequested_ = false; - std::thread thread_; - - KStdVector> globalRoots_; - KStdVector> stackRoots_; + SingleThreadExecutor> executor_; }; } // namespace diff --git a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp index 6b6e280dbd1..9b21127b5fb 100644 --- a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp +++ b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp @@ -19,6 +19,7 @@ #include "GlobalData.hpp" #include "ObjectOps.hpp" #include "ObjectTestSupport.hpp" +#include "SingleThreadExecutor.hpp" #include "TestSupport.hpp" #include "ThreadData.hpp" @@ -646,101 +647,65 @@ namespace { class Mutator : private Pinned { public: - Mutator() : thread_(&Mutator::RunLoop, this) {} - - ~Mutator() { - { - std::unique_lock guard(queueMutex_); - shutdownRequested_ = true; - } - queueCV_.notify_one(); - thread_.join(); - RuntimeAssert(queue_.empty(), "The queue must be empty, has size=%zu", queue_.size()); - RuntimeAssert(memory_ == nullptr, "Memory must have been deinitialized"); - RuntimeAssert(stackRoots_.empty(), "Stack roots must be empty, has size=%zu", stackRoots_.size()); - RuntimeAssert(globalRoots_.empty(), "Global roots must be empty, has size=%zu", globalRoots_.size()); - } + Mutator() : executor_(MakeSingleThreadExecutorWithContext()) {} template [[nodiscard]] std::future Execute(F&& f) { - std::packaged_task task([this, f = std::forward(f)]() { f(*memory_->memoryState()->GetThreadData(), *this); }); - auto future = task.get_future(); - { - std::unique_lock guard(queueMutex_); - queue_.push_back(std::move(task)); - } - queueCV_.notify_one(); - return future; + return executor_.Execute( + [this, f = std::forward(f)] { f(*executor_.thread().context().memory_->memoryState()->GetThreadData(), *this); }); } StackObjectHolder& AddStackRoot() { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddStackRoot can only be called in the mutator thread"); - auto holder = make_unique(*memory_->memoryState()->GetThreadData()); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); + auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; - stackRoots_.push_back(std::move(holder)); + context.stackRoots_.push_back(std::move(holder)); return holderRef; } StackObjectHolder& AddStackRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddStackRoot can only be called in the mutator thread"); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); auto holder = make_unique(object); auto& holderRef = *holder; - stackRoots_.push_back(std::move(holder)); + context.stackRoots_.push_back(std::move(holder)); return holderRef; } GlobalObjectHolder& AddGlobalRoot() { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto holder = make_unique(*memory_->memoryState()->GetThreadData()); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); + auto holder = make_unique(*context.memory_->memoryState()->GetThreadData()); auto& holderRef = *holder; - globalRoots_.push_back(std::move(holder)); + context.globalRoots_.push_back(std::move(holder)); return holderRef; } GlobalObjectHolder& AddGlobalRoot(ObjHeader* object) { - RuntimeAssert(std::this_thread::get_id() == thread_.get_id(), "AddGlobalRoot can only be called in the mutator thread"); - auto holder = make_unique(*memory_->memoryState()->GetThreadData(), object); + RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread"); + auto& context = executor_.thread().context(); + auto holder = make_unique(*context.memory_->memoryState()->GetThreadData(), object); auto& holderRef = *holder; - globalRoots_.push_back(std::move(holder)); + context.globalRoots_.push_back(std::move(holder)); return holderRef; } - KStdVector Alive() { return ::Alive(*memory_->memoryState()->GetThreadData()); } + KStdVector Alive() { return ::Alive(*executor_.thread().context().memory_->memoryState()->GetThreadData()); } private: - void RunLoop() { - memory_ = make_unique(); - AssertThreadState(memory_->memoryState(), ThreadState::kRunnable); + struct Context { + KStdUniquePtr memory_; + KStdVector> stackRoots_; + KStdVector> globalRoots_; - while (true) { - std::packaged_task task; - { - std::unique_lock guard(queueMutex_); - queueCV_.wait(guard, [this]() { return !queue_.empty() || shutdownRequested_; }); - if (shutdownRequested_) { - globalRoots_.clear(); - stackRoots_.clear(); - memory_.reset(); - return; - } - task = std::move(queue_.front()); - queue_.pop_front(); - } - task(); + Context() : memory_(make_unique()) { + // SingleThreadExecutor must work in the runnable state, so that GC does not collect between tasks. + AssertThreadState(memory_->memoryState(), ThreadState::kRunnable); } - } + }; - KStdUniquePtr memory_; - - // TODO: Consider full runtime init instead, and interact with initialized worker - std::condition_variable queueCV_; - std::mutex queueMutex_; - KStdDeque> queue_; - bool shutdownRequested_ = false; - std::thread thread_; - - KStdVector> globalRoots_; - KStdVector> stackRoots_; + SingleThreadExecutor> executor_; }; } // namespace diff --git a/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp new file mode 100644 index 00000000000..cfc54f41f4c --- /dev/null +++ b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutor.hpp @@ -0,0 +1,184 @@ +/* + * Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "Types.h" +#include "Utils.hpp" + +namespace kotlin { + +// TODO: Try to generalize enough, so that FinalizerProcessor is implementable in terms of it. +// Requirements: avoid heap allocations as much as possible. +// TODO: Try to generalize so enough, that Worker.cpp can be written on top of this. +// Requirements: delayed tasks. + +// Thread that has a context attached to it. Context is created and destroyed on the thread. +// Thread cannot join before the destructor, because otherwise `context()` will dangle. +template +class ThreadWithContext : private Pinned { +public: + ThreadWithContext() = default; + + template + explicit ThreadWithContext(ContextFactory&& contextFactory, Function&& f, Args&&... args) : + thread_( + [this, contextFactory = std::forward(contextFactory), f = std::forward(f)]( + Args&&... args) mutable { + auto context = contextFactory(); + { + std::unique_lock guard(startMutex_); + context_ = &context; + } + startCV_.notify_one(); + std::invoke(std::forward(f), std::forward(args)...); + std::unique_lock guard(stopMutex_); + stopCV_.wait(guard, [this] { return needsShutdown_; }); + }, + std::forward(args)...) {} + + ~ThreadWithContext() { + { + std::unique_lock guard(stopMutex_); + needsShutdown_ = true; + } + stopCV_.notify_one(); + thread_.join(); + } + + // Wait until thread is fully initialized and `context()` is created. + void waitInitialized() noexcept { + std::unique_lock guard(startMutex_); + startCV_.wait(guard, [this] { return context_ != nullptr; }); + } + + // May only be called after the thread has fully initialized. Use `WaitInitialized()` to be sure. + Context& context() const noexcept { + RuntimeAssert(context_ != nullptr, "context must be set"); + return *context_; + } + + std::thread::id get_id() const noexcept { return thread_.get_id(); } + +private: + std::condition_variable startCV_; + std::mutex startMutex_; + Context* context_ = nullptr; + + // Need to keep thread alive for the entire lifetime of this object, because `context_` lifetime + // is bound to the thread. + std::condition_variable stopCV_; + std::mutex stopMutex_; + bool needsShutdown_ = false; + + std::thread thread_; +}; + +// TODO: Replace with `std::jthread`. +// A thread that always joins in the destructor +class JoiningThread : private MoveOnly { +public: + JoiningThread() = default; + + template + explicit JoiningThread(Function&& f, Args&&... args) : thread_(std::forward(f), std::forward(args)...) {} + + ~JoiningThread() { thread_.join(); } + + std::thread::id get_id() const noexcept { return thread_.get_id(); } + +private: + std::thread thread_; +}; + +// Execute tasks on a single worker thread. +// `Thread` must join in the destructor. +template +class SingleThreadExecutor : private Pinned { +public: + // Starts the worker thread immediately. + template + explicit SingleThreadExecutor(ThreadFactory&& threadFactory) noexcept : + thread_(std::forward(threadFactory)(&SingleThreadExecutor::RunLoop, this)) {} + + SingleThreadExecutor() noexcept : + SingleThreadExecutor([](auto&& function, auto&&... args) { + return Thread(std::forward(function), std::forward(args)...); + }) {} + + ~SingleThreadExecutor() { + { + std::unique_lock guard(workMutex_); + // Note: This can only happen in destructor, because otherwise `context_` will be a dangling + // pointer to the destroyed thread's stack. + shutdownRequested_ = true; + } + workCV_.notify_one(); + } + + Thread& thread() noexcept { return thread_; } + + // Schedule task execution on the worker thread. The returned future is resolved when the task has completed. + // If `this` is destroyed before the task manages to complete, the returned future will fail with exception upon `.get()`. + // If the task moves the runtime into a runnable state, it should move it back into the native state. + template + [[nodiscard]] std::future Execute(Task&& f) noexcept { + std::packaged_task task(std::forward(f)); + auto future = task.get_future(); + { + std::unique_lock guard(workMutex_); + queue_.push_back(std::move(task)); + } + workCV_.notify_one(); + return future; + } + +private: + void RunLoop() noexcept { + while (true) { + std::packaged_task task; + { + std::unique_lock guard(workMutex_); + workCV_.wait(guard, [this] { return !queue_.empty() || shutdownRequested_; }); + if (shutdownRequested_) { + return; + } + task = std::move(queue_.front()); + queue_.pop_front(); + } + task(); + } + } + + std::condition_variable workCV_; + std::mutex workMutex_; + KStdDeque> queue_; + bool shutdownRequested_ = false; + + Thread thread_; +}; + +template +SingleThreadExecutor> MakeSingleThreadExecutorWithContext(ContextFactory&& contextFactory) noexcept { + return SingleThreadExecutor>( + [contextFactory = std::forward(contextFactory)](auto&& function, auto&&... args) mutable { + return ThreadWithContext( + std::forward(contextFactory), std::forward(function), + std::forward(args)...); + }); +} + +template +SingleThreadExecutor> MakeSingleThreadExecutorWithContext() noexcept { + return MakeSingleThreadExecutorWithContext([] { return Context(); }); +} + +} // namespace kotlin diff --git a/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp new file mode 100644 index 00000000000..07fc91e4114 --- /dev/null +++ b/kotlin-native/runtime/src/main/cpp/SingleThreadExecutorTest.cpp @@ -0,0 +1,195 @@ +/* + * Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#include "SingleThreadExecutor.hpp" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "KAssert.h" +#include "TestSupport.hpp" + +using namespace kotlin; + +using testing::_; + +namespace { + +class PinnedContext : private Pinned { +public: + struct ScopedMocks : private Pinned { + testing::StrictMock> ctorMock; + testing::StrictMock> dtorMock; + + ScopedMocks() { + RuntimeAssert(PinnedContext::ctorMock == nullptr, "ctor mock must be null was %p", PinnedContext::ctorMock); + PinnedContext::ctorMock = &ctorMock; + RuntimeAssert(PinnedContext::dtorMock == nullptr, "dtor mock must be null was %p", PinnedContext::dtorMock); + PinnedContext::dtorMock = &dtorMock; + } + + ~ScopedMocks() { + RuntimeAssert(PinnedContext::ctorMock == &ctorMock, "ctor mock must be %p was %p", &ctorMock, PinnedContext::ctorMock); + PinnedContext::ctorMock = nullptr; + RuntimeAssert(PinnedContext::dtorMock == &dtorMock, "dtor mock must be %p was %p", &dtorMock, PinnedContext::dtorMock); + PinnedContext::dtorMock = nullptr; + } + }; + + PinnedContext() { ctorMock->Call(*this); } + + ~PinnedContext() { dtorMock->Call(*this); } + +private: + static testing::MockFunction* ctorMock; + static testing::MockFunction* dtorMock; +}; + +testing::MockFunction* PinnedContext::ctorMock = nullptr; +testing::MockFunction* PinnedContext::dtorMock = nullptr; + +} // namespace + +TEST(ThreadWithContextTest, ContextThreadBound) { + PinnedContext::ScopedMocks mocks; + PinnedContext* createdContext = nullptr; + std::thread::id createdThread; + testing::StrictMock> function; + EXPECT_CALL(mocks.ctorMock, Call(_)).WillOnce([&](PinnedContext& context) { + createdContext = &context; + createdThread = std::this_thread::get_id(); + }); + EXPECT_CALL(function, Call()).WillOnce([&] { EXPECT_THAT(std::this_thread::get_id(), createdThread); }); + auto thread = ::make_unique>([] { return PinnedContext(); }, function.AsStdFunction()); + thread->waitInitialized(); + testing::Mock::VerifyAndClearExpectations(&function); + testing::Mock::VerifyAndClearExpectations(&mocks.ctorMock); + EXPECT_THAT(createdThread, thread->get_id()); + EXPECT_THAT(thread->context(), testing::Ref(*createdContext)); + + EXPECT_CALL(mocks.dtorMock, Call(testing::Ref(*createdContext))).WillOnce([&] { + EXPECT_THAT(std::this_thread::get_id(), createdThread); + }); + thread.reset(); + testing::Mock::VerifyAndClearExpectations(&mocks.dtorMock); +} + +TEST(ThreadWithContextTest, WaitInitialized) { + PinnedContext::ScopedMocks mocks; + PinnedContext* createdContext = nullptr; + std::mutex ctorMutex; + EXPECT_CALL(mocks.ctorMock, Call(_)).WillOnce([&](PinnedContext& context) { + std::unique_lock guard(ctorMutex); + createdContext = &context; + }); + + testing::StrictMock> function; + EXPECT_CALL(function, Call()).Times(0); + ctorMutex.lock(); + auto thread = ::make_unique>([] { return PinnedContext(); }, function.AsStdFunction()); + + std::atomic_bool initialized = false; + std::thread initializedWaiter([&] { + thread->waitInitialized(); + initialized = true; + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + EXPECT_THAT(initialized.load(), false); + testing::Mock::VerifyAndClearExpectations(&function); + + EXPECT_CALL(function, Call()); + ctorMutex.unlock(); + initializedWaiter.join(); + testing::Mock::VerifyAndClearExpectations(&mocks.ctorMock); + testing::Mock::VerifyAndClearExpectations(&function); + EXPECT_THAT(initialized.load(), true); + + EXPECT_THAT(thread->context(), testing::Ref(*createdContext)); + EXPECT_CALL(mocks.dtorMock, Call(testing::Ref(*createdContext))); +} + +TEST(SingleThreadExecutorTest, Execute) { + SingleThreadExecutor executor; + + std::mutex taskMutex; + testing::StrictMock> task; + + EXPECT_CALL(task, Call()).WillOnce([&] { std::unique_lock guard(taskMutex); }); + taskMutex.lock(); + auto future = executor.Execute(task.AsStdFunction()); + + auto futureStatus = future.wait_for(std::chrono::milliseconds(10)); + EXPECT_THAT(futureStatus, std::future_status::timeout); + + taskMutex.unlock(); + future.get(); + testing::Mock::VerifyAndClearExpectations(&task); +} + +TEST(SingleThreadExecutorTest, DropExecutorWithTasks) { + auto executor = make_unique>(); + + std::mutex taskMutex; + testing::StrictMock> task; + + std::atomic_bool taskStarted = false; + EXPECT_CALL(task, Call()).WillOnce([&] { + taskStarted = true; + std::unique_lock guard(taskMutex); + }); + taskMutex.lock(); + auto future = executor->Execute(task.AsStdFunction()); + while (!taskStarted) {} + + KStdVector, bool>> newTasks; + constexpr size_t tasksCount = 100; + for (size_t i = 0; i < tasksCount; ++i) { + newTasks.push_back(std::make_pair(executor->Execute([&newTasks, i] { newTasks[i].second = true; }), false)); + } + + taskMutex.unlock(); + executor.reset(); + + testing::Mock::VerifyAndClearExpectations(&task); + future.get(); + + // There's no guarantee whether any of those succeed, or any fail. + for (auto& [future, success] : newTasks) { + if (success) { + future.get(); + } else { + EXPECT_THROW(future.get(), std::future_error); + } + } +} + +TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) { + struct Context { + KStdVector result; + }; + auto executor = MakeSingleThreadExecutorWithContext(); + + std::atomic_bool canStart = false; + + KStdVector expected; + KStdVector threads; + for (int i = 0; i < kDefaultThreadCount; ++i) { + expected.push_back(i); + threads.emplace_back([&, i] { + while (!canStart) { + } + executor.Execute([&] { executor.thread().context().result.push_back(i); }).get(); + }); + } + + canStart = true; + + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_THAT(executor.thread().context().result, testing::UnorderedElementsAreArray(expected)); +}