[K/N] Simplify SingleThreadExecutor

Merge-request: KT-MR-5591
Merged-by: Alexander Shabalin <Alexander.Shabalin@jetbrains.com>
This commit is contained in:
Alexander Shabalin
2022-01-28 12:11:44 +00:00
committed by Space
parent 4cd60090e0
commit 4c97b55adf
5 changed files with 92 additions and 175 deletions
@@ -648,17 +648,15 @@ namespace {
class Mutator : private Pinned {
public:
Mutator() : executor_(MakeSingleThreadExecutorWithContext<Context>()) {}
template <typename F>
[[nodiscard]] std::future<void> Execute(F&& f) {
return executor_.Execute(
[this, f = std::forward<F>(f)] { f(*executor_.thread().context().memory_->memoryState()->GetThreadData(), *this); });
return executor_.execute(
[this, f = std::forward<F>(f)] { f(*executor_.context().memory_->memoryState()->GetThreadData(), *this); });
}
StackObjectHolder& AddStackRoot() {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<StackObjectHolder>(*context.memory_->memoryState()->GetThreadData());
auto& holderRef = *holder;
context.stackRoots_.push_back(std::move(holder));
@@ -666,8 +664,8 @@ public:
}
StackObjectHolder& AddStackRoot(ObjHeader* object) {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<StackObjectHolder>(object);
auto& holderRef = *holder;
context.stackRoots_.push_back(std::move(holder));
@@ -675,8 +673,8 @@ public:
}
GlobalObjectHolder& AddGlobalRoot() {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<GlobalObjectHolder>(*context.memory_->memoryState()->GetThreadData());
auto& holderRef = *holder;
context.globalRoots_.push_back(std::move(holder));
@@ -684,15 +682,15 @@ public:
}
GlobalObjectHolder& AddGlobalRoot(ObjHeader* object) {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<GlobalObjectHolder>(*context.memory_->memoryState()->GetThreadData(), object);
auto& holderRef = *holder;
context.globalRoots_.push_back(std::move(holder));
return holderRef;
}
KStdVector<ObjHeader*> Alive() { return ::Alive(*executor_.thread().context().memory_->memoryState()->GetThreadData()); }
KStdVector<ObjHeader*> Alive() { return ::Alive(*executor_.context().memory_->memoryState()->GetThreadData()); }
private:
struct Context {
@@ -706,7 +704,7 @@ private:
}
};
SingleThreadExecutor<ThreadWithContext<Context>> executor_;
SingleThreadExecutor<Context> executor_;
};
} // namespace
@@ -286,12 +286,11 @@ using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
class MutatorThread : private Pinned {
public:
MutatorThread(GCSchedulerConfig& config, std::function<void(GCSchedulerThreadData&)> slowPath) :
executor_(MakeSingleThreadExecutorWithContext<Context>(
[&config, slowPath = std::move(slowPath)] { return Context(config, std::move(slowPath)); })) {}
executor_([&config, slowPath = std::move(slowPath)] { return Context(config, std::move(slowPath)); }) {}
std::future<void> Allocate(size_t bytes) {
return executor_.Execute([&, bytes] {
auto& context = executor_.thread().context();
return executor_.execute([&, bytes] {
auto& context = executor_.context();
context.threadDataTestApi.SetAllocatedBytes(bytes);
context.slowPath(context.threadData);
});
@@ -307,7 +306,7 @@ private:
threadData(config, [](GCSchedulerThreadData&) {}), threadDataTestApi(threadData), slowPath(slowPath) {}
};
SingleThreadExecutor<ThreadWithContext<Context>> executor_;
SingleThreadExecutor<Context> executor_;
};
template <compiler::GCSchedulerType schedulerType, int MutatorCount>
@@ -647,17 +647,15 @@ namespace {
class Mutator : private Pinned {
public:
Mutator() : executor_(MakeSingleThreadExecutorWithContext<Context>()) {}
template <typename F>
[[nodiscard]] std::future<void> Execute(F&& f) {
return executor_.Execute(
[this, f = std::forward<F>(f)] { f(*executor_.thread().context().memory_->memoryState()->GetThreadData(), *this); });
return executor_.execute(
[this, f = std::forward<F>(f)] { f(*executor_.context().memory_->memoryState()->GetThreadData(), *this); });
}
StackObjectHolder& AddStackRoot() {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<StackObjectHolder>(*context.memory_->memoryState()->GetThreadData());
auto& holderRef = *holder;
context.stackRoots_.push_back(std::move(holder));
@@ -665,8 +663,8 @@ public:
}
StackObjectHolder& AddStackRoot(ObjHeader* object) {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddStackRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<StackObjectHolder>(object);
auto& holderRef = *holder;
context.stackRoots_.push_back(std::move(holder));
@@ -674,8 +672,8 @@ public:
}
GlobalObjectHolder& AddGlobalRoot() {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<GlobalObjectHolder>(*context.memory_->memoryState()->GetThreadData());
auto& holderRef = *holder;
context.globalRoots_.push_back(std::move(holder));
@@ -683,15 +681,15 @@ public:
}
GlobalObjectHolder& AddGlobalRoot(ObjHeader* object) {
RuntimeAssert(std::this_thread::get_id() == executor_.thread().get_id(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.thread().context();
RuntimeAssert(std::this_thread::get_id() == executor_.threadId(), "AddGlobalRoot can only be called in the mutator thread");
auto& context = executor_.context();
auto holder = make_unique<GlobalObjectHolder>(*context.memory_->memoryState()->GetThreadData(), object);
auto& holderRef = *holder;
context.globalRoots_.push_back(std::move(holder));
return holderRef;
}
KStdVector<ObjHeader*> Alive() { return ::Alive(*executor_.thread().context().memory_->memoryState()->GetThreadData()); }
KStdVector<ObjHeader*> Alive() { return ::Alive(*executor_.context().memory_->memoryState()->GetThreadData()); }
private:
struct Context {
@@ -705,7 +703,7 @@ private:
}
};
SingleThreadExecutor<ThreadWithContext<Context>> executor_;
SingleThreadExecutor<Context> executor_;
};
} // namespace
@@ -9,6 +9,7 @@
#include <functional>
#include <future>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include "Types.h"
@@ -20,99 +21,19 @@ namespace kotlin {
// Requirements: avoid heap allocations as much as possible.
// TODO: Try to generalize so enough, that Worker.cpp can be written on top of this.
// Requirements: delayed tasks.
// TODO: Makes sense to specialize Context to void when context is unneeded.
// Thread that has a context attached to it. Context is created and destroyed on the thread.
// Thread cannot join before the destructor, because otherwise `context()` will dangle.
// Execute tasks on a single worker thread. `Context` is created and destroyed on the worker thread.
template <typename Context>
class ThreadWithContext : private Pinned {
public:
ThreadWithContext() = default;
template <typename ContextFactory, typename Function, typename... Args>
explicit ThreadWithContext(ContextFactory&& contextFactory, Function&& f, Args&&... args) :
thread_(
[this, contextFactory = std::forward<ContextFactory>(contextFactory), f = std::forward<Function>(f)](
Args&&... args) mutable {
auto context = contextFactory();
{
std::unique_lock guard(startMutex_);
context_ = &context;
}
startCV_.notify_one();
std::invoke(std::forward<Function>(f), std::forward<Args>(args)...);
std::unique_lock guard(stopMutex_);
stopCV_.wait(guard, [this] { return needsShutdown_; });
},
std::forward<Args>(args)...) {}
~ThreadWithContext() {
{
std::unique_lock guard(stopMutex_);
needsShutdown_ = true;
}
stopCV_.notify_one();
thread_.join();
}
// Wait until thread is fully initialized and `context()` is created.
void waitInitialized() noexcept {
std::unique_lock guard(startMutex_);
startCV_.wait(guard, [this] { return context_ != nullptr; });
}
// May only be called after the thread has fully initialized. Use `WaitInitialized()` to be sure.
Context& context() const noexcept {
RuntimeAssert(context_ != nullptr, "context must be set");
return *context_;
}
std::thread::id get_id() const noexcept { return thread_.get_id(); }
private:
std::condition_variable startCV_;
std::mutex startMutex_;
Context* context_ = nullptr;
// Need to keep thread alive for the entire lifetime of this object, because `context_` lifetime
// is bound to the thread.
std::condition_variable stopCV_;
std::mutex stopMutex_;
bool needsShutdown_ = false;
std::thread thread_;
};
// TODO: Replace with `std::jthread`.
// A thread that always joins in the destructor
class JoiningThread : private MoveOnly {
public:
JoiningThread() = default;
template <typename Function, typename... Args>
explicit JoiningThread(Function&& f, Args&&... args) : thread_(std::forward<Function>(f), std::forward<Args>(args)...) {}
~JoiningThread() { thread_.join(); }
std::thread::id get_id() const noexcept { return thread_.get_id(); }
private:
std::thread thread_;
};
// Execute tasks on a single worker thread.
// `Thread` must join in the destructor.
template <typename Thread>
class SingleThreadExecutor : private Pinned {
public:
// Starts the worker thread immediately.
template <typename ThreadFactory>
explicit SingleThreadExecutor(ThreadFactory&& threadFactory) noexcept :
thread_(std::forward<ThreadFactory>(threadFactory)(&SingleThreadExecutor::RunLoop, this)) {}
template <typename ContextFactory>
explicit SingleThreadExecutor(ContextFactory&& contextFactory) noexcept :
thread_(&SingleThreadExecutor::runLoop<ContextFactory>, this, std::forward<ContextFactory>(contextFactory)) {}
SingleThreadExecutor() noexcept :
SingleThreadExecutor([](auto&& function, auto&&... args) {
return Thread(std::forward<decltype(function)>(function), std::forward<decltype(args)>(args)...);
}) {}
// Starts the worker thread immediately.
SingleThreadExecutor() noexcept : SingleThreadExecutor([] { return Context(); }) {}
~SingleThreadExecutor() {
{
@@ -122,15 +43,24 @@ public:
shutdownRequested_ = true;
}
workCV_.notify_one();
thread_.join();
}
Thread& thread() noexcept { return thread_; }
// May lock until the context is created by the worker thread.
Context& context() const noexcept {
std::shared_lock guard(contextMutex_);
contextCV_.wait(guard, [this] { return context_ != nullptr; });
return *context_;
}
// Id of the worker thread.
std::thread::id threadId() const noexcept { return thread_.get_id(); }
// Schedule task execution on the worker thread. The returned future is resolved when the task has completed.
// If `this` is destroyed before the task manages to complete, the returned future will fail with exception upon `.get()`.
// If the task moves the runtime into a runnable state, it should move it back into the native state.
template <typename Task>
[[nodiscard]] std::future<void> Execute(Task&& f) noexcept {
[[nodiscard]] std::future<void> execute(Task&& f) noexcept {
std::packaged_task<void()> task(std::forward<Task>(f));
auto future = task.get_future();
{
@@ -142,7 +72,14 @@ public:
}
private:
void RunLoop() noexcept {
template <typename ContextFactory>
void runLoop(ContextFactory&& contextFactory) noexcept {
auto context = contextFactory();
{
std::unique_lock guard(contextMutex_);
context_ = &context;
}
contextCV_.notify_all();
while (true) {
std::packaged_task<void()> task;
{
@@ -158,27 +95,16 @@ private:
}
}
mutable std::condition_variable_any contextCV_;
mutable std::shared_mutex contextMutex_;
Context* context_ = nullptr;
std::condition_variable workCV_;
std::mutex workMutex_;
KStdDeque<std::packaged_task<void()>> queue_;
bool shutdownRequested_ = false;
Thread thread_;
std::thread thread_;
};
template <typename Context, typename ContextFactory>
SingleThreadExecutor<ThreadWithContext<Context>> MakeSingleThreadExecutorWithContext(ContextFactory&& contextFactory) noexcept {
return SingleThreadExecutor<ThreadWithContext<Context>>(
[contextFactory = std::forward<ContextFactory>(contextFactory)](auto&& function, auto&&... args) mutable {
return ThreadWithContext<Context>(
std::forward<ContextFactory>(contextFactory), std::forward<decltype(function)>(function),
std::forward<decltype(args)>(args)...);
});
}
template <typename Context>
SingleThreadExecutor<ThreadWithContext<Context>> MakeSingleThreadExecutorWithContext() noexcept {
return MakeSingleThreadExecutorWithContext<Context>([] { return Context(); });
}
} // namespace kotlin
@@ -52,32 +52,34 @@ testing::MockFunction<void(PinnedContext&)>* PinnedContext::dtorMock = nullptr;
} // namespace
TEST(ThreadWithContextTest, ContextThreadBound) {
TEST(SingleThreadExecutorTest, ContextThreadBound) {
PinnedContext::ScopedMocks mocks;
PinnedContext* createdContext = nullptr;
std::thread::id createdThread;
testing::StrictMock<testing::MockFunction<void()>> function;
EXPECT_CALL(mocks.ctorMock, Call(_)).WillOnce([&](PinnedContext& context) {
createdContext = &context;
createdThread = std::this_thread::get_id();
});
EXPECT_CALL(function, Call()).WillOnce([&] { EXPECT_THAT(std::this_thread::get_id(), createdThread); });
auto thread = ::make_unique<ThreadWithContext<PinnedContext>>([] { return PinnedContext(); }, function.AsStdFunction());
thread->waitInitialized();
auto executor = ::make_unique<SingleThreadExecutor<PinnedContext>>();
// Make sure context is created.
executor->context();
testing::Mock::VerifyAndClearExpectations(&mocks.ctorMock);
EXPECT_THAT(createdThread, thread->get_id());
EXPECT_THAT(thread->context(), testing::Ref(*createdContext));
EXPECT_THAT(createdThread, executor->threadId());
EXPECT_THAT(executor->context(), testing::Ref(*createdContext));
testing::StrictMock<testing::MockFunction<void()>> task;
EXPECT_CALL(task, Call()).WillOnce([&] { EXPECT_THAT(std::this_thread::get_id(), createdThread); });
executor->execute(task.AsStdFunction()).get();
testing::Mock::VerifyAndClearExpectations(&task);
EXPECT_CALL(mocks.dtorMock, Call(testing::Ref(*createdContext))).WillOnce([&] {
EXPECT_THAT(std::this_thread::get_id(), createdThread);
});
thread.reset();
// The function is expected to be called at some point between `waitInitialized` and the thread exit.
testing::Mock::VerifyAndClearExpectations(&function);
executor.reset();
testing::Mock::VerifyAndClearExpectations(&mocks.dtorMock);
}
TEST(ThreadWithContextTest, WaitInitialized) {
TEST(SingleThreadExecutorTest, WaitContext) {
PinnedContext::ScopedMocks mocks;
PinnedContext* createdContext = nullptr;
std::mutex ctorMutex;
@@ -86,41 +88,34 @@ TEST(ThreadWithContextTest, WaitInitialized) {
createdContext = &context;
});
testing::StrictMock<testing::MockFunction<void()>> function;
EXPECT_CALL(function, Call()).Times(0);
ctorMutex.lock();
auto thread = ::make_unique<ThreadWithContext<PinnedContext>>([] { return PinnedContext(); }, function.AsStdFunction());
SingleThreadExecutor<PinnedContext> executor;
std::atomic_bool initialized = false;
std::thread initializedWaiter([&] {
thread->waitInitialized();
initialized = true;
});
auto future = executor.execute([] {});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_THAT(initialized.load(), false);
testing::Mock::VerifyAndClearExpectations(&function);
auto futureStatus = future.wait_for(std::chrono::milliseconds(10));
EXPECT_THAT(futureStatus, std::future_status::timeout);
EXPECT_CALL(function, Call());
ctorMutex.unlock();
initializedWaiter.join();
// Wait for `thread` to initialize.
executor.context();
future.get();
testing::Mock::VerifyAndClearExpectations(&mocks.ctorMock);
testing::Mock::VerifyAndClearExpectations(&function);
EXPECT_THAT(initialized.load(), true);
EXPECT_THAT(thread->context(), testing::Ref(*createdContext));
EXPECT_THAT(executor.context(), testing::Ref(*createdContext));
EXPECT_CALL(mocks.dtorMock, Call(testing::Ref(*createdContext)));
}
TEST(SingleThreadExecutorTest, Execute) {
SingleThreadExecutor<JoiningThread> executor;
TEST(SingleThreadExecutorTest, execute) {
struct Context {};
SingleThreadExecutor<Context> executor;
std::mutex taskMutex;
testing::StrictMock<testing::MockFunction<void()>> task;
EXPECT_CALL(task, Call()).WillOnce([&] { std::unique_lock guard(taskMutex); });
taskMutex.lock();
auto future = executor.Execute(task.AsStdFunction());
auto future = executor.execute(task.AsStdFunction());
auto futureStatus = future.wait_for(std::chrono::milliseconds(10));
EXPECT_THAT(futureStatus, std::future_status::timeout);
@@ -131,7 +126,8 @@ TEST(SingleThreadExecutorTest, Execute) {
}
TEST(SingleThreadExecutorTest, DropExecutorWithTasks) {
auto executor = make_unique<SingleThreadExecutor<JoiningThread>>();
struct Context {};
auto executor = make_unique<SingleThreadExecutor<Context>>();
std::mutex taskMutex;
testing::StrictMock<testing::MockFunction<void()>> task;
@@ -142,13 +138,13 @@ TEST(SingleThreadExecutorTest, DropExecutorWithTasks) {
std::unique_lock guard(taskMutex);
});
taskMutex.lock();
auto future = executor->Execute(task.AsStdFunction());
auto future = executor->execute(task.AsStdFunction());
while (!taskStarted) {}
KStdVector<std::pair<std::future<void>, bool>> newTasks;
constexpr size_t tasksCount = 100;
for (size_t i = 0; i < tasksCount; ++i) {
newTasks.push_back(std::make_pair(executor->Execute([&newTasks, i] { newTasks[i].second = true; }), false));
newTasks.push_back(std::make_pair(executor->execute([&newTasks, i] { newTasks[i].second = true; }), false));
}
taskMutex.unlock();
@@ -171,7 +167,7 @@ TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) {
struct Context {
KStdVector<int> result;
};
auto executor = MakeSingleThreadExecutorWithContext<Context>();
SingleThreadExecutor<Context> executor;
std::atomic_bool canStart = false;
@@ -182,7 +178,7 @@ TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) {
threads.emplace_back([&, i] {
while (!canStart) {
}
executor.Execute([&] { executor.thread().context().result.push_back(i); }).get();
executor.execute([&] { executor.context().result.push_back(i); }).get();
});
}
@@ -192,5 +188,5 @@ TEST(SingleThreadExecutorTest, ExecuteFromManyThreads) {
thread.join();
}
EXPECT_THAT(executor.thread().context().result, testing::UnorderedElementsAreArray(expected));
EXPECT_THAT(executor.context().result, testing::UnorderedElementsAreArray(expected));
}