From 8a86fec38fe4b600ae63af961413354959879b85 Mon Sep 17 00:00:00 2001 From: Alexander Shabalin Date: Wed, 29 Nov 2023 13:52:31 +0100 Subject: [PATCH] [K/N] Add RunLoopFinalizerProcessor ^KT-63423 --- kotlin-native/runtime/build.gradle.kts | 1 + .../common/cpp/RunLoopFinalizerProcessor.hpp | 195 ++++++++++++++++++ .../cpp/RunLoopFinalizerProcessorTest.cpp | 167 +++++++++++++++ 3 files changed, 363 insertions(+) create mode 100644 kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessor.hpp create mode 100644 kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessorTest.cpp diff --git a/kotlin-native/runtime/build.gradle.kts b/kotlin-native/runtime/build.gradle.kts index ef2c329ad58..9e31894cb9f 100644 --- a/kotlin-native/runtime/build.gradle.kts +++ b/kotlin-native/runtime/build.gradle.kts @@ -159,6 +159,7 @@ bitcode { headersDirs.from(files("src/gcScheduler/common/cpp", "src/gc/common/cpp", "src/mm/cpp", "src/externalCallsChecker/common/cpp", "src/main/cpp")) sourceSets { main {} + test {} } } diff --git a/kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessor.hpp b/kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessor.hpp new file mode 100644 index 00000000000..3ca8619e798 --- /dev/null +++ b/kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessor.hpp @@ -0,0 +1,195 @@ +/* + * Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#pragma once + +#include +#include +#include + +#include "Clock.hpp" +#include "Logging.hpp" +#include "Utils.hpp" +#include "objc_support/RunLoopSource.hpp" +#include "objc_support/RunLoopTimer.hpp" +#include "objc_support/AutoreleasePool.hpp" + +namespace kotlin::alloc { + +struct RunLoopFinalizerProcessorConfig { + // How long can finalizers be processed in a single task. If some finalizer takes too long, the entire + // batch of `batchSize` will overshoot this target. + // This cannot be too large to allow the attached run loop process other tasks (not from this finalizer processor). + std::chrono::nanoseconds maxTimeInTask = std::chrono::milliseconds(300); + // The minimum time between two tasks. + // This cannot be too small to allow the attached run loop process other tasks (not from this finalizer processor). + std::chrono::nanoseconds minTimeBetweenTasks = std::chrono::milliseconds(1); + // How many finalizers are processed in a single batch in a single autoreleasepool. + size_t batchSize = 100; +}; + +#if KONAN_HAS_FOUNDATION_FRAMEWORK + +// Finalizer processor that runs on `CFRunLoop`. +// +// It's attached to a run loop via `attachToCurrentRunLoop` and detached when the returned `Subscription` +// is destroyed. It cannot be simulatenously attached to multiple run loops. +// +// Tasks are scheduled via `schedule` and are guaranteed to be processed before the tasks from the next `schedule` call. +// The processor will process finalizers in groups of `batchSize` and will stop after either all finalizers are processed or +// more than `maxTimeInTask` have passed (if some finalizer takes a very long time, the overshoot may be significant). The +// processor will not start processing next finalizers for `minTimeBetweenTasks`. +// +// The default configuration may be changed by `withConfig`. +template +class RunLoopFinalizerProcessor : private Pinned { +public: + // A token that `RunLoopFinalizerProcessor` is attached to a run loop. + // + // Must be destroyed on the same thread that called `attachToCurrentRunLoop`. + class [[nodiscard]] Subscription : private Pinned { + public: + ~Subscription() = default; + + private: + friend class RunLoopFinalizerProcessor; + + explicit Subscription(RunLoopFinalizerProcessor& owner) noexcept : + sourceSubscription_(owner.source_.attachToCurrentRunLoop()), timerSubscription_(owner.timer_.attachToCurrentRunLoop()) {} + + std::unique_ptr sourceSubscription_; + std::unique_ptr timerSubscription_; + }; + + // The constructed processor is not attached to any run loop, and so will not be processing + // tasks. Call `attachToCurrentRunLoop` to attach it to the current thread's run loop. + RunLoopFinalizerProcessor() noexcept = default; + + // Schedule `tasks` from epoch `epoch` to be processed on this finalizer processor. + // + // It's guaranteed that these `tasks` will be processed only after `tasks` from the previous + // call to `schedule`. + void schedule(FinalizerQueue tasks, uint64_t epoch) noexcept { + if (FinalizerQueueTraits::isEmpty(tasks)) return; + { + std::unique_lock guard(queueMutex_); + queue_.emplace_back(std::move(tasks), epoch); + } + source_.signal(); + } + + // Modify the configuration of this `RunLoopFinalizerProcessor`. There's no guarantee, when will it be applied. + template + std::invoke_result_t withConfig(F&& f) noexcept { + std::unique_lock guard(configMutex_); + return std::invoke(std::forward(f), config_); + } + + // Attach this `RunLoopFinalizerProcessor` to the current thread's run loop. + // + // This processor can only be attached to one run loop at a time. + Subscription attachToCurrentRunLoop() noexcept { return Subscription(*this); } + +private: + void process() noexcept { + auto startTime = steady_clock::now(); + { + std::unique_lock guard(configMutex_); + auto minStartTime = lastProcessTimestamp_ + config_.minTimeBetweenTasks; + if (startTime < minStartTime) { + // `process` is being called too frequently. Wait until the next allowed time. + auto interval = minStartTime - startTime; + // TODO: std::common_type between double and saturated is undefined. + using Unsaturated = std::chrono::duration; + auto unsaturatedInterval = Unsaturated(interval); + timer_.setNextFiring(unsaturatedInterval); + return; + } + } + steady_clock::time_point deadline; + size_t batchCount; + { + std::unique_lock guard(configMutex_); + RuntimeLogDebug( + {kTagGC}, "Processing finalizers on a run loop for maximum %" PRId64 "ms", + std::chrono::duration_cast(config_.maxTimeInTask).count()); + deadline = startTime + config_.maxTimeInTask; + batchCount = config_.batchSize; + } + size_t processedCount = 0; + while (true) { + auto now = steady_clock::now(); + if (now > deadline) { + // Finalization is being run too long. Stop processing and reschedule until the next allowed time. + std::unique_lock guard(configMutex_); + RuntimeLogDebug( + {kTagGC}, "Processing %zu finalizers on a run loop has taken %" PRId64 " ms. Stopping for %" PRId64 "ms.", + processedCount, std::chrono::duration_cast(now - startTime).count().value, + std::chrono::duration_cast(config_.minTimeBetweenTasks).count()); + timer_.setNextFiring(config_.minTimeBetweenTasks); + lastProcessTimestamp_ = now; + return; + } + { + objc_support::AutoreleasePool autoreleasePool; + for (size_t i = 0; i < batchCount; ++i) { + // There's no point checking `deadline` here since the majority of the time will probably + // be spent in `AutoreleasePool` destructor. + if (!FinalizerQueueTraits::processSingle(currentQueue_.queue)) { + break; + } + ++processedCount; + } + } + if (!FinalizerQueueTraits::isEmpty(currentQueue_.queue)) { + continue; + } + RuntimeLogDebug({kTagGC}, "Epoch #%" PRIu64 ": finished processing finalizers on a run loop", currentQueue_.epoch); + // Attempt to fill `currentQueue_` from the global `queue_`. + std::unique_lock guard(queueMutex_); + if (queue_.empty()) { + // Let's keep this under the lock. This way if someone were to schedule new tasks, they + // would definitely have to wait long enough to see the updated lastProcessTimestamp_. + lastProcessTimestamp_ = steady_clock::now(); + RuntimeLogDebug( + {kTagGC}, "Processing %zu finalizers on a run loop has finished in %" PRId64 "ms.", processedCount, + std::chrono::duration_cast(lastProcessTimestamp_ - startTime).count().value); + return; + } + currentQueue_ = std::move(queue_.front()); + RuntimeLogDebug({kTagGC}, "Epoch #%" PRIu64 ": will process finalizers on a run loop", currentQueue_.epoch); + queue_.pop_front(); + RuntimeAssert(!FinalizerQueueTraits::isEmpty(currentQueue_.queue), "Empty queue should not have been scheduled"); + } + } + + std::mutex configMutex_; + RunLoopFinalizerProcessorConfig config_; + + struct ScheduledQueue { + ScheduledQueue() noexcept = default; + ScheduledQueue(FinalizerQueue queue, uint64_t epoch) noexcept : queue(std::move(queue)), epoch(epoch) {} + + FinalizerQueue queue; + uint64_t epoch = 0; + }; + + std::mutex queueMutex_; + ScheduledQueue currentQueue_; + std::deque queue_; + + steady_clock::time_point lastProcessTimestamp_ = + steady_clock::time_point::min(); // Only accessed by the process() function called only by the `CFRunLoop`. + + objc_support::RunLoopSource source_{[this]() noexcept { process(); }}; + // `timer_` is triggered manually with `setNextFiring`, so `interval` and `initialFiring` are set very high. + // This follows https://developer.apple.com/documentation/corefoundation/1542501-cfrunlooptimersetnextfiredate#discussion + objc_support::RunLoopTimer timer_{ + [this]() noexcept { source_.signal(); }, std::chrono::hours(100), objc_support::cf_clock::now() + std::chrono::hours(100)}; +}; + +#endif + +} // namespace kotlin::alloc \ No newline at end of file diff --git a/kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessorTest.cpp b/kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessorTest.cpp new file mode 100644 index 00000000000..6bcd8beae29 --- /dev/null +++ b/kotlin-native/runtime/src/alloc/common/cpp/RunLoopFinalizerProcessorTest.cpp @@ -0,0 +1,167 @@ +/* + * Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#if KONAN_HAS_FOUNDATION_FRAMEWORK + +#include "RunLoopFinalizerProcessor.hpp" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "objc_support/RunLoopTestSupport.hpp" + +using namespace kotlin; + +namespace { + +using FinalizerQueue = std::vector>; + +struct FinalizerQueueTraits { + static void add(FinalizerQueue& into, FinalizerQueue from) noexcept { + into.insert(into.end(), std::make_move_iterator(from.begin()), std::make_move_iterator(from.end())); + } + + static bool isEmpty(const FinalizerQueue& queue) noexcept { return queue.empty(); } + + static bool processSingle(FinalizerQueue& queue) noexcept { + if (queue.empty()) return false; + auto item = std::move(queue.back()); + queue.pop_back(); + item(); + return true; + } +}; + +using RunLoopFinalizerProcessor = alloc::RunLoopFinalizerProcessor; + +} // namespace + +TEST(RunLoopFinalizerProcessorTest, Basic) { + RunLoopFinalizerProcessor processor; + objc_support::test_support::RunLoopInScopedThread runLoop([&]() noexcept { return processor.attachToCurrentRunLoop(); }); + + std::array>, 4> finalizers; + + std::atomic done = false; + { + testing::InSequence seq; + EXPECT_CALL(finalizers[1], Call()); + EXPECT_CALL(finalizers[0], Call()); + EXPECT_CALL(finalizers[3], Call()); + EXPECT_CALL(finalizers[2], Call()).WillOnce([&] { done.store(true, std::memory_order_release); }); + } + processor.schedule({finalizers[0].AsStdFunction(), finalizers[1].AsStdFunction()}, 1); + processor.schedule({finalizers[2].AsStdFunction(), finalizers[3].AsStdFunction()}, 2); + runLoop.wakeUp(); + while (!done.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } +} + +TEST(RunLoopFinalizerProcessorTest, ScheduleWhileProcessing) { + RunLoopFinalizerProcessor processor; + objc_support::test_support::RunLoopInScopedThread runLoop([&]() noexcept { return processor.attachToCurrentRunLoop(); }); + + std::array>, 4> finalizers; + + std::atomic done = false; + { + testing::InSequence seq; + EXPECT_CALL(finalizers[1], Call()).WillOnce([&] { + processor.schedule({finalizers[2].AsStdFunction(), finalizers[3].AsStdFunction()}, 2); + }); + EXPECT_CALL(finalizers[0], Call()); + EXPECT_CALL(finalizers[3], Call()); + EXPECT_CALL(finalizers[2], Call()).WillOnce([&] { done.store(true, std::memory_order_release); }); + } + processor.schedule({finalizers[0].AsStdFunction(), finalizers[1].AsStdFunction()}, 1); + runLoop.wakeUp(); + while (!done.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } +} + +TEST(RunLoopFinalizerProcessorTest, Overtime) { + constexpr std::chrono::nanoseconds overtime = std::chrono::milliseconds(1); + constexpr std::chrono::nanoseconds timeoutBetween = std::chrono::milliseconds(10); + RunLoopFinalizerProcessor processor; + processor.withConfig([&](alloc::RunLoopFinalizerProcessorConfig& config) noexcept { + config.minTimeBetweenTasks = timeoutBetween; + config.maxTimeInTask = overtime; + config.batchSize = 3; + }); + objc_support::test_support::RunLoopInScopedThread runLoop([&]() noexcept { return processor.attachToCurrentRunLoop(); }); + + std::array>, 4> finalizers; + + std::atomic done = false; + steady_clock::time_point sleptAt; + testing::StrictMock> checkpoint; + { + testing::InSequence seq; + EXPECT_CALL(finalizers[3], Call()).WillOnce([&] { runLoop.schedule(checkpoint.AsStdFunction()); }); + EXPECT_CALL(finalizers[2], Call()).WillOnce([&] { + std::this_thread::sleep_for(overtime); + sleptAt = steady_clock::now(); + }); + EXPECT_CALL(finalizers[1], Call()); + EXPECT_CALL(checkpoint, Call()); + EXPECT_CALL(finalizers[0], Call()).WillOnce([&] { + EXPECT_GE(steady_clock::now(), sleptAt + timeoutBetween); + done.store(true, std::memory_order_release); + }); + } + processor.schedule( + {finalizers[0].AsStdFunction(), finalizers[1].AsStdFunction(), finalizers[2].AsStdFunction(), finalizers[3].AsStdFunction()}, + 1); + runLoop.wakeUp(); + + while (!done.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } +} + +TEST(RunLoopFinalizerProcessorTest, ScheduleWhileOvertime) { + constexpr std::chrono::nanoseconds overtime = std::chrono::milliseconds(1); + constexpr std::chrono::nanoseconds timeoutBetween = std::chrono::milliseconds(10); + RunLoopFinalizerProcessor processor; + processor.withConfig([&](alloc::RunLoopFinalizerProcessorConfig& config) noexcept { + config.minTimeBetweenTasks = timeoutBetween; + config.maxTimeInTask = overtime; + config.batchSize = 2; + }); + objc_support::test_support::RunLoopInScopedThread runLoop([&]() noexcept { return processor.attachToCurrentRunLoop(); }); + + std::array>, 6> finalizers; + + std::atomic done = false; + steady_clock::time_point sleptAt; + testing::StrictMock> checkpoint; + { + testing::InSequence seq; + EXPECT_CALL(finalizers[3], Call()).WillOnce([&] { + processor.schedule({finalizers[4].AsStdFunction(), finalizers[5].AsStdFunction()}, 1); + runLoop.schedule(checkpoint.AsStdFunction()); + std::this_thread::sleep_for(overtime); + sleptAt = steady_clock::now(); + }); + EXPECT_CALL(finalizers[2], Call()); + EXPECT_CALL(checkpoint, Call()); + EXPECT_CALL(finalizers[1], Call()).WillOnce([&] { EXPECT_GE(steady_clock::now(), sleptAt + timeoutBetween); }); + EXPECT_CALL(finalizers[0], Call()); + EXPECT_CALL(finalizers[5], Call()); + EXPECT_CALL(finalizers[4], Call()).WillOnce([&] { done.store(true, std::memory_order_release); }); + } + processor.schedule( + {finalizers[0].AsStdFunction(), finalizers[1].AsStdFunction(), finalizers[2].AsStdFunction(), finalizers[3].AsStdFunction()}, + 1); + runLoop.wakeUp(); + + while (!done.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } +} + +#endif \ No newline at end of file