diff --git a/kotlin-native/runtime/src/custom_alloc/cpp/AtomicStack.hpp b/kotlin-native/runtime/src/custom_alloc/cpp/AtomicStack.hpp index 7db4f438de9..81c02ff064c 100644 --- a/kotlin-native/runtime/src/custom_alloc/cpp/AtomicStack.hpp +++ b/kotlin-native/runtime/src/custom_alloc/cpp/AtomicStack.hpp @@ -67,7 +67,7 @@ public: } while (!stack_.compare_exchange_weak(thisHead, otherHead, std::memory_order_acq_rel)); } - bool isEmpty() noexcept { return stack_.load(std::memory_order_relaxed) == nullptr; } + bool isEmpty() const noexcept { return stack_.load(std::memory_order_relaxed) == nullptr; } // Not thread-safe. Named like this to make AtomicStack compatible with FinalizerQueue size_t size() { diff --git a/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.cpp b/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.cpp deleted file mode 100644 index 3aa5bb17ccb..00000000000 --- a/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license - * that can be found in the LICENSE file. - */ - -#include "CustomFinalizerProcessor.hpp" - -#include -#include -#include - -#include "AtomicStack.hpp" -#include "CustomLogging.hpp" -#include "ExtraObjectData.hpp" -#include "FinalizerHooks.hpp" -#include "Memory.h" -#include "Runtime.h" - -namespace kotlin::alloc { - -void CustomFinalizerProcessor::StartFinalizerThreadIfNone() noexcept { - CustomAllocDebug("CustomFinalizerProcessor::StartFinalizerThreadIfNone()"); - std::unique_lock guard(threadCreatingMutex_); - if (finalizerThread_.joinable()) return; - - finalizerThread_ = ScopedThread(ScopedThread::attributes().name("Custom finalizer processor"), [this] { - Kotlin_initRuntimeIfNeeded(); - { - std::unique_lock guard(initializedMutex_); - initialized_ = true; - } - initializedCondVar_.notify_all(); - int64_t finalizersEpoch = 0; - while (true) { - std::unique_lock lock(finalizerQueueMutex_); - finalizerQueueCondVar_.wait(lock, [this, &finalizersEpoch] { - return !finalizerQueue_.isEmpty() || finalizerQueueEpoch_ != finalizersEpoch || shutdownFlag_; - }); - if (finalizerQueue_.isEmpty() && finalizerQueueEpoch_ == finalizersEpoch) { - newTasksAllowed_ = false; - RuntimeAssert(shutdownFlag_, "Nothing to do, but no shutdownFlag_ is set on wakeup"); - break; - } - auto queue = std::move(finalizerQueue_); - finalizersEpoch = finalizerQueueEpoch_; - lock.unlock(); - { - ThreadStateGuard guard(ThreadState::kRunnable); - while (auto* cell = queue.Pop()) { - auto* extraObject = cell->Data(); - auto* baseObject = extraObject->GetBaseObject(); - RunFinalizers(baseObject); - } - } - epochDoneCallback_(finalizersEpoch); - } - { - std::unique_lock guard(initializedMutex_); - initialized_ = false; - } - initializedCondVar_.notify_all(); - }); -} - -void CustomFinalizerProcessor::StopFinalizerThread() noexcept { - CustomAllocDebug("CustomFinalizerProcessor::StopFinalizerThread()"); - { - std::unique_lock guard(finalizerQueueMutex_); - if (!finalizerThread_.joinable()) return; - shutdownFlag_ = true; - finalizerQueueCondVar_.notify_all(); - } - finalizerThread_.join(); - shutdownFlag_ = false; - RuntimeAssert(finalizerQueue_.isEmpty(), "Finalizer queue should be empty when killing finalizer thread"); - std::unique_lock guard(finalizerQueueMutex_); - newTasksAllowed_ = true; - finalizerQueueCondVar_.notify_all(); -} - -void CustomFinalizerProcessor::ScheduleTasks(Queue&& tasks, int64_t epoch) noexcept { - std::unique_lock guard(finalizerQueueMutex_); - if (tasks.isEmpty() && !IsRunning()) { - epochDoneCallback_(epoch); - return; - } - StartFinalizerThreadIfNone(); - finalizerQueueCondVar_.wait(guard, [this] { return newTasksAllowed_; }); - finalizerQueue_.TransferAllFrom(std::move(tasks)); - finalizerQueueEpoch_ = epoch; - finalizerQueueCondVar_.notify_all(); -} - -bool CustomFinalizerProcessor::IsRunning() noexcept { - return finalizerThread_.joinable(); -} - -void CustomFinalizerProcessor::WaitFinalizerThreadInitialized() noexcept { - CustomAllocDebug("CustomFinalizerProcessor::WaitFinalizerThreadInitialized()"); - std::unique_lock guard(initializedMutex_); - initializedCondVar_.wait(guard, [this] { return initialized_; }); -} - -CustomFinalizerProcessor::~CustomFinalizerProcessor() { - StopFinalizerThread(); -} - -} // namespace kotlin::alloc diff --git a/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.hpp b/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.hpp index 9d2d7dea15a..b87205b1667 100644 --- a/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.hpp +++ b/kotlin-native/runtime/src/custom_alloc/cpp/CustomFinalizerProcessor.hpp @@ -6,41 +6,27 @@ #ifndef CUSTOM_ALLOC_CPP_CUSTOMFINALIZERPROCESSOR_HPP_ #define CUSTOM_ALLOC_CPP_CUSTOMFINALIZERPROCESSOR_HPP_ -#include -#include - #include "AtomicStack.hpp" +#include "ExtraObjectData.hpp" #include "ExtraObjectPage.hpp" -#include "ScopedThread.hpp" +#include "FinalizerHooks.hpp" namespace kotlin::alloc { -class CustomFinalizerProcessor : Pinned { -public: - using Queue = typename kotlin::alloc::AtomicStack; - explicit CustomFinalizerProcessor(std::function epochDoneCallback) : epochDoneCallback_(std::move(epochDoneCallback)) {} - void ScheduleTasks(Queue&& tasks, int64_t epoch) noexcept; - void StopFinalizerThread() noexcept; - bool IsRunning() noexcept; - void StartFinalizerThreadIfNone() noexcept; - void WaitFinalizerThreadInitialized() noexcept; - ~CustomFinalizerProcessor(); +using FinalizerQueue = kotlin::alloc::AtomicStack; -private: - ScopedThread finalizerThread_; - Queue finalizerQueue_; - std::condition_variable finalizerQueueCondVar_; - std::mutex finalizerQueueMutex_; - std::function epochDoneCallback_; - int64_t finalizerQueueEpoch_ = 0; - bool shutdownFlag_ = false; - bool newTasksAllowed_ = true; +struct FinalizerQueueTraits { + static bool isEmpty(const FinalizerQueue& queue) noexcept { return queue.isEmpty(); } - std::mutex initializedMutex_; - std::condition_variable initializedCondVar_; - bool initialized_ = false; + static void add(FinalizerQueue& into, FinalizerQueue from) noexcept { into.TransferAllFrom(std::move(from)); } - std::mutex threadCreatingMutex_; + static void process(FinalizerQueue queue) noexcept { + while (auto* cell = queue.Pop()) { + auto* extraObject = cell->Data(); + auto* baseObject = extraObject->GetBaseObject(); + RunFinalizers(baseObject); + } + } }; } // namespace kotlin::alloc diff --git a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.cpp b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.cpp index a02706f1b0a..6ddaf931974 100644 --- a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.cpp +++ b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.cpp @@ -19,11 +19,9 @@ #include "ThreadRegistry.hpp" #include "ThreadSuspension.hpp" #include "GCState.hpp" -#include "FinalizerProcessor.hpp" #include "GCStatistics.hpp" #ifdef CUSTOM_ALLOCATOR -#include "CustomFinalizerProcessor.hpp" #include "Heap.hpp" #endif @@ -113,15 +111,14 @@ void gc::ConcurrentMarkAndSweep::ThreadData::OnSuspendForGC() noexcept { gc::ConcurrentMarkAndSweep::ConcurrentMarkAndSweep( mm::ObjectFactory& objectFactory, GCScheduler& gcScheduler) noexcept : objectFactory_(objectFactory), - gcScheduler_(gcScheduler), - finalizerProcessor_(std_support::make_unique([this](int64_t epoch) { #else gc::ConcurrentMarkAndSweep::ConcurrentMarkAndSweep(GCScheduler& gcScheduler) noexcept : - gcScheduler_(gcScheduler), finalizerProcessor_(std_support::make_unique([this](int64_t epoch) { #endif + gcScheduler_(gcScheduler), + finalizerProcessor_([this](int64_t epoch) { GCHandle::getByEpoch(epoch).finalizersDone(); state_.finalized(epoch); - })) { + }) { gcScheduler_.SetScheduleGC([this]() NO_INLINE { // This call acquires a lock, so we need to ensure that we're in the safe state. NativeOrUnregisteredThreadGuard guard(/* reentrant = */ true); @@ -147,31 +144,31 @@ gc::ConcurrentMarkAndSweep::~ConcurrentMarkAndSweep() { void gc::ConcurrentMarkAndSweep::StartFinalizerThreadIfNeeded() noexcept { NativeOrUnregisteredThreadGuard guard(true); - finalizerProcessor_->StartFinalizerThreadIfNone(); - finalizerProcessor_->WaitFinalizerThreadInitialized(); + finalizerProcessor_.StartFinalizerThreadIfNone(); + finalizerProcessor_.WaitFinalizerThreadInitialized(); } void gc::ConcurrentMarkAndSweep::StopFinalizerThreadIfRunning() noexcept { NativeOrUnregisteredThreadGuard guard(true); - finalizerProcessor_->StopFinalizerThread(); + finalizerProcessor_.StopFinalizerThread(); } bool gc::ConcurrentMarkAndSweep::FinalizersThreadIsRunning() noexcept { - return finalizerProcessor_->IsRunning(); + return finalizerProcessor_.IsRunning(); } void gc::ConcurrentMarkAndSweep::SetMarkingBehaviorForTests(MarkingBehavior markingBehavior) noexcept { markingBehavior_ = markingBehavior; } -bool gc::ConcurrentMarkAndSweep::PerformFullGC(int64_t epoch) noexcept { +void gc::ConcurrentMarkAndSweep::PerformFullGC(int64_t epoch) noexcept { auto gcHandle = GCHandle::create(epoch); SetMarkingRequested(epoch); bool didSuspend = mm::RequestThreadsSuspension(); RuntimeAssert(didSuspend, "Only GC thread can request suspension"); gcHandle.suspensionRequested(); - RuntimeAssert(!kotlin::mm::IsCurrentThreadRegistered(), "Concurrent GC must run on unregistered thread"); + RuntimeAssert(!kotlin::mm::IsCurrentThreadRegistered(), "GC must run on unregistered thread"); WaitForThreadsReadyToMark(); gcHandle.threadsAreSuspended(); @@ -218,11 +215,11 @@ bool gc::ConcurrentMarkAndSweep::PerformFullGC(int64_t epoch) noexcept { state_.finish(epoch); gcHandle.finalizersScheduled(finalizerQueue.size()); gcHandle.finished(); + // This may start a new thread. On some pthreads implementations, this may block waiting for concurrent thread // destructors running. So, it must ensured that no locks are held by this point. // TODO: Consider having an always on sleeping finalizer thread. - finalizerProcessor_->ScheduleTasks(std::move(finalizerQueue), epoch); - return true; + finalizerProcessor_.ScheduleTasks(std::move(finalizerQueue), epoch); } namespace { diff --git a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.hpp b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.hpp index c2bc62ed568..5f0ec698697 100644 --- a/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.hpp +++ b/kotlin-native/runtime/src/gc/cms/cpp/ConcurrentMarkAndSweep.hpp @@ -9,7 +9,10 @@ #include #include "Allocator.hpp" +#include "FinalizerProcessor.hpp" #include "GCScheduler.hpp" +#include "GCState.hpp" +#include "GCStatistics.hpp" #include "IntrusiveList.hpp" #include "MarkAndSweepUtils.hpp" #include "ObjectFactory.hpp" @@ -17,26 +20,17 @@ #include "ThreadData.hpp" #include "Types.h" #include "Utils.hpp" -#include "GCState.hpp" #include "std_support/Memory.hpp" -#include "GCStatistics.hpp" #ifdef CUSTOM_ALLOCATOR #include "CustomAllocator.hpp" +#include "CustomFinalizerProcessor.hpp" #include "Heap.hpp" - -namespace kotlin::alloc { -class CustomFinalizerProcessor; -} #endif namespace kotlin { namespace gc { -#ifndef CUSTOM_ALLOCATOR -class FinalizerProcessor; -#endif - // Stop-the-world parallel mark + concurrent sweep. The GC runs in a separate thread, finalizers run in another thread of their own. // TODO: Also make marking run concurrently with Kotlin threads. class ConcurrentMarkAndSweep : private Pinned { @@ -108,6 +102,14 @@ public: using Allocator = ThreadData::Allocator; +#ifndef CUSTOM_ALLOCATOR + using FinalizerQueue = mm::ObjectFactory::FinalizerQueue; + using FinalizerQueueTraits = mm::ObjectFactory::FinalizerQueueTraits; +#else + using FinalizerQueue = alloc::FinalizerQueue; + using FinalizerQueueTraits = alloc::FinalizerQueueTraits; +#endif + #ifdef CUSTOM_ALLOCATOR explicit ConcurrentMarkAndSweep(GCScheduler& scheduler) noexcept; #else @@ -128,8 +130,7 @@ public: #endif private: - // Returns `true` if GC has happened, and `false` if not (because someone else has suspended the threads). - bool PerformFullGC(int64_t epoch) noexcept; + void PerformFullGC(int64_t epoch) noexcept; #ifndef CUSTOM_ALLOCATOR mm::ObjectFactory& objectFactory_; @@ -140,11 +141,7 @@ private: GCStateHolder state_; ScopedThread gcThread_; -#ifndef CUSTOM_ALLOCATOR - std_support::unique_ptr finalizerProcessor_; -#else - std_support::unique_ptr finalizerProcessor_; -#endif + FinalizerProcessor finalizerProcessor_; MarkQueue markQueue_; MarkingBehavior markingBehavior_; diff --git a/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessor.cpp b/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessor.cpp deleted file mode 100644 index 74f09cb3257..00000000000 --- a/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessor.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/* -* Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license -* that can be found in the LICENSE file. -*/ - -#include "FinalizerProcessor.hpp" -#include "ObjectFactory.hpp" -#include "Runtime.h" - - -void kotlin::gc::FinalizerProcessor::StartFinalizerThreadIfNone() noexcept { - std::unique_lock guard(threadCreatingMutex_); - if (finalizerThread_.joinable()) return; - - finalizerThread_ = ScopedThread(ScopedThread::attributes().name("GC finalizer processor"), [this] { - Kotlin_initRuntimeIfNeeded(); - { - std::unique_lock guard(initializedMutex_); - initialized_ = true; - } - initializedCondVar_.notify_all(); - int64_t finalizersEpoch = 0; - while (true) { - std::unique_lock lock(finalizerQueueMutex_); - finalizerQueueCondVar_.wait(lock, [this, &finalizersEpoch] { - return finalizerQueue_.size() > 0 || finalizerQueueEpoch_ != finalizersEpoch || shutdownFlag_; - }); - if (finalizerQueue_.size() == 0 && finalizerQueueEpoch_ == finalizersEpoch) { - newTasksAllowed_ = false; - RuntimeAssert(shutdownFlag_, "Nothing to do, but no shutdownFlag_ is set on wakeup"); - break; - } - auto queue = std::move(finalizerQueue_); - finalizersEpoch = finalizerQueueEpoch_; - lock.unlock(); - if (queue.size() > 0) { - ThreadStateGuard guard(ThreadState::kRunnable); - queue.Finalize(); - } - epochDoneCallback_(finalizersEpoch); - } - { - std::unique_lock guard(initializedMutex_); - initialized_ = false; - } - initializedCondVar_.notify_all(); - }); -} - -void kotlin::gc::FinalizerProcessor::StopFinalizerThread() noexcept { - { - std::unique_lock guard(finalizerQueueMutex_); - if (!finalizerThread_.joinable()) return; - shutdownFlag_ = true; - finalizerQueueCondVar_.notify_all(); - } - finalizerThread_.join(); - shutdownFlag_ = false; - RuntimeAssert(finalizerQueue_.size() == 0, "Finalizer queue should be empty when killing finalizer thread"); - std::unique_lock guard(finalizerQueueMutex_); - newTasksAllowed_ = true; - finalizerQueueCondVar_.notify_all(); -} - -void kotlin::gc::FinalizerProcessor::ScheduleTasks(Queue&& tasks, int64_t epoch) noexcept { - std::unique_lock guard(finalizerQueueMutex_); - if (tasks.size() == 0 && !IsRunning()) { - epochDoneCallback_(epoch); - return; - } - finalizerQueueCondVar_.wait(guard, [this] { return newTasksAllowed_; }); - StartFinalizerThreadIfNone(); - finalizerQueue_.MergeWith(std::move(tasks)); - finalizerQueueEpoch_ = epoch; - finalizerQueueCondVar_.notify_all(); -} - -bool kotlin::gc::FinalizerProcessor::IsRunning() noexcept { - return finalizerThread_.joinable(); -} - -void kotlin::gc::FinalizerProcessor::WaitFinalizerThreadInitialized() noexcept { - std::unique_lock guard(initializedMutex_); - initializedCondVar_.wait(guard, [this] { return initialized_; }); -} - -kotlin::gc::FinalizerProcessor::~FinalizerProcessor() { - StopFinalizerThread(); -} diff --git a/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessor.hpp b/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessor.hpp deleted file mode 100644 index f20acfc1b59..00000000000 --- a/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessor.hpp +++ /dev/null @@ -1,45 +0,0 @@ -/* -* Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license -* that can be found in the LICENSE file. -*/ - -#pragma once - -#include "ConcurrentMarkAndSweep.hpp" -#include "GCState.hpp" -#include "ObjectFactory.hpp" -#include "ScopedThread.hpp" - -namespace kotlin::gc { - -class FinalizerProcessor : Pinned { -public: - using Queue = typename kotlin::mm::ObjectFactory::FinalizerQueue; - // epochDoneCallback could be called on any subset of them. - // If no new tasks are set, epochDoneCallback will be eventually called on last epoch - explicit FinalizerProcessor(std::function epochDoneCallback): epochDoneCallback_(std::move(epochDoneCallback)) {} - void ScheduleTasks(Queue&& tasks, int64_t epoch) noexcept; - void StopFinalizerThread() noexcept; - bool IsRunning() noexcept; - void StartFinalizerThreadIfNone() noexcept; - void WaitFinalizerThreadInitialized() noexcept; - ~FinalizerProcessor(); - -private: - ScopedThread finalizerThread_; - Queue finalizerQueue_; - std::condition_variable finalizerQueueCondVar_; - std::mutex finalizerQueueMutex_; - std::function epochDoneCallback_; - int64_t finalizerQueueEpoch_ = 0; - bool shutdownFlag_ = false; - bool newTasksAllowed_ = true; - - std::mutex initializedMutex_; - std::condition_variable initializedCondVar_; - bool initialized_ = false; - - std::mutex threadCreatingMutex_; -}; - -} // namespace kotlin::gc diff --git a/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessorTest.cpp b/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessorTest.cpp deleted file mode 100644 index 50ed0788561..00000000000 --- a/kotlin-native/runtime/src/gc/cms/cpp/FinalizerProcessorTest.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/* -* Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license -* that can be found in the LICENSE file. -*/ - -#include "ConcurrentMarkAndSweep.hpp" - -#include -#include -#include -#include - -#include "gmock/gmock.h" -#include "gtest/gtest.h" - -#include "ExtraObjectData.hpp" -#include "FinalizerHooksTestSupport.hpp" -#include "GCImpl.hpp" -#include "GlobalData.hpp" -#include "ObjectOps.hpp" -#include "ObjectTestSupport.hpp" -#include "TestSupport.hpp" -#include "ThreadData.hpp" -#include "FinalizerProcessor.hpp" - -using namespace kotlin; - -// These tests can only work if `GC` is `ConcurrentMarkAndSweep`. - -namespace { - -struct Payload { - ObjHeader* field1; - ObjHeader* field2; - ObjHeader* field3; - - static constexpr std::array kFields = { - &Payload::field1, - &Payload::field2, - &Payload::field3, - }; -}; - -test_support::TypeInfoHolder typeHolder{test_support::TypeInfoHolder::ObjectBuilder()}; -test_support::TypeInfoHolder typeHolderWithFinalizer{test_support::TypeInfoHolder::ObjectBuilder().addFlag(TF_HAS_FINALIZER)}; - -test_support::Object& AllocateObjectWithFinalizer(mm::ThreadData& threadData) { - ObjHolder holder; - mm::AllocateObject(&threadData, typeHolderWithFinalizer.typeInfo(), holder.slot()); - return test_support::Object::FromObjHeader(holder.obj()); -} - -class FinalizerProcessorTest : public testing::Test { -public: - - ~FinalizerProcessorTest() { - mm::GlobalsRegistry::Instance().ClearForTests(); - mm::GlobalData::Instance().extraObjectDataFactory().ClearForTests(); - mm::GlobalData::Instance().gc().ClearForTests(); - } - - testing::MockFunction& finalizerHook() { return finalizerHooks_.finalizerHook(); } - -private: - FinalizerHooksTestSupport finalizerHooks_; -}; - -int threadsCount() { - int result = 0; - for (auto &thread: mm::ThreadRegistry::Instance().LockForIter()) { - static_cast(thread); // to avoid unused warning - result++; - } - return result; -}; - -} // namespace - -TEST_F(FinalizerProcessorTest, NotRunningThreadWhenUnused) { - GCStateHolder state; - gc::FinalizerProcessor processor([](int64_t) {}); - ASSERT_EQ(threadsCount(), 0); - ASSERT_FALSE(processor.IsRunning()); - mm::ObjectFactory::FinalizerQueue queue; - processor.ScheduleTasks(std::move(queue), 1); - ASSERT_EQ(threadsCount(), 0); - ASSERT_FALSE(processor.IsRunning()); -} - -TEST_F(FinalizerProcessorTest, RemoveObject) { - RunInNewThread([this] { - ASSERT_EQ(threadsCount(), 1); - std::atomic done = 0; - gc::FinalizerProcessor processor([&](int64_t epoch) { done = epoch; }); - mm::ObjectFactory::FinalizerQueue queue; - auto &object = AllocateObjectWithFinalizer(*mm::ThreadRegistry::Instance().CurrentThreadData()); - mm::ThreadRegistry::Instance().CurrentThreadData()->Publish(); - auto& factory = mm::GlobalData::Instance().gc().impl().objectFactory(); - auto iter = factory.LockForIter(); - auto iterator = iter.begin(); - iter.MoveAndAdvance(queue, iterator); - ASSERT_EQ(queue.size(), 1u); - EXPECT_CALL(finalizerHook(), Call(object.header())); - processor.ScheduleTasks(std::move(queue), 1); - while (done != 1) {} - ASSERT_EQ(threadsCount(), 2); - ASSERT_TRUE(processor.IsRunning()); - processor.StopFinalizerThread(); - ASSERT_EQ(threadsCount(), 1); - }); -} - -TEST_F(FinalizerProcessorTest, ScheduleTasksWhileFinalizing) { - RunInNewThread([this] { - std::atomic done = 0; - gc::FinalizerProcessor processor([&done](int64_t epoch) { done = epoch; }); - std::vector::FinalizerQueue> queues; - int epochs = 100; - std::vector headers; - for (int epoch = 0; epoch < epochs; epoch++) { - for (int i = 0; i < 10; i++) { - auto& object = AllocateObjectWithFinalizer(*mm::ThreadRegistry::Instance().CurrentThreadData()); - headers.push_back(object.header()); - } - auto& factory = mm::GlobalData::Instance().gc().impl().objectFactory(); - mm::ThreadRegistry::Instance().CurrentThreadData()->Publish(); - auto iter = factory.LockForIter(); - mm::ObjectFactory::FinalizerQueue queue; - for (auto iterator = iter.begin(); iterator != iter.end();) { - iter.MoveAndAdvance(queue, iterator); - } - queues.push_back(std::move(queue)); - } - for (auto header: headers) { - EXPECT_CALL(finalizerHook(), Call(header)); - } - for (int epoch = 0; epoch < epochs; epoch++) { - processor.ScheduleTasks(std::move(queues[epoch]), epoch + 1); - } - while (done != epochs) {} - ASSERT_EQ(threadsCount(), 2); - ASSERT_TRUE(processor.IsRunning()); - processor.StopFinalizerThread(); - ASSERT_EQ(threadsCount(), 1); - }); -} - diff --git a/kotlin-native/runtime/src/gc/common/cpp/FinalizerProcessor.hpp b/kotlin-native/runtime/src/gc/common/cpp/FinalizerProcessor.hpp new file mode 100644 index 00000000000..7333e1b2770 --- /dev/null +++ b/kotlin-native/runtime/src/gc/common/cpp/FinalizerProcessor.hpp @@ -0,0 +1,124 @@ +/* + * Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "KAssert.h" +#include "Memory.h" +#include "ObjectFactory.hpp" +#include "Runtime.h" +#include "ScopedThread.hpp" +#include "Utils.hpp" + +namespace kotlin::gc { + +template +class FinalizerProcessor : private Pinned { +public: + // epochDoneCallback could be called on any subset of them. + // If no new tasks are set, epochDoneCallback will be eventually called on last epoch + explicit FinalizerProcessor(std::function epochDoneCallback) noexcept : + epochDoneCallback_(std::move(epochDoneCallback)) {} + + ~FinalizerProcessor() { StopFinalizerThread(); } + + void ScheduleTasks(FinalizerQueue tasks, int64_t epoch) noexcept { + std::unique_lock guard(finalizerQueueMutex_); + if (FinalizerQueueTraits::isEmpty(tasks) && !IsRunning()) { + epochDoneCallback_(epoch); + return; + } + finalizerQueueCondVar_.wait(guard, [this] { return newTasksAllowed_; }); + StartFinalizerThreadIfNone(); + FinalizerQueueTraits::add(finalizerQueue_, std::move(tasks)); + finalizerQueueEpoch_ = epoch; + finalizerQueueCondVar_.notify_all(); + } + + void StopFinalizerThread() noexcept { + { + std::unique_lock guard(finalizerQueueMutex_); + if (!finalizerThread_.joinable()) return; + shutdownFlag_ = true; + finalizerQueueCondVar_.notify_all(); + } + finalizerThread_.join(); + shutdownFlag_ = false; + RuntimeAssert(FinalizerQueueTraits::isEmpty(finalizerQueue_), "Finalizer queue should be empty when killing finalizer thread"); + std::unique_lock guard(finalizerQueueMutex_); + newTasksAllowed_ = true; + finalizerQueueCondVar_.notify_all(); + } + + bool IsRunning() const noexcept { return finalizerThread_.joinable(); } + + void StartFinalizerThreadIfNone() noexcept { + std::unique_lock guard(threadCreatingMutex_); + if (finalizerThread_.joinable()) return; + + finalizerThread_ = ScopedThread(ScopedThread::attributes().name("GC finalizer processor"), [this] { + Kotlin_initRuntimeIfNeeded(); + { + std::unique_lock guard(initializedMutex_); + initialized_ = true; + } + initializedCondVar_.notify_all(); + int64_t finalizersEpoch = 0; + while (true) { + std::unique_lock lock(finalizerQueueMutex_); + finalizerQueueCondVar_.wait(lock, [this, &finalizersEpoch] { + return !FinalizerQueueTraits::isEmpty(finalizerQueue_) || finalizerQueueEpoch_ != finalizersEpoch || shutdownFlag_; + }); + if (FinalizerQueueTraits::isEmpty(finalizerQueue_) && finalizerQueueEpoch_ == finalizersEpoch) { + newTasksAllowed_ = false; + RuntimeAssert(shutdownFlag_, "Nothing to do, but no shutdownFlag_ is set on wakeup"); + break; + } + auto queue = std::move(finalizerQueue_); + finalizersEpoch = finalizerQueueEpoch_; + lock.unlock(); + if (!FinalizerQueueTraits::isEmpty(queue)) { + ThreadStateGuard guard(ThreadState::kRunnable); + FinalizerQueueTraits::process(std::move(queue)); + } + epochDoneCallback_(finalizersEpoch); + } + { + std::unique_lock guard(initializedMutex_); + initialized_ = false; + } + initializedCondVar_.notify_all(); + }); + } + + void WaitFinalizerThreadInitialized() noexcept { + std::unique_lock guard(initializedMutex_); + initializedCondVar_.wait(guard, [this] { return initialized_; }); + } + +private: + ScopedThread finalizerThread_; + FinalizerQueue finalizerQueue_; + std::condition_variable finalizerQueueCondVar_; + std::mutex finalizerQueueMutex_; + std::function epochDoneCallback_; + int64_t finalizerQueueEpoch_ = 0; + bool shutdownFlag_ = false; + bool newTasksAllowed_ = true; + + std::mutex initializedMutex_; + std::condition_variable initializedCondVar_; + bool initialized_ = false; + + std::mutex threadCreatingMutex_; +}; + +} // namespace kotlin::gc diff --git a/kotlin-native/runtime/src/gc/common/cpp/FinalizerProcessorTest.cpp b/kotlin-native/runtime/src/gc/common/cpp/FinalizerProcessorTest.cpp new file mode 100644 index 00000000000..82444edcfd5 --- /dev/null +++ b/kotlin-native/runtime/src/gc/common/cpp/FinalizerProcessorTest.cpp @@ -0,0 +1,113 @@ +/* + * Copyright 2010-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#include "FinalizerProcessor.hpp" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "TestSupport.hpp" + +using namespace kotlin; + +namespace { + +class FinalizerProcessorTest : public testing::Test { +public: + using FinalizerQueue = std_support::vector; + + struct FinalizerQueueTraits { + static bool isEmpty(const FinalizerQueue& queue) noexcept { return queue.empty(); } + static void add(FinalizerQueue& into, FinalizerQueue from) noexcept { into.insert(into.end(), from.begin(), from.end()); } + static void process(FinalizerQueue queue) noexcept { + AssertThreadState(ThreadState::kRunnable); + for (auto& obj : queue) { + setFinalizerHook_->Call(obj); + } + } + }; + + using FinalizerProcessor = gc::FinalizerProcessor; + + FinalizerProcessorTest() noexcept { setFinalizerHook_ = &finalizerHook_; } + + ~FinalizerProcessorTest() { setFinalizerHook_ = nullptr; } + + testing::MockFunction& finalizerHook() { return finalizerHook_; } + +private: + static testing::MockFunction* setFinalizerHook_; + testing::StrictMock> finalizerHook_; +}; + +// static +testing::MockFunction* FinalizerProcessorTest::setFinalizerHook_ = nullptr; + +int threadsCount() { + auto iter = mm::ThreadRegistry::Instance().LockForIter(); + return std::distance(iter.begin(), iter.end()); +}; + +} // namespace + +TEST_F(FinalizerProcessorTest, NotRunningThreadWhenUnused) { + FinalizerProcessor processor([](int64_t) {}); + ASSERT_EQ(threadsCount(), 0); + ASSERT_FALSE(processor.IsRunning()); + FinalizerQueue queue; + processor.ScheduleTasks(std::move(queue), 1); + ASSERT_EQ(threadsCount(), 0); + ASSERT_FALSE(processor.IsRunning()); +} + +TEST_F(FinalizerProcessorTest, RemoveObject) { + RunInNewThread([this] { + ASSERT_EQ(threadsCount(), 1); + std::atomic done = 0; + FinalizerProcessor processor([&](int64_t epoch) { done = epoch; }); + FinalizerQueue queue; + auto obj = 42; + queue.push_back(obj); + EXPECT_CALL(finalizerHook(), Call(obj)); + processor.ScheduleTasks(std::move(queue), 1); + while (done != 1) { + } + ASSERT_EQ(threadsCount(), 2); + ASSERT_TRUE(processor.IsRunning()); + processor.StopFinalizerThread(); + ASSERT_EQ(threadsCount(), 1); + }); +} + +TEST_F(FinalizerProcessorTest, ScheduleTasksWhileFinalizing) { + RunInNewThread([this] { + std::atomic done = 0; + FinalizerProcessor processor([&done](int64_t epoch) { done = epoch; }); + std::vector queues; + int epochs = 100; + std::vector objects; + for (int epoch = 0; epoch < epochs; epoch++) { + FinalizerQueue queue; + for (int i = 0; i < 10; i++) { + auto obj = objects.size(); + queue.push_back(obj); + objects.push_back(obj); + } + queues.emplace_back(std::move(queue)); + } + for (auto object : objects) { + EXPECT_CALL(finalizerHook(), Call(object)); + } + for (int epoch = 0; epoch < epochs; epoch++) { + processor.ScheduleTasks(std::move(queues[epoch]), epoch + 1); + } + while (done != epochs) { + } + ASSERT_EQ(threadsCount(), 2); + ASSERT_TRUE(processor.IsRunning()); + processor.StopFinalizerThread(); + ASSERT_EQ(threadsCount(), 1); + }); +} diff --git a/kotlin-native/runtime/src/gc/cms/cpp/GCState.hpp b/kotlin-native/runtime/src/gc/common/cpp/GCState.hpp similarity index 88% rename from kotlin-native/runtime/src/gc/cms/cpp/GCState.hpp rename to kotlin-native/runtime/src/gc/common/cpp/GCState.hpp index d061d656916..db2648d95ff 100644 --- a/kotlin-native/runtime/src/gc/cms/cpp/GCState.hpp +++ b/kotlin-native/runtime/src/gc/common/cpp/GCState.hpp @@ -10,6 +10,9 @@ #include #include +#include "KAssert.h" +#include "Utils.hpp" + class GCStateHolder { public: int64_t schedule() { @@ -29,17 +32,11 @@ public: finalizedEpoch.notify(); } - void start(int64_t epoch) { - startedEpoch.set(epoch); - } + void start(int64_t epoch) { startedEpoch.set(epoch); } - void finish(int64_t epoch) { - finishedEpoch.set(epoch); - } + void finish(int64_t epoch) { finishedEpoch.set(epoch); } - void finalized(int64_t epoch) { - finalizedEpoch.set(epoch); - } + void finalized(int64_t epoch) { finalizedEpoch.set(epoch); } void waitEpochFinished(int64_t epoch) { finishedEpoch.wait([this, epoch] { return *finishedEpoch >= epoch || shutdownFlag_; }); @@ -58,7 +55,7 @@ public: private: template struct ValueWithCondVar : kotlin::Pinned { - explicit ValueWithCondVar(T initializer, std::mutex& mutex) noexcept : value_(initializer), mutex_(mutex) {}; + explicit ValueWithCondVar(T initializer, std::mutex& mutex) noexcept : value_(initializer), mutex_(mutex){}; const T& operator*() const { return value_; } @@ -73,9 +70,7 @@ private: cond_.notify_all(); } - void notify() { - cond_.notify_all(); - } + void notify() { cond_.notify_all(); } template const T& wait(Predicate stop_waiting) { @@ -97,4 +92,4 @@ private: ValueWithCondVar scheduledEpoch{0, mutex_}; ValueWithCondVar finalizedEpoch{0, mutex_}; bool shutdownFlag_ = false; -}; \ No newline at end of file +}; diff --git a/kotlin-native/runtime/src/gc/stms/cpp/GCImpl.cpp b/kotlin-native/runtime/src/gc/stms/cpp/GCImpl.cpp index 5acb3ef266b..4ef5106c2a2 100644 --- a/kotlin-native/runtime/src/gc/stms/cpp/GCImpl.cpp +++ b/kotlin-native/runtime/src/gc/stms/cpp/GCImpl.cpp @@ -17,10 +17,7 @@ namespace { ALWAYS_INLINE void SafePointRegular(gc::GC::ThreadData& threadData, size_t weight) noexcept { threadData.impl().gcScheduler().OnSafePointRegular(weight); - auto flag = gc::internal::loadSafepointFlag(); - if (flag != gc::SameThreadMarkAndSweep::SafepointFlag::kNone) { - threadData.impl().gc().SafePointSlowPath(flag); - } + mm::SuspendIfRequested(); } } // namespace @@ -89,16 +86,21 @@ gc::GCSchedulerConfig& gc::GC::gcSchedulerConfig() noexcept { } void gc::GC::ClearForTests() noexcept { + impl_->gc().StopFinalizerThreadIfRunning(); impl_->objectFactory().ClearForTests(); GCHandle::ClearForTests(); } -void gc::GC::StartFinalizerThreadIfNeeded() noexcept {} +void gc::GC::StartFinalizerThreadIfNeeded() noexcept { + impl_->gc().StartFinalizerThreadIfNeeded(); +} -void gc::GC::StopFinalizerThreadIfRunning() noexcept {} +void gc::GC::StopFinalizerThreadIfRunning() noexcept { + impl_->gc().StopFinalizerThreadIfRunning(); +} bool gc::GC::FinalizersThreadIsRunning() noexcept { - return false; + return impl_->gc().FinalizersThreadIsRunning(); } // static diff --git a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.cpp b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.cpp index d27016ca6f1..66c9db6e67f 100644 --- a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.cpp +++ b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.cpp @@ -51,27 +51,31 @@ struct ProcessWeaksTraits { } }; -// Global, because it's accessed on a hot path: avoid memory load from `this`. -std::atomic gSafepointFlag = gc::SameThreadMarkAndSweep::SafepointFlag::kNone; - } // namespace void gc::SameThreadMarkAndSweep::ThreadData::SafePointAllocation(size_t size) noexcept { gcScheduler_.OnSafePointAllocation(size); - SafepointFlag flag = gSafepointFlag.load(); - if (flag != SafepointFlag::kNone) { - SafePointSlowPath(flag); - } + mm::SuspendIfRequested(); +} + +void gc::SameThreadMarkAndSweep::ThreadData::Schedule() noexcept { + RuntimeLogInfo({kTagGC}, "Scheduling GC manually"); + ThreadStateGuard guard(ThreadState::kNative); + gc_.state_.schedule(); } void gc::SameThreadMarkAndSweep::ThreadData::ScheduleAndWaitFullGC() noexcept { RuntimeLogInfo({kTagGC}, "Scheduling GC manually"); - auto didGC = gc_.PerformFullGC(); + ThreadStateGuard guard(ThreadState::kNative); + auto scheduled_epoch = gc_.state_.schedule(); + gc_.state_.waitEpochFinished(scheduled_epoch); +} - if (!didGC) { - // If we failed to suspend threads, someone else might be asking to suspend them. - mm::SuspendIfRequested(); - } +void gc::SameThreadMarkAndSweep::ThreadData::ScheduleAndWaitFullGCWithFinalizers() noexcept { + RuntimeLogInfo({kTagGC}, "Scheduling GC manually"); + ThreadStateGuard guard(ThreadState::kNative); + auto scheduled_epoch = gc_.state_.schedule(); + gc_.state_.waitEpochFinalized(scheduled_epoch); } void gc::SameThreadMarkAndSweep::ThreadData::OnOOM(size_t size) noexcept { @@ -79,92 +83,88 @@ void gc::SameThreadMarkAndSweep::ThreadData::OnOOM(size_t size) noexcept { ScheduleAndWaitFullGC(); } -NO_INLINE void gc::SameThreadMarkAndSweep::ThreadData::SafePointSlowPath(SafepointFlag flag) noexcept { - switch (flag) { - case SafepointFlag::kNone: - RuntimeAssert(false, "Must've been handled by the caller"); - return; - case SafepointFlag::kNeedsSuspend: - mm::SuspendIfRequested(); - return; - case SafepointFlag::kNeedsGC: - RuntimeLogDebug({kTagGC}, "Attempt to GC at SafePoint"); - ScheduleAndWaitFullGC(); - return; - } -} - gc::SameThreadMarkAndSweep::SameThreadMarkAndSweep( mm::ObjectFactory& objectFactory, GCScheduler& gcScheduler) noexcept : - objectFactory_(objectFactory), gcScheduler_(gcScheduler) { - gcScheduler_.SetScheduleGC([]() { - // TODO: CMS is also responsible for avoiding scheduling while GC hasn't started running. - // Investigate, if it's possible to move this logic into the scheduler. - SafepointFlag expectedFlag = SafepointFlag::kNone; - if (gSafepointFlag.compare_exchange_strong(expectedFlag, SafepointFlag::kNeedsGC)) { - RuntimeLogDebug({kTagGC}, "Scheduling GC by thread %d", konan::currentThreadId()); + objectFactory_(objectFactory), gcScheduler_(gcScheduler), finalizerProcessor_([this](int64_t epoch) noexcept { + GCHandle::getByEpoch(epoch).finalizersDone(); + state_.finalized(epoch); + }) { + gcScheduler_.SetScheduleGC([this]() NO_INLINE { + RuntimeLogDebug({kTagGC}, "Scheduling GC by thread %d", konan::currentThreadId()); + // This call acquires a lock, so we need to ensure that we're in the safe state. + NativeOrUnregisteredThreadGuard guard(/* reentrant = */ true); + state_.schedule(); + }); + gcThread_ = ScopedThread(ScopedThread::attributes().name("GC thread"), [this] { + while (true) { + auto epoch = state_.waitScheduled(); + if (epoch.has_value()) { + PerformFullGC(*epoch); + } else { + break; + } } }); RuntimeLogDebug({kTagGC}, "Same thread Mark & Sweep GC initialized"); } -bool gc::SameThreadMarkAndSweep::PerformFullGC() noexcept { - RuntimeLogDebug({kTagGC}, "Attempt to suspend threads by thread %d", konan::currentThreadId()); +gc::SameThreadMarkAndSweep::~SameThreadMarkAndSweep() { + state_.shutdown(); +} + +void gc::SameThreadMarkAndSweep::StartFinalizerThreadIfNeeded() noexcept { + NativeOrUnregisteredThreadGuard guard(true); + finalizerProcessor_.StartFinalizerThreadIfNone(); + finalizerProcessor_.WaitFinalizerThreadInitialized(); +} + +void gc::SameThreadMarkAndSweep::StopFinalizerThreadIfRunning() noexcept { + NativeOrUnregisteredThreadGuard guard(true); + finalizerProcessor_.StopFinalizerThread(); +} + +bool gc::SameThreadMarkAndSweep::FinalizersThreadIsRunning() noexcept { + return finalizerProcessor_.IsRunning(); +} + +void gc::SameThreadMarkAndSweep::PerformFullGC(int64_t epoch) noexcept { + auto gcHandle = GCHandle::create(epoch); bool didSuspend = mm::RequestThreadsSuspension(); - if (!didSuspend) { - RuntimeLogDebug({kTagGC}, "Failed to suspend threads by thread %d", konan::currentThreadId()); - // Somebody else suspended the threads, and so ran a GC. - // TODO: This breaks if suspension is used by something apart from GC. - return false; - } - gSafepointFlag = SafepointFlag::kNeedsSuspend; - auto gcHandle = GCHandle::create(epoch_++); + RuntimeAssert(didSuspend, "Only GC thread can request suspension"); gcHandle.suspensionRequested(); - mm::ObjectFactory::FinalizerQueue finalizerQueue; - { - // Switch state to native to simulate this thread being a GC thread. - ThreadStateGuard guard(ThreadState::kNative); + RuntimeAssert(!kotlin::mm::IsCurrentThreadRegistered(), "GC must run on unregistered thread"); + mm::WaitForThreadsSuspension(); + gcHandle.threadsAreSuspended(); - mm::WaitForThreadsSuspension(); - gcHandle.threadsAreSuspended(); + auto& scheduler = gcScheduler_; + scheduler.gcData().OnPerformFullGC(); - auto& scheduler = gcScheduler_; - scheduler.gcData().OnPerformFullGC(); + state_.start(epoch); - gc::collectRootSet(gcHandle, markQueue_, [] (mm::ThreadData&) { return true; }); - auto& extraObjectsDataFactory = mm::GlobalData::Instance().extraObjectDataFactory(); + gc::collectRootSet(gcHandle, markQueue_, [](mm::ThreadData&) { return true; }); - gc::Mark(gcHandle, markQueue_); - auto markStats = gcHandle.getMarked(); - scheduler.gcData().UpdateAliveSetBytes(markStats.markedSizeBytes); + gc::Mark(gcHandle, markQueue_); + auto markStats = gcHandle.getMarked(); + scheduler.gcData().UpdateAliveSetBytes(markStats.markedSizeBytes); - gc::processWeaks(gcHandle, mm::SpecialRefRegistry::instance()); + gc::processWeaks(gcHandle, mm::SpecialRefRegistry::instance()); - gc::SweepExtraObjects(gcHandle, extraObjectsDataFactory); - finalizerQueue = gc::Sweep(gcHandle, objectFactory_); + // Taking the locks before the pause is completed. So that any destroying thread + // would not publish into the global state at an unexpected time. + std::optional extraObjectFactoryIterable = mm::GlobalData::Instance().extraObjectDataFactory().LockForIter(); + std::optional objectFactoryIterable = objectFactory_.LockForIter(); - kotlin::compactObjectPoolInMainThread(); + gc::SweepExtraObjects(gcHandle, *extraObjectFactoryIterable); + extraObjectFactoryIterable = std::nullopt; + auto finalizerQueue = gc::Sweep(gcHandle, *objectFactoryIterable); + objectFactoryIterable = std::nullopt; + kotlin::compactObjectPoolInMainThread(); - gSafepointFlag = SafepointFlag::kNone; - mm::ResumeThreads(); - gcHandle.threadsAreResumed(); - gcHandle.finalizersScheduled(finalizerQueue.size()); - gcHandle.finished(); - } - - // Finalizers are run after threads are resumed, because finalizers may request GC themselves, which would - // try to suspend threads again. Also, we run finalizers in the runnable state, because they may be executing - // kotlin code. - - // TODO: These will actually need to be run on a separate thread. - AssertThreadState(ThreadState::kRunnable); - finalizerQueue.Finalize(); - gcHandle.finalizersDone(); - - return true; -} - -gc::SameThreadMarkAndSweep::SafepointFlag gc::internal::loadSafepointFlag() noexcept { - return gSafepointFlag.load(); + mm::ResumeThreads(); + gcHandle.threadsAreResumed(); + state_.finish(epoch); + gcHandle.finalizersScheduled(finalizerQueue.size()); + gcHandle.finished(); + finalizerProcessor_.ScheduleTasks(std::move(finalizerQueue), epoch); } diff --git a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.hpp b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.hpp index 7a7fd52b025..a9d5b116b5c 100644 --- a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.hpp +++ b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweep.hpp @@ -9,7 +9,9 @@ #include #include "Allocator.hpp" +#include "FinalizerProcessor.hpp" #include "GCScheduler.hpp" +#include "GCState.hpp" #include "IntrusiveList.hpp" #include "ObjectFactory.hpp" #include "Types.h" @@ -23,15 +25,10 @@ class ThreadData; namespace gc { -// Stop-the-world Mark-and-Sweep that runs on mutator threads. Can support targets that do not have threads. +// Stop-the-world mark & sweep. The GC runs in a separate thread, finalizers run in another thread of their own. +// TODO: Rename to StopTheWorldMarkAndSweep. class SameThreadMarkAndSweep : private Pinned { public: - enum class SafepointFlag { - kNone, - kNeedsSuspend, - kNeedsGC, - }; - class ObjectData { public: bool tryMark() noexcept { @@ -77,12 +74,11 @@ public: gc_(gc), gcScheduler_(gcScheduler) {} ~ThreadData() = default; - void SafePointSlowPath(SafepointFlag flag) noexcept; void SafePointAllocation(size_t size) noexcept; - void Schedule() noexcept { ScheduleAndWaitFullGC(); } + void Schedule() noexcept; void ScheduleAndWaitFullGC() noexcept; - void ScheduleAndWaitFullGCWithFinalizers() noexcept { ScheduleAndWaitFullGC(); } + void ScheduleAndWaitFullGCWithFinalizers() noexcept; void OnOOM(size_t size) noexcept; @@ -96,18 +92,26 @@ public: using Allocator = ThreadData::Allocator; + using FinalizerQueue = mm::ObjectFactory::FinalizerQueue; + using FinalizerQueueTraits = mm::ObjectFactory::FinalizerQueueTraits; + SameThreadMarkAndSweep(mm::ObjectFactory& objectFactory, GCScheduler& gcScheduler) noexcept; - ~SameThreadMarkAndSweep() = default; + ~SameThreadMarkAndSweep(); + + void StartFinalizerThreadIfNeeded() noexcept; + void StopFinalizerThreadIfRunning() noexcept; + bool FinalizersThreadIsRunning() noexcept; private: - // Returns `true` if GC has happened, and `false` if not (because someone else has suspended the threads). - bool PerformFullGC() noexcept; - - uint64_t epoch_ = 0; + void PerformFullGC(int64_t epoch) noexcept; mm::ObjectFactory& objectFactory_; GCScheduler& gcScheduler_; + GCStateHolder state_; + ScopedThread gcThread_; + FinalizerProcessor finalizerProcessor_; + MarkQueue markQueue_; }; @@ -143,8 +147,6 @@ struct MarkTraits { } }; -SameThreadMarkAndSweep::SafepointFlag loadSafepointFlag() noexcept; - } // namespace internal } // namespace gc diff --git a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp index 5bd2ba6978f..9f90da89fa7 100644 --- a/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp +++ b/kotlin-native/runtime/src/gc/stms/cpp/SameThreadMarkAndSweepTest.cpp @@ -211,7 +211,7 @@ public: mm::GlobalsRegistry::Instance().ClearForTests(); mm::SpecialRefRegistry::instance().clearForTests(); mm::GlobalData::Instance().extraObjectDataFactory().ClearForTests(); - mm::GlobalData::Instance().gc().impl().objectFactory().ClearForTests(); + mm::GlobalData::Instance().gc().ClearForTests(); } testing::MockFunction& finalizerHook() { return finalizerHooks_.finalizerHook(); } @@ -242,7 +242,7 @@ TEST_F(SameThreadMarkAndSweepTest, RootSet) { ASSERT_THAT(IsMarked(stack2.header()), false); ASSERT_THAT(IsMarked(stack3.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT( Alive(threadData), @@ -288,7 +288,7 @@ TEST_F(SameThreadMarkAndSweepTest, InterconnectedRootSet) { ASSERT_THAT(IsMarked(stack2.header()), false); ASSERT_THAT(IsMarked(stack3.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT( Alive(threadData), @@ -312,7 +312,7 @@ TEST_F(SameThreadMarkAndSweepTest, FreeObjects) { ASSERT_THAT(IsMarked(object1.header()), false); ASSERT_THAT(IsMarked(object2.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre()); }); @@ -329,7 +329,7 @@ TEST_F(SameThreadMarkAndSweepTest, FreeObjectsWithFinalizers) { EXPECT_CALL(finalizerHook(), Call(object1.header())); EXPECT_CALL(finalizerHook(), Call(object2.header())); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre()); }); @@ -349,7 +349,7 @@ TEST_F(SameThreadMarkAndSweepTest, FreeObjectWithFreeWeak) { ASSERT_THAT(weak1.get(), object1.header()); EXPECT_CALL(finalizerHook(), Call(weak1.header())); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre()); }); @@ -366,7 +366,7 @@ TEST_F(SameThreadMarkAndSweepTest, FreeObjectWithHoldedWeak) { ASSERT_THAT(IsMarked(weak1.header()), false); ASSERT_THAT(weak1.get(), object1.header()); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre(weak1.header(), stack.header())); EXPECT_THAT(IsMarked(weak1.header()), false); @@ -399,7 +399,7 @@ TEST_F(SameThreadMarkAndSweepTest, ObjectReferencedFromRootSet) { ASSERT_THAT(IsMarked(object3.header()), false); ASSERT_THAT(IsMarked(object4.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT( Alive(threadData), @@ -448,7 +448,7 @@ TEST_F(SameThreadMarkAndSweepTest, ObjectsWithCycles) { ASSERT_THAT(IsMarked(object5.header()), false); ASSERT_THAT(IsMarked(object6.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT( Alive(threadData), @@ -499,7 +499,7 @@ TEST_F(SameThreadMarkAndSweepTest, ObjectsWithCyclesAndFinalizers) { EXPECT_CALL(finalizerHook(), Call(object5.header())); EXPECT_CALL(finalizerHook(), Call(object6.header())); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT( Alive(threadData), @@ -532,7 +532,7 @@ TEST_F(SameThreadMarkAndSweepTest, ObjectsWithCyclesIntoRootSet) { ASSERT_THAT(IsMarked(object1.header()), false); ASSERT_THAT(IsMarked(object2.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre(global.header(), stack.header(), object1.header(), object2.header())); EXPECT_THAT(IsMarked(global.header()), false); @@ -576,8 +576,8 @@ TEST_F(SameThreadMarkAndSweepTest, RunGCTwice) { ASSERT_THAT(IsMarked(object5.header()), false); ASSERT_THAT(IsMarked(object6.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT( Alive(threadData), @@ -607,7 +607,7 @@ TEST_F(SameThreadMarkAndSweepTest, PermanentObjects) { ASSERT_THAT(Alive(threadData), testing::UnorderedElementsAre(global2.header())); EXPECT_THAT(IsMarked(global2.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre(global2.header())); EXPECT_THAT(IsMarked(global2.header()), false); @@ -627,7 +627,7 @@ TEST_F(SameThreadMarkAndSweepTest, SameObjectInRootSet) { EXPECT_THAT(IsMarked(global.header()), false); EXPECT_THAT(IsMarked(object.header()), false); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre(global.header(), object.header())); EXPECT_THAT(IsMarked(global.header()), false); @@ -725,7 +725,7 @@ TEST_F(SameThreadMarkAndSweepTest, MultipleMutatorsCollect) { std_support::vector> gcFutures(kDefaultThreadCount); - gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGC(); }); + gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); }); // Spin until thread suspension is requested. while (!mm::IsThreadSuspensionRequested()) { @@ -781,15 +781,27 @@ TEST_F(SameThreadMarkAndSweepTest, MultipleMutatorsAllCollect) { std_support::vector> gcFutures(kDefaultThreadCount); - // TODO: Maybe check that only one GC is performed. for (int i = 0; i < kDefaultThreadCount; ++i) { - gcFutures[i] = mutators[i].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGC(); }); + gcFutures[i] = mutators[i].Execute([](mm::ThreadData& threadData, Mutator& mutator) { + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); + // If GC starts before all thread executed line above, two gc will be run + // So we are temporary switch threads to native state and then return them back after all GC runs are done + SwitchThreadState(mm::GetMemoryState(), kotlin::ThreadState::kNative); + }); } for (auto& future : gcFutures) { future.wait(); } + for (int i = 0; i < kDefaultThreadCount; ++i) { + mutators[i] + .Execute([](mm::ThreadData& threadData, Mutator& mutator) { + SwitchThreadState(mm::GetMemoryState(), kotlin::ThreadState::kRunnable); + }) + .wait(); + } + std_support::vector expectedAlive; for (auto& global : globals) { expectedAlive.push_back(global); @@ -845,7 +857,7 @@ TEST_F(SameThreadMarkAndSweepTest, MultipleMutatorsAddToRootSetAfterCollectionRe } std_support::vector> gcFutures(kDefaultThreadCount); - gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGC(); }); + gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); }); // Spin until thread suspension is requested. while (!mm::IsThreadSuspensionRequested()) { @@ -910,7 +922,7 @@ TEST_F(SameThreadMarkAndSweepTest, CrossThreadReference) { std_support::vector> gcFutures(kDefaultThreadCount); - gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGC(); }); + gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); }); // Spin until thread suspension is requested. while (!mm::IsThreadSuspensionRequested()) { @@ -972,7 +984,7 @@ TEST_F(SameThreadMarkAndSweepTest, MultipleMutatorsWeaks) { std_support::vector> gcFutures(kDefaultThreadCount); gcFutures[0] = mutators[0].Execute([weak](mm::ThreadData& threadData, Mutator& mutator) { - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(weak->get(), nullptr); }); @@ -1023,7 +1035,7 @@ TEST_F(SameThreadMarkAndSweepTest, NewThreadsWhileRequestingCollection) { std_support::vector> gcFutures(kDefaultThreadCount); - gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGC(); }); + gcFutures[0] = mutators[0].Execute([](mm::ThreadData& threadData, Mutator& mutator) { threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); }); // Spin until thread suspension is requested. while (!mm::IsThreadSuspensionRequested()) { @@ -1090,7 +1102,7 @@ TEST_F(SameThreadMarkAndSweepTest, FreeObjectWithFreeWeakReversedOrder) { global1->field1 = object1_local.header(); while (weak.load() == nullptr) ; - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); ASSERT_THAT(Alive(threadData), testing::UnorderedElementsAre(object1_local.header(), weak.load()->header(), global1.header())); ASSERT_THAT(IsMarked(global1.header()), false); @@ -1101,7 +1113,7 @@ TEST_F(SameThreadMarkAndSweepTest, FreeObjectWithFreeWeakReversedOrder) { global1->field1 = nullptr; EXPECT_CALL(finalizerHook(), Call(weak.load()->header())); - threadData.gc().ScheduleAndWaitFullGC(); + threadData.gc().ScheduleAndWaitFullGCWithFinalizers(); EXPECT_THAT(Alive(threadData), testing::UnorderedElementsAre(global1.header())); done = true; diff --git a/kotlin-native/runtime/src/main/cpp/SingleLockList.hpp b/kotlin-native/runtime/src/main/cpp/SingleLockList.hpp index 30afd03be9a..3d264e46682 100644 --- a/kotlin-native/runtime/src/main/cpp/SingleLockList.hpp +++ b/kotlin-native/runtime/src/main/cpp/SingleLockList.hpp @@ -48,7 +48,7 @@ public: class Iterator { public: - using difference_type = void; + using difference_type = std::ptrdiff_t; using value_type = Value; using pointer = Value*; using reference = Value&; diff --git a/kotlin-native/runtime/src/mm/cpp/ObjectFactory.hpp b/kotlin-native/runtime/src/mm/cpp/ObjectFactory.hpp index 0f316a21ae5..7cdb417bae1 100644 --- a/kotlin-native/runtime/src/mm/cpp/ObjectFactory.hpp +++ b/kotlin-native/runtime/src/mm/cpp/ObjectFactory.hpp @@ -675,6 +675,14 @@ public: typename Storage::Consumer consumer_; }; + struct FinalizerQueueTraits { + static bool isEmpty(const FinalizerQueue& queue) noexcept { return queue.size() == 0; } + + static void add(FinalizerQueue& into, FinalizerQueue from) noexcept { into.MergeWith(std::move(from)); } + + static void process(FinalizerQueue queue) noexcept { queue.Finalize(); } + }; + class Iterable { public: Iterable(ObjectFactory& owner) noexcept : iter_(owner.storage_.LockForIter()) {}