diff --git a/kotlin-native/runtime/src/main/cpp/CleanerTest.cpp b/kotlin-native/runtime/src/main/cpp/CleanerTest.cpp index 4861b9b455f..ca36bcd109a 100644 --- a/kotlin-native/runtime/src/main/cpp/CleanerTest.cpp +++ b/kotlin-native/runtime/src/main/cpp/CleanerTest.cpp @@ -12,6 +12,7 @@ #include "gtest/gtest.h" #include "Atomic.h" +#include "TestSupport.hpp" #include "TestSupportCompilerGenerated.hpp" using testing::_; @@ -21,7 +22,7 @@ using testing::_; TEST(CleanerTest, ConcurrentCreation) { ResetCleanerWorkerForTests(); - constexpr int threadCount = 100; + constexpr int threadCount = kotlin::kDefaultThreadCount; constexpr KInt workerId = 42; auto createCleanerWorkerMock = ScopedCreateCleanerWorkerMock(); diff --git a/kotlin-native/runtime/src/main/cpp/MultiSourceQueue.hpp b/kotlin-native/runtime/src/main/cpp/MultiSourceQueue.hpp index cafcf355dee..1a514855141 100644 --- a/kotlin-native/runtime/src/main/cpp/MultiSourceQueue.hpp +++ b/kotlin-native/runtime/src/main/cpp/MultiSourceQueue.hpp @@ -7,6 +7,9 @@ #define RUNTIME_MULTI_SOURCE_QUEUE_H #include +#include + +#include "Mutex.hpp" namespace kotlin { @@ -16,27 +19,50 @@ class MultiSourceQueue { public: class Producer { public: + explicit Producer(MultiSourceQueue& owner) noexcept : owner_(owner) {} + + ~Producer() { Publish(); } + void Insert(const T& value) noexcept { queue_.push_back(value); } + // Merge `this` queue with owning `MultiSourceQueue`. `this` will have empty queue after the call. + // This call is performed without heap allocations. TODO: Test that no allocations are happening. + void Publish() noexcept { owner_.Collect(*this); } + private: friend class MultiSourceQueue; + MultiSourceQueue& owner_; // weak std::list queue_; }; using Iterator = typename std::list::iterator; - Iterator begin() noexcept { return commonQueue_.begin(); } - Iterator end() noexcept { return commonQueue_.end(); } + class Iterable : MoveOnly { + public: + explicit Iterable(MultiSourceQueue& owner) noexcept : owner_(owner), guard_(owner_.mutex_) {} - // Merge `producer`s queue with `this`. `producer` will have empty queue after the call. - // This call is performed without heap allocations. TODO: Test that no allocations are happening. - void Collect(Producer* producer) noexcept { commonQueue_.splice(commonQueue_.end(), producer->queue_); } + Iterator begin() noexcept { return owner_.commonQueue_.begin(); } + Iterator end() noexcept { return owner_.commonQueue_.end(); } + + private: + MultiSourceQueue& owner_; // weak + std::unique_lock guard_; + }; + + // Lock MultiSourceQueue for safe iteration. + Iterable Iter() noexcept { return Iterable(*this); } private: + void Collect(Producer& producer) noexcept { + std::lock_guard guard(mutex_); + commonQueue_.splice(commonQueue_.end(), producer.queue_); + } + // Using `std::list` as it allows to implement `Collect` without memory allocations, // which is important for GC mark phase. std::list commonQueue_; + SimpleMutex mutex_; }; } // namespace kotlin diff --git a/kotlin-native/runtime/src/main/cpp/MultiSourceQueueTest.cpp b/kotlin-native/runtime/src/main/cpp/MultiSourceQueueTest.cpp index e6622edc099..60d8e81fa66 100644 --- a/kotlin-native/runtime/src/main/cpp/MultiSourceQueueTest.cpp +++ b/kotlin-native/runtime/src/main/cpp/MultiSourceQueueTest.cpp @@ -5,9 +5,14 @@ #include "MultiSourceQueue.hpp" +#include +#include + #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "TestSupport.hpp" + using namespace kotlin; using IntQueue = MultiSourceQueue; @@ -16,74 +21,187 @@ TEST(MultiSourceQueueTest, Empty) { IntQueue queue; std::vector actual; - for (int element : queue) { + for (int element : queue.Iter()) { actual.push_back(element); } EXPECT_THAT(actual, testing::IsEmpty()); } -TEST(MultiSourceQueueTest, DoNotCollect) { +TEST(MultiSourceQueueTest, DoNotPublish) { IntQueue queue; - IntQueue::Producer producer; + IntQueue::Producer producer(queue); producer.Insert(1); producer.Insert(2); std::vector actual; - for (int element : queue) { + for (int element : queue.Iter()) { actual.push_back(element); } EXPECT_THAT(actual, testing::IsEmpty()); } -TEST(MultiSourceQueueTest, Collect) { +TEST(MultiSourceQueueTest, Publish) { IntQueue queue; - IntQueue::Producer producer1; - IntQueue::Producer producer2; + IntQueue::Producer producer1(queue); + IntQueue::Producer producer2(queue); producer1.Insert(1); producer1.Insert(2); producer2.Insert(10); producer2.Insert(20); - queue.Collect(&producer1); - queue.Collect(&producer2); + producer1.Publish(); + producer2.Publish(); std::vector actual; - for (int element : queue) { + for (int element : queue.Iter()) { actual.push_back(element); } EXPECT_THAT(actual, testing::ElementsAre(1, 2, 10, 20)); } -TEST(MultiSourceQueueTest, CollectSeveralTimes) { +TEST(MultiSourceQueueTest, PublishSeveralTimes) { IntQueue queue; - IntQueue::Producer producer; + IntQueue::Producer producer(queue); - // Add 2 elements and collect. + // Add 2 elements and publish. producer.Insert(1); producer.Insert(2); - queue.Collect(&producer); + producer.Publish(); - // Add another element and collect. + // Add another element and publish. producer.Insert(3); - queue.Collect(&producer); + producer.Publish(); - // Collect without adding elements. - queue.Collect(&producer); + // Publish without adding elements. + producer.Publish(); - // Add yet another two elements and collect. + // Add yet another two elements and publish. producer.Insert(4); producer.Insert(5); - queue.Collect(&producer); + producer.Publish(); std::vector actual; - for (int element : queue) { + for (int element : queue.Iter()) { actual.push_back(element); } EXPECT_THAT(actual, testing::ElementsAre(1, 2, 3, 4, 5)); } + +TEST(MultiSourceQueueTest, PublishInDestructor) { + IntQueue queue; + + { + IntQueue::Producer producer(queue); + producer.Insert(1); + producer.Insert(2); + } + + std::vector actual; + for (int element : queue.Iter()) { + actual.push_back(element); + } + + EXPECT_THAT(actual, testing::ElementsAre(1, 2)); +} + +TEST(MultiSourceQueueTest, ConcurrentPublish) { + IntQueue queue; + constexpr int kThreadCount = kDefaultThreadCount; + std::atomic canStart(false); + std::atomic readyCount(0); + std::vector threads; + std::vector expected; + + for (int i = 0; i < kThreadCount; ++i) { + expected.push_back(i); + threads.emplace_back([i, &queue, &canStart, &readyCount]() { + IntQueue::Producer producer(queue); + producer.Insert(i); + ++readyCount; + while (!canStart) { + } + producer.Publish(); + }); + } + + while (readyCount < kThreadCount) { + } + canStart = true; + for (auto& t : threads) { + t.join(); + } + + std::vector actual; + for (int element : queue.Iter()) { + actual.push_back(element); + } + + EXPECT_THAT(actual, testing::UnorderedElementsAreArray(expected)); +} + +TEST(MultiSourceQueueTest, IterWhileConcurrentPublish) { + IntQueue queue; + constexpr int kStartCount = 50; + constexpr int kThreadCount = kDefaultThreadCount; + + std::vector expectedBefore; + std::vector expectedAfter; + IntQueue::Producer producer(queue); + for (int i = 0; i < kStartCount; ++i) { + expectedBefore.push_back(i); + expectedAfter.push_back(i); + producer.Insert(i); + } + producer.Publish(); + + std::atomic canStart(false); + std::atomic readyCount(0); + std::atomic startedCount(0); + std::vector threads; + for (int i = 0; i < kThreadCount; ++i) { + int j = i + kStartCount; + expectedAfter.push_back(j); + threads.emplace_back([j, &queue, &canStart, &startedCount, &readyCount]() { + IntQueue::Producer producer(queue); + producer.Insert(j); + ++readyCount; + while (!canStart) { + } + ++startedCount; + producer.Publish(); + }); + } + + std::vector actualBefore; + { + auto iter = queue.Iter(); + while (readyCount < kThreadCount) { + } + canStart = true; + while (startedCount < kThreadCount) { + } + + for (int element : iter) { + actualBefore.push_back(element); + } + } + + for (auto& t : threads) { + t.join(); + } + + EXPECT_THAT(actualBefore, testing::ElementsAreArray(expectedBefore)); + + std::vector actualAfter; + for (int element : queue.Iter()) { + actualAfter.push_back(element); + } + + EXPECT_THAT(actualAfter, testing::UnorderedElementsAreArray(expectedAfter)); +} diff --git a/kotlin-native/runtime/src/main/cpp/SingleLockListTest.cpp b/kotlin-native/runtime/src/main/cpp/SingleLockListTest.cpp index 2fd1d93b338..480056ac12f 100644 --- a/kotlin-native/runtime/src/main/cpp/SingleLockListTest.cpp +++ b/kotlin-native/runtime/src/main/cpp/SingleLockListTest.cpp @@ -12,6 +12,8 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "TestSupport.hpp" + using namespace kotlin; namespace { @@ -105,19 +107,23 @@ TEST(SingleLockListTest, EraseToEmptyEmplaceAndIter) { TEST(SingleLockListTest, ConcurrentEmplace) { IntList list; - constexpr int kThreadCount = 100; + constexpr int kThreadCount = kDefaultThreadCount; std::atomic canStart(false); + std::atomic readyCount(0); std::vector threads; std::vector expected; for (int i = 0; i < kThreadCount; ++i) { expected.push_back(i); - threads.emplace_back([i, &list, &canStart]() { + threads.emplace_back([i, &list, &canStart, &readyCount]() { + ++readyCount; while (!canStart) { } list.Emplace(i); }); } + while (readyCount < kThreadCount) { + } canStart = true; for (auto& t : threads) { t.join(); @@ -133,22 +139,26 @@ TEST(SingleLockListTest, ConcurrentEmplace) { TEST(SingleLockListTest, ConcurrentErase) { IntList list; - constexpr int kThreadCount = 100; + constexpr int kThreadCount = kDefaultThreadCount; std::vector items; for (int i = 0; i < kThreadCount; ++i) { items.push_back(list.Emplace(i)); } std::atomic canStart(false); + std::atomic readyCount(0); std::vector threads; for (auto* item : items) { - threads.emplace_back([item, &list, &canStart]() { + threads.emplace_back([item, &list, &canStart, &readyCount]() { + ++readyCount; while (!canStart) { } list.Erase(item); }); } + while (readyCount < kThreadCount) { + } canStart = true; for (auto& t : threads) { t.join(); @@ -165,7 +175,7 @@ TEST(SingleLockListTest, ConcurrentErase) { TEST(SingleLockListTest, IterWhileConcurrentEmplace) { IntList list; constexpr int kStartCount = 50; - constexpr int kThreadCount = 100; + constexpr int kThreadCount = kDefaultThreadCount; std::deque expectedBefore; std::vector expectedAfter; @@ -217,7 +227,7 @@ TEST(SingleLockListTest, IterWhileConcurrentEmplace) { TEST(SingleLockListTest, IterWhileConcurrentErase) { IntList list; - constexpr int kThreadCount = 100; + constexpr int kThreadCount = kDefaultThreadCount; std::deque expectedBefore; std::vector items; diff --git a/kotlin-native/runtime/src/main/cpp/TestSupport.hpp b/kotlin-native/runtime/src/main/cpp/TestSupport.hpp new file mode 100644 index 00000000000..521a7d93951 --- /dev/null +++ b/kotlin-native/runtime/src/main/cpp/TestSupport.hpp @@ -0,0 +1,15 @@ +/* + * Copyright 2010-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +namespace kotlin { + +#if KONAN_WINDOWS +// TODO: Figure out why creating many threads on windows is so slow. +constexpr int kDefaultThreadCount = 10; +#else +constexpr int kDefaultThreadCount = 100; +#endif + +} // namespace kotlin diff --git a/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.cpp b/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.cpp index 96f80fd9ea4..74c89ea7747 100644 --- a/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.cpp +++ b/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.cpp @@ -18,12 +18,11 @@ mm::GlobalsRegistry& mm::GlobalsRegistry::Instance() noexcept { } void mm::GlobalsRegistry::RegisterStorageForGlobal(mm::ThreadData* threadData, ObjHeader** location) noexcept { - threadData->globalsThreadQueue()->Insert(location); + threadData->globalsThreadQueue().Insert(location); } void mm::GlobalsRegistry::ProcessThread(mm::ThreadData* threadData) noexcept { - RuntimeAssert(threadData->isWaitingForGC(), "Thread must be waiting for GC to complete."); - globals_.Collect(threadData->globalsThreadQueue()); + threadData->globalsThreadQueue().Publish(); } mm::GlobalsRegistry::GlobalsRegistry() = default; diff --git a/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.hpp b/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.hpp index 5cb77c91128..580ddf74ce4 100644 --- a/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.hpp +++ b/kotlin-native/runtime/src/mm/cpp/GlobalsRegistry.hpp @@ -16,24 +16,29 @@ namespace mm { class GlobalsRegistry : Pinned { public: - using ThreadQueue = MultiSourceQueue::Producer; + class ThreadQueue : public MultiSourceQueue::Producer { + public: + explicit ThreadQueue(GlobalsRegistry& registry) : Producer(registry.globals_) {} + // Do not add fields as this is just a wrapper and Producer does not have virtual destructor. + }; - using Iterator = std::list::iterator; + using Iterable = MultiSourceQueue::Iterable; + + using Iterator = MultiSourceQueue::Iterator; static GlobalsRegistry& Instance() noexcept; void RegisterStorageForGlobal(mm::ThreadData* threadData, ObjHeader** location) noexcept; - // Collect globals from thread corresponding to `threadData`. Thread must be waiting for GC. - // Only one thread can call this method. + // Collect globals from thread corresponding to `threadData`. Must be called by the thread + // when it's asked by GC to stop. void ProcessThread(mm::ThreadData* threadData) noexcept; - // These must be called on the same thread as `ProcessThread` to avoid races. + // Lock registry for safe iteration. // TODO: Iteration over `globals_` will be slow, because it's `std::list` collected at different times from // different threads, and so the nodes are all over the memory. Use metrics to understand how // much of a problem is it. - Iterator begin() noexcept { return globals_.begin(); } - Iterator end() noexcept { return globals_.end(); } + Iterable Iter() noexcept { return globals_.Iter(); } private: friend class GlobalData; diff --git a/kotlin-native/runtime/src/mm/cpp/ThreadData.hpp b/kotlin-native/runtime/src/mm/cpp/ThreadData.hpp index 82a89c3c3f2..cecf7cdb2d6 100644 --- a/kotlin-native/runtime/src/mm/cpp/ThreadData.hpp +++ b/kotlin-native/runtime/src/mm/cpp/ThreadData.hpp @@ -19,18 +19,13 @@ namespace mm { // Pin it in memory to prevent accidental copying. class ThreadData final : private Pinned { public: - ThreadData(pthread_t threadId) noexcept : threadId_(threadId) {} + ThreadData(pthread_t threadId) noexcept : threadId_(threadId), globalsThreadQueue_(GlobalsRegistry::Instance()) {} ~ThreadData() = default; pthread_t threadId() const noexcept { return threadId_; } - bool isWaitingForGC() const noexcept { - // TODO: Implement. - return false; - } - - GlobalsRegistry::ThreadQueue* globalsThreadQueue() noexcept { return &globalsThreadQueue_; } + GlobalsRegistry::ThreadQueue& globalsThreadQueue() noexcept { return globalsThreadQueue_; } ThreadLocalStorage& tls() noexcept { return tls_; }