[K/N] GC: parallel mark with work balancing ^KT-57771

Merge-request: KT-MR-11460
Merged-by: Alexey Glushko <aleksei.glushko@jetbrains.com>
This commit is contained in:
Aleksei.Glushko
2023-08-03 16:25:19 +00:00
committed by Space Team
parent f0f1dc15c3
commit f1efeff21b
32 changed files with 1431 additions and 594 deletions
@@ -0,0 +1,114 @@
/*
* Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
/*
* An implementation of Dmitry Vyukov's Bounded Multi-producer/multi-consumer bounded queue.
*
* Copyright (c) 2010-2011, Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice, this list of
* conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice, this list
* of conditions and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* The views and conclusions contained in the software and documentation are those of the authors and should not be interpreted
* as representing official policies, either expressed or implied, of Dmitry Vyukov.
*
* TODO what about a binary distribution?
*/
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <optional>

#include "ManuallyScoped.hpp"
#include "Utils.hpp"
namespace kotlin {
/**
* A fixed-size concurrent multi-producer/multi-consumer queue.
* @tparam kCapacity must be a power of 2.
*/
template<typename T, std::size_t kCapacity>
class BoundedQueue : private Pinned {
public:
    BoundedQueue() {
        static_assert((kCapacity >= 2) && ((kCapacity & (kCapacity - 1)) == 0), "Queue capacity must be a power of 2");
        // Cell i initially announces sequence i: "free, awaiting the producer whose ticket is i".
        for (size_t i = 0; i < kCapacity; ++i) {
            buffer_[i].sequence_.store(i, std::memory_order_relaxed);
        }
        enqueuePos_.store(0, std::memory_order_relaxed);
        dequeuePos_.store(0, std::memory_order_relaxed);
    }

    /**
     * Tries to move `value` into the queue.
     * @return `false` if the queue is full. In that case `value` is left untouched
     *         (nothing is moved-from on the failure path).
     */
    bool enqueue(T&& value) {
        Cell* cell;
        std::size_t pos = enqueuePos_.load(std::memory_order_relaxed);
        while (true) {
            // kCapacity is a power of two, so masking is a cheap modulo.
            cell = &buffer_[pos & (kCapacity - 1)];
            // Acquire pairs with the release store in dequeue(): the freed slot is visible.
            std::size_t seq = cell->sequence_.load(std::memory_order_acquire);
            std::intptr_t dif = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos);
            if (dif == 0) {
                // Cell is free for ticket `pos`; claim it by advancing the producer cursor.
                if (enqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    break;
                }
            } else if (dif < 0) {
                // Cell still holds an element from the previous lap: the queue is full.
                return false;
            } else {
                // Another producer claimed this cell first; reload the cursor and retry.
                pos = enqueuePos_.load(std::memory_order_relaxed);
            }
        }
        cell->data_.construct(std::move(value));
        // Publish: sequence pos + 1 means "occupied, readable by the consumer with ticket pos".
        cell->sequence_.store(pos + 1, std::memory_order_release);
        return true;
    }

    /** Tries to take an element off the queue. Returns `std::nullopt` if the queue is empty. */
    std::optional<T> dequeue() {
        Cell* cell;
        std::size_t pos = dequeuePos_.load(std::memory_order_relaxed);
        while (true) {
            cell = &buffer_[pos & (kCapacity - 1)];
            // Acquire pairs with the release store in enqueue(): the constructed data is visible.
            std::size_t seq = cell->sequence_.load(std::memory_order_acquire);
            std::intptr_t dif = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos + 1);
            if (dif == 0) {
                // Cell holds the element for ticket `pos`; claim it by advancing the consumer cursor.
                if (dequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    break;
                }
            } else if (dif < 0) {
                // The producer for this cell has not published yet: the queue is empty.
                return std::nullopt;
            } else {
                // Another consumer claimed this cell first; reload the cursor and retry.
                pos = dequeuePos_.load(std::memory_order_relaxed);
            }
        }
        auto result = std::move(*cell->data_);
        cell->data_.destroy();
        // Free the cell for the producer that arrives one full lap later.
        cell->sequence_.store(pos + kCapacity, std::memory_order_release);
        return std::move(result);
    }

private:
    struct Cell {
        // Ticket counter encoding the cell's state (see enqueue/dequeue):
        // seq == pos     — free for the producer with ticket `pos`;
        // seq == pos + 1 — occupied, readable by the consumer with ticket `pos`.
        std::atomic<size_t> sequence_;
        // Element storage, constructed/destroyed manually; valid only while the cell is occupied.
        ManuallyScoped<T> data_;
    };

    constexpr static auto kCacheLineSize = 128;
    // Each field on its own cache line to avoid false sharing between producers and consumers.
    alignas(kCacheLineSize) Cell buffer_[kCapacity];
    alignas(kCacheLineSize) std::atomic<size_t> enqueuePos_;
    alignas(kCacheLineSize) std::atomic<size_t> dequeuePos_;
};
} // namespace kotlin
@@ -0,0 +1,130 @@
/*
* Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <list>
#include "IntrusiveList.hpp"
#include "ParallelProcessor.hpp"
#include "std_support/Vector.hpp"
#include "SingleThreadExecutor.hpp"
#include "TestSupport.hpp"
using ::testing::_;
using namespace kotlin;
namespace {
// A test payload whose four fields must always agree: a torn or interleaved
// write would break the invariant checked by isValid().
class Element {
public:
    Element() : Element(0) {}
    explicit Element(int value) {
        for (auto& field : fields_) {
            field = static_cast<std::size_t>(value);
        }
    }

    // All four copies of the value must still match.
    bool isValid() const {
        return fields_[1] == fields_[0] && fields_[2] == fields_[0] && fields_[3] == fields_[0];
    }

private:
    std::size_t fields_[4];
};
} // namespace
// Many producers fill the queue concurrently; afterwards every element must be
// intact and none may be lost.
TEST(BoundedQueueTest, ConcurrentEnqueue) {
    constexpr auto kThreadCount = 16;
    constexpr auto kElemsPerThread = 1024;
    BoundedQueue<Element, kThreadCount * kElemsPerThread> queue;
    std::atomic<bool> start = false;
    std::list<ScopedThread> threads;
    for (int t = 0; t < kThreadCount; ++t) {
        threads.emplace_back([&, t]() {
            while (!start) {
                std::this_thread::yield();
            }
            for (int e = 0; e < kElemsPerThread; ++e) {
                // Capacity equals the total number of elements, so every push must fit.
                EXPECT_TRUE(queue.enqueue(Element(t + e)));
            }
        });
    }
    start = true;
    threads.clear(); // joins the producers (ScopedThread joins on destruction)
    // Every enqueued element must come back out valid, and nothing may be lost.
    std::size_t dequeued = 0;
    while (auto elem = queue.dequeue()) {
        EXPECT_TRUE(elem->isValid());
        ++dequeued;
    }
    EXPECT_EQ(dequeued, static_cast<std::size_t>(kThreadCount * kElemsPerThread));
}
// Many consumers concurrently drain a pre-filled queue; every element observed
// must still satisfy its internal invariant.
TEST(BoundedQueueTest, ConcurrentDequeue) {
    constexpr auto kThreadCount = 16;
    constexpr auto kElemsPerThread = 1024;
    BoundedQueue<Element, kThreadCount * kElemsPerThread> queue;
    for (int i = 0; i < kThreadCount * kElemsPerThread; ++i) {
        queue.enqueue(Element(i));
    }
    std::atomic<bool> start = false;
    std::list<ScopedThread> consumers;
    for (int i = 0; i < kThreadCount; ++i) {
        consumers.emplace_back([&queue, &start]() {
            // Spin until every consumer is created, to maximize contention.
            while (!start) {
                std::this_thread::yield();
            }
            for (auto elem = queue.dequeue(); elem; elem = queue.dequeue()) {
                EXPECT_TRUE(elem->isValid());
            }
        });
    }
    start = true;
    // `consumers` joins at scope exit via ScopedThread destructors.
}
// Producers and consumers race on a queue deliberately smaller than the amount
// of work, exercising both the full-queue and empty-queue paths.
TEST(BoundedQueueTest, PingPongWithOverflow) {
    constexpr auto kElemsPerThread = 1024;
    BoundedQueue<Element, kElemsPerThread / 2> queue;
    std::atomic<bool> start = false;
    std::list<ScopedThread> writers;
    for (std::size_t t = 0; t < kDefaultThreadCount; ++t) {
        writers.emplace_back([&]() {
            while (!start) {
                std::this_thread::yield();
            }
            for (int i = 0; i < kElemsPerThread; ++i) {
                // Spin until a consumer makes room.
                while (!queue.enqueue(Element(i))) {
                    std::this_thread::yield();
                }
            }
        });
    }
    std::atomic<bool> allWritten = false;
    std::list<ScopedThread> readers;
    for (std::size_t t = 0; t < kDefaultThreadCount; ++t) {
        readers.emplace_back([&]() {
            while (!start) {
                std::this_thread::yield();
            }
            while (!allWritten) {
                while (auto elem = queue.dequeue()) {
                    EXPECT_TRUE(elem->isValid());
                }
            }
        });
    }
    start = true;
    writers.clear(); // joins the writers: from here on every element has been enqueued
    allWritten = true;
    readers.clear(); // joins the readers
    // Readers may observe `allWritten` and exit while elements are still queued;
    // drain and validate the leftovers so no element escapes the check.
    while (auto elem = queue.dequeue()) {
        EXPECT_TRUE(elem->isValid());
    }
}
@@ -19,7 +19,8 @@ using Kotlin_getSourceInfo_FunctionType = int(*)(void * /*addr*/, SourceInfo* /*
* but can be changed after compiling caches. So use this way for variables, which will be rarely accessed.
*/
RUNTIME_WEAK int32_t Kotlin_destroyRuntimeMode = 1;
RUNTIME_WEAK int32_t Kotlin_gcMarkSingleThreaded = 1;
RUNTIME_WEAK int32_t Kotlin_gcMutatorsCooperate = 0;
RUNTIME_WEAK uint32_t Kotlin_auxGCThreads = 0;
RUNTIME_WEAK int32_t Kotlin_workerExceptionHandling = 0;
RUNTIME_WEAK int32_t Kotlin_suspendFunctionsFromAnyThreadFromObjC = 0;
RUNTIME_WEAK Kotlin_getSourceInfo_FunctionType Kotlin_getSourceInfo_Function = nullptr;
@@ -36,8 +37,12 @@ ALWAYS_INLINE compiler::DestroyRuntimeMode compiler::destroyRuntimeMode() noexce
return static_cast<compiler::DestroyRuntimeMode>(Kotlin_destroyRuntimeMode);
}
ALWAYS_INLINE bool compiler::gcMarkSingleThreaded() noexcept {
return Kotlin_gcMarkSingleThreaded != 0;
ALWAYS_INLINE bool compiler::gcMutatorsCooperate() noexcept {
return Kotlin_gcMutatorsCooperate != 0;
}
ALWAYS_INLINE uint32_t compiler::auxGCThreads() noexcept {
return Kotlin_auxGCThreads;
}
ALWAYS_INLINE compiler::WorkerExceptionHandling compiler::workerExceptionHandling() noexcept {
@@ -38,6 +38,7 @@ extern "C" const int32_t Kotlin_runtimeAssertsMode;
extern "C" const int32_t Kotlin_disableMmap;
extern "C" const char* const Kotlin_runtimeLogs;
extern "C" const int32_t Kotlin_concurrentWeakSweep;
extern "C" const int32_t Kotlin_gcMarkSingleThreaded;
extern "C" const int32_t Kotlin_freezingEnabled;
extern "C" const int32_t Kotlin_freezingChecksEnabled;
@@ -103,9 +104,15 @@ ALWAYS_INLINE inline bool concurrentWeakSweep() noexcept {
return Kotlin_concurrentWeakSweep != 0;
}
ALWAYS_INLINE inline bool gcMarkSingleThreaded() noexcept {
return Kotlin_gcMarkSingleThreaded != 0;
}
WorkerExceptionHandling workerExceptionHandling() noexcept;
DestroyRuntimeMode destroyRuntimeMode() noexcept;
bool gcMarkSingleThreaded() noexcept;
bool gcMutatorsCooperate() noexcept;
uint32_t auxGCThreads() noexcept;
bool suspendFunctionsFromAnyThreadFromObjCEnabled() noexcept;
AppStateTracking appStateTracking() noexcept;
int getSourceInfo(void* addr, SourceInfo *result, int result_size) noexcept;
@@ -1,116 +0,0 @@
/*
* Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
#pragma once
#include <mutex>
#include "IntrusiveList.hpp"
#include "Porting.h"
#include "Mutex.hpp"
namespace kotlin {
/**
* A list for thread-local use with a separate shared part that can be accessed by other threads.
*/
template<typename T, typename Traits = DefaultIntrusiveForwardListTraits<T>>
class CooperativeIntrusiveList : Pinned {
    using ListImpl = intrusive_forward_list<T, Traits>;
public:
    using value_type = typename ListImpl::value_type;
    using size_type = typename ListImpl::size_type;
    using reference = typename ListImpl::reference;
    using pointer = typename ListImpl::pointer;

    CooperativeIntrusiveList() = default;

    // `true` if the owner thread's private part holds no elements.
    bool localEmpty() const {
        return local_.empty();
    }

    // Element count of the private part, tracked separately because the
    // intrusive list cannot report its size in O(1).
    size_type localSize() const {
        return localSize_;
    }

    /**
     * Tries to add `value` to the local list.
     * See `intrusive_forward_list.try_push_front`.
     */
    bool tryPushLocal(reference value) {
        auto pushed = local_.try_push_front(value);
        if (pushed) ++localSize_;
        return pushed;
    }

    /**
     * Tries to pop a value from the local list.
     * See `intrusive_forward_list.try_pop_front`.
     */
    pointer tryPopLocal() {
        auto popped = local_.try_pop_front();
        if (popped) {
            --localSize_;
        } else {
            RuntimeAssert(localEmpty(), "Pop can only fail if the list is empty");
        }
        return popped;
    }

    // Unlinks all local elements without touching the shared part.
    void clearLocal() {
        local_.clear();
        localSize_ = 0;
    }

    // `true` if the shared part holds no elements. The result may be stale
    // immediately, since other threads mutate the shared part concurrently.
    bool sharedEmpty() const {
        return shared_.empty();
    }

    /**
     * Tries to move at most `maxAmount` elements from a from's shared list into `this`'s local list.
     * In case some other thread is currently operating with the from's shared list, returns `0`.
     * Passing `*this` as `from` is allowed: it moves own shared elements back into the local part.
     * @return the number of elements stolen
     */
    size_type tryTransferFrom(CooperativeIntrusiveList<T, Traits>& from, size_type maxAmount) noexcept {
        // try_to_lock: stealing is opportunistic — never block on another thread's lock.
        std::unique_lock guard(from.sharedLocked_, std::try_to_lock);
        if (!guard || from.sharedEmpty()) {
            return 0;
        }
        auto amount = local_.splice_after(local_.before_begin(),
                                          from.shared_.before_begin(),
                                          from.shared_.end(),
                                          maxAmount);
        from.sharedSize_ -= amount;
        localSize_ += amount;
        return amount;
    }

    /**
     * Moves all of the local items into own shared list.
     * Must not be called when the local part is empty.
     * @return `0` if the shared list is busy, amount of newly shared items otherwise.
     */
    size_type shareAll() noexcept {
        RuntimeAssert(!localEmpty(), "Nothing to share");
        std::unique_lock guard(sharedLocked_, std::try_to_lock);
        if (!guard) return 0;
        auto amount = shared_.splice_after(shared_.before_begin(), local_.before_begin(), local_.end(), localSize_);
        sharedSize_ += amount;
        localSize_ -= amount;
        return amount;
    }

private:
    // Owner-thread-only part: accessed without synchronization.
    ListImpl local_;
    size_type localSize_ = 0;
    // Shared part: guarded by sharedLocked_; other threads may splice from it.
    ListImpl shared_;
    // NOTE(review): sharedSize_ is written but never read in this class — presumably
    // kept for debugging/symmetry; confirm before removing.
    size_type sharedSize_ = 0;
    SpinLock<MutexThreadStateHandling::kIgnore> sharedLocked_;
};
}
@@ -1,236 +0,0 @@
/*
* Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
#include "CooperativeIntrusiveList.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ScopedThread.hpp"
#include "TestSupport.hpp"
#include "std_support/Deque.hpp"
#include "std_support/Vector.hpp"
#include "std_support/List.hpp"
using namespace kotlin;
using ::testing::_;
namespace {
// An intrusive-forward-list node carrying an int payload; pinned because the
// list stores raw pointers to it.
class Node : private Pinned {
public:
    explicit Node(int value) : payload_(value) {}

    int& operator*() { return payload_; }
    const int& operator*() const { return payload_; }

    int value() const { return payload_; }

    // Detaches the node so it can be pushed into a list again.
    void clearNext() noexcept { next_ = nullptr; }

private:
    friend struct DefaultIntrusiveForwardListTraits<Node>;

    // Hooks consumed by intrusive_forward_list through the traits.
    Node* next() const noexcept { return next_; }
    void setNext(Node* next) noexcept {
        RuntimeAssert(next, "next cannot be nullptr");
        next_ = next;
    }
    bool trySetNext(Node* next) noexcept {
        RuntimeAssert(next, "next cannot be nullptr");
        if (next_ != nullptr) return false;
        next_ = next;
        return true;
    }

    int payload_;
    Node* next_ = nullptr;
};
using TestSubject = CooperativeIntrusiveList<Node>;
// Builds the integer sequence [first, lastExclusive).
std_support::vector<int> range(int first, int lastExclusive) {
    std_support::vector<int> result;
    for (int value = first; value < lastExclusive; ++value) {
        result.push_back(value);
    }
    return result;
}
// Creates one node per value, pushes each onto `list`'s local part, and returns
// the container that owns the nodes (the intrusive list itself is non-owning).
template<typename Values>
[[nodiscard]] std_support::list<typename TestSubject::value_type> fill(TestSubject& list, Values&& values) {
    std_support::list<typename TestSubject::value_type> nodesHandle;
    for (int value : values) {
        list.tryPushLocal(nodesHandle.emplace_back(value));
    }
    return nodesHandle;
}
// Pops every element of `list`'s local part, appending the payloads to `dest`.
void drainLocalInto(TestSubject& list, std_support::vector<int>& dest) {
    for (auto* node = list.tryPopLocal(); node != nullptr; node = list.tryPopLocal()) {
        dest.push_back(node->value());
    }
}
} // namespace
// A freshly constructed list is empty in both its local and shared parts.
TEST(CooperativeIntrusiveListTest, Init) {
    TestSubject list;
    EXPECT_THAT(list.localEmpty(), true);
    EXPECT_THAT(list.localSize(), 0);
    EXPECT_THAT(list.sharedEmpty(), true);
}

// Popping from an empty local part yields nullptr rather than failing.
TEST(CooperativeIntrusiveListTest, TryPopLocalEmpty) {
    TestSubject list;
    auto res = list.tryPopLocal();
    EXPECT_THAT(res, nullptr);
}

// Elements pushed locally come back out via tryPopLocal; order is not asserted.
TEST(CooperativeIntrusiveListTest, TryPushLocalPopLocal) {
    TestSubject list;
    typename TestSubject::value_type value1(1);
    typename TestSubject::value_type value2(2);
    bool pushed1 = list.tryPushLocal(value1);
    bool pushed2 = list.tryPushLocal(value2);
    EXPECT_THAT(pushed1, true);
    EXPECT_THAT(pushed2, true);
    EXPECT_THAT(list.localEmpty(), false);
    EXPECT_THAT(list.localSize(), 2);
    EXPECT_THAT(list.sharedEmpty(), true);
    std_support::vector<int> popped;
    drainLocalInto(list, popped);
    EXPECT_THAT(list.localEmpty(), true);
    EXPECT_THAT(list.localSize(), 0);
    EXPECT_THAT(list.sharedEmpty(), true);
    EXPECT_THAT(popped, testing::UnorderedElementsAre(1, 2));
}

// A node that is already linked cannot be pushed a second time.
TEST(CooperativeIntrusiveListTest, TryPushLocalTwice) {
    TestSubject list;
    typename TestSubject::value_type value(1);
    bool pushed1 = list.tryPushLocal(value);
    EXPECT_THAT(pushed1, true);
    bool pushed2 = list.tryPushLocal(value);
    EXPECT_THAT(pushed2, false);
    EXPECT_THAT(list.localEmpty(), false);
    EXPECT_THAT(list.localSize(), 1);
    EXPECT_THAT(list.sharedEmpty(), true);
}
// shareAll moves every local element into the shared part.
TEST(CooperativeIntrusiveListTest, ShareSome) {
    TestSubject list;
    auto values = range(0, 10);
    auto nodeHandle = fill(list, values);
    EXPECT_THAT(list.localEmpty(), false);
    EXPECT_THAT(list.localSize(), values.size());
    EXPECT_THAT(list.sharedEmpty(), true);
    auto sharedAmount = list.shareAll();
    EXPECT_THAT(sharedAmount, values.size());
    EXPECT_THAT(list.localEmpty(), true);
    EXPECT_THAT(list.sharedEmpty(), false);
}

// Stealing from an empty donor succeeds trivially with zero elements.
TEST(CooperativeIntrusiveListTest, TryTransferFromEmpty) {
    TestSubject from;
    TestSubject thief;
    auto stolenAmount = thief.tryTransferFrom(from, 1);
    EXPECT_THAT(stolenAmount, 0);
}

// Stealing a bounded amount takes exactly that many elements and loses none overall.
TEST(CooperativeIntrusiveListTest, TryTransferHalf) {
    TestSubject from;
    auto values = range(0, 10);
    auto nodeHandle = fill(from, values);
    from.shareAll();
    TestSubject thief;
    auto toTransfer = values.size() / 2;
    auto stolenAmount = thief.tryTransferFrom(from, toTransfer);
    EXPECT_THAT(stolenAmount, toTransfer);
    EXPECT_THAT(thief.localSize(), stolenAmount);
    // Self-transfer: move `from`'s remaining shared elements back into its local part.
    from.tryTransferFrom(from, values.size());
    EXPECT_THAT(from.sharedEmpty(), true);
    std_support::vector<int> allTheElements;
    drainLocalInto(from, allTheElements);
    drainLocalInto(thief, allTheElements);
    EXPECT_THAT(allTheElements, testing::UnorderedElementsAreArray(values));
}

// Repeated single-element steals eventually drain the donor completely.
TEST(CooperativeIntrusiveListTest, TryTransferAllEventually) {
    TestSubject from;
    auto values = range(0, 10);
    auto nodeHandle = fill(from, values);
    from.shareAll();
    TestSubject thief;
    for (std::size_t i = 0; i < values.size(); ++i) {
        auto stolenAmount = thief.tryTransferFrom(from, 1);
        EXPECT_THAT(stolenAmount, 1);
    }
    EXPECT_THAT(from.sharedEmpty(), true);
    EXPECT_THAT(thief.localSize(), values.size());
    std_support::vector<int> allTheElements;
    drainLocalInto(from, allTheElements);
    drainLocalInto(thief, allTheElements);
    EXPECT_THAT(allTheElements, testing::UnorderedElementsAreArray(values));
}
// Two threads repeatedly share and steal from each other's lists; afterwards
// no element may be lost or duplicated.
TEST(CooperativeIntrusiveListTest, TransferingPingPong) {
    TestSubject list1;
    TestSubject list2;
    const auto size = 100;
    auto values = range(0, size);
    auto nodesHandle1 = fill(list1, values);
    auto nodesHandle2 = fill(list2, values);
    std::atomic ready = false;
    auto kIters = 10000;
    std_support::vector<ScopedThread> threads;
    for (int tIdx = 0; tIdx < 2; ++tIdx) {
        threads.emplace_back([&ready, kIters, tIdx, &list1, &list2] {
            // Each thread owns one list and steals from the other.
            TestSubject& self = tIdx % 2 == 0 ? list1 : list2;
            TestSubject& from = tIdx % 2 == 0 ? list2 : list1;
            while (!ready.load()) {
                std::this_thread::yield();
            }
            for (int iter = 0; iter < kIters; ++iter) {
                // shareAll asserts on an empty local part, hence the guards.
                if (!self.localEmpty()) self.shareAll();
                self.tryTransferFrom(from, size / 2);
                if (!self.localEmpty()) self.shareAll();
                self.tryTransferFrom(from, size);
                // Exercise pop/clearNext/re-push of a single node as well.
                if (auto popped = self.tryPopLocal()) {
                    popped->clearNext();
                    self.tryPushLocal(*popped);
                }
            }
        });
    }
    ready = true;
    for (auto& thr: threads) {
        thr.join();
    }
    // check nothing is lost
    list1.tryTransferFrom(list1, size * 2);
    list2.tryTransferFrom(list2, size * 2);
    std_support::vector<int> allTheElements;
    drainLocalInto(list1, allTheElements);
    drainLocalInto(list2, allTheElements);
    std_support::vector<int> expected;
    expected.insert(expected.end(), values.begin(), values.end());
    expected.insert(expected.end(), values.begin(), values.end());
    EXPECT_THAT(allTheElements, testing::UnorderedElementsAreArray(expected));
}
@@ -31,7 +31,7 @@ public:
const T* operator->() const noexcept { return impl(); }
private:
T* impl() noexcept { return reinterpret_cast<T*>(implStorage_); }
__attribute__((used)) T* impl() noexcept { return reinterpret_cast<T*>(implStorage_); }
const T* impl() const noexcept { return reinterpret_cast<const T*>(implStorage_); }
alignas(T) char implStorage_[sizeof(T)];
@@ -16,6 +16,23 @@ namespace kotlin {
// TODO: Consider an iterator/ranges based approaches for traversals.
// Invokes `process` on the address of every object-typed field of `object`.
// Must not be called on an array; use traverseArrayOfObjectsElements for those.
template <typename F>
ALWAYS_INLINE void traverseClassObjectFields(ObjHeader* object, F process) noexcept(noexcept(process(std::declval<ObjHeader**>()))) {
    const TypeInfo* typeInfo = object->type_info();
    RuntimeAssert(typeInfo != theArrayTypeInfo, "Must not be an array of objects");
    auto base = reinterpret_cast<uintptr_t>(object);
    for (int i = 0; i < typeInfo->objOffsetsCount_; ++i) {
        // objOffsets_ holds byte offsets of reference fields within the object.
        process(reinterpret_cast<ObjHeader**>(base + typeInfo->objOffsets_[i]));
    }
}
// Invokes `process` on the address of every element slot of an object array.
template <typename F>
ALWAYS_INLINE void traverseArrayOfObjectsElements(ArrayHeader* array, F process) noexcept(noexcept(process(std::declval<ObjHeader**>()))) {
    RuntimeAssert(array->type_info() == theArrayTypeInfo, "Must be an array of objects");
    const uint32_t count = array->count_;
    for (uint32_t i = 0; i < count; ++i) {
        process(ArrayAddressOfElementAt(array, i));
    }
}
template <typename F>
void traverseObjectFields(ObjHeader* object, F process) noexcept(noexcept(process(std::declval<ObjHeader**>()))) {
const TypeInfo* typeInfo = object->type_info();
@@ -0,0 +1,215 @@
/*
* Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
#pragma once
#include "CompilerConstants.hpp"
#include "KAssert.h"
#include "Logging.hpp"
#include "Utils.hpp"
#include "Porting.h"
#include "BoundedQueue.hpp"
namespace kotlin {
/**
* Coordinates a group of workers working in parallel on a large amounts of identical tasks.
* The dispatcher will try to balance the work among the workers.
*
* Requirements:
* - Every instantiated worker must execute `tryPop` sooner or later;
* - Every instantiated worker must finish execution before the destruction of the processor;
*/
template <typename ListImpl, std::size_t kBatchSize, std::size_t kBatchesPoolSize>
class ParallelProcessor : private Pinned {
    // A fixed-capacity chunk of work: an intrusive list plus an explicit element
    // counter (the list itself cannot report its size in O(1)).
    class Batch {
    public:
        ALWAYS_INLINE bool empty() const noexcept {
            return elems_.empty();
        }

        ALWAYS_INLINE bool full() const noexcept {
            return elemsCount_ == kBatchSize;
        }

        ALWAYS_INLINE std::size_t elementsCount() const noexcept {
            return elemsCount_;
        }

        // Pushes in front. Must not be called on a full batch; may still return
        // `false` if the intrusive node refuses to be linked.
        ALWAYS_INLINE bool tryPush(typename ListImpl::reference value) noexcept {
            RuntimeAssert(!full(), "Batch overflow");
            bool pushed = elems_.try_push_front(value);
            if (pushed) {
                ++elemsCount_;
            }
            return pushed;
        }

        ALWAYS_INLINE typename ListImpl::pointer tryPop() noexcept {
            auto popped = elems_.try_pop_front();
            if (popped) {
                --elemsCount_;
            }
            return popped;
        }

        // Splices every element into `dst`, leaving this batch empty.
        void transferAllInto(ListImpl& dst) noexcept {
            dst.splice_after(dst.before_begin(), elems_.before_begin(), elems_.end(), std::numeric_limits<typename ListImpl::size_type>::max());
            RuntimeAssert(empty(), "All the elements must be transferred");
            elemsCount_ = 0;
        }

        // Moves up to kBatchSize elements out of `src` into this batch.
        void fillFrom(ListImpl& src) noexcept {
            auto spliced = elems_.splice_after(elems_.before_begin(), src.before_begin(), src.end(), kBatchSize);
            elemsCount_ = spliced;
        }

    private:
        ListImpl elems_;
        std::size_t elemsCount_ = 0;
    };

public:
    /**
     * A single worker's view of the processor. Work flows through a thread-local
     * batch; full batches are released into the shared pool so idle workers can
     * pick them up, and an unbounded thread-local overflow list absorbs work
     * when the shared pool itself is full.
     */
    class Worker : private Pinned {
        friend ParallelProcessor;
    public:
        explicit Worker(ParallelProcessor& dispatcher) : dispatcher_(dispatcher) {
            dispatcher_.registeredWorkers_.fetch_add(1, std::memory_order_relaxed);
            RuntimeLogDebug({ "balancing" }, "Worker registered");
        }

        // `true` when this worker has no thread-local work (shared pool not considered).
        ALWAYS_INLINE bool localEmpty() const noexcept {
            return batch_.empty() && overflowList_.empty();
        }

        // Pushes directly onto the private overflow list, bypassing batching/sharing.
        ALWAYS_INLINE bool tryPushLocal(typename ListImpl::reference value) noexcept {
            return overflowList_.try_push_front(value);
        }

        ALWAYS_INLINE typename ListImpl::pointer tryPopLocal() noexcept {
            return overflowList_.try_pop_front();
        }

        ALWAYS_INLINE bool tryPush(typename ListImpl::reference value) noexcept {
            if (batch_.full()) {
                // Try to publish the full batch for other workers.
                // NOTE: BoundedQueue::enqueue does not consume its argument on
                // failure, so `batch_` is still intact when `released` is false.
                bool released = dispatcher_.releaseBatch(std::move(batch_));
                if (!released) {
                    RuntimeLogDebug({ "balancing" }, "Batches pool overflow");
                    // Shared pool is full: spill the batch into the overflow list.
                    batch_.transferAllInto(overflowList_);
                }
                batch_ = Batch{};
            }
            return batch_.tryPush(value);
        }

        // Pops one item. Refills the local batch from (in order): the shared pool,
        // the private overflow list, or by sleeping until more work is released.
        // Returns nullptr only once all workers agree there is no work left.
        ALWAYS_INLINE typename ListImpl::pointer tryPop() noexcept {
            if (batch_.empty()) {
                while (true) {
                    bool acquired = dispatcher_.acquireBatch(batch_);
                    if (!acquired) {
                        if (!overflowList_.empty()) {
                            batch_.fillFrom(overflowList_);
                            RuntimeLogDebug({ "balancing" }, "Acquired %zu elements from the overflow list", batch_.elementsCount());
                        } else {
                            bool newWorkAvailable = waitForMoreWork();
                            if (newWorkAvailable) continue;
                            return nullptr;
                        }
                    }
                    RuntimeAssert(!batch_.empty(), "Must have acquired some elements");
                    break;
                }
            }
            return batch_.tryPop();
        }

    private:
        // Blocks until another worker releases a batch, or until every registered
        // worker is waiting (= global termination). Returns `true` if more work may
        // be available, `false` if processing is done.
        bool waitForMoreWork() noexcept {
            RuntimeAssert(batch_.empty(), "Local batch must be depleted before waiting for shared work");
            RuntimeAssert(overflowList_.empty(), "Local overflow list must be depleted before waiting for shared work");
            std::unique_lock lock(dispatcher_.waitMutex_);
            auto nowWaiting = dispatcher_.waitingWorkers_.fetch_add(1, std::memory_order_relaxed) + 1;
            RuntimeLogDebug({ "balancing" }, "Worker goes to sleep (now sleeping %zu of %zu)",
                            nowWaiting, dispatcher_.registeredWorkers_.load(std::memory_order_relaxed));
            // allDone_ is only read/written under waitMutex_.
            if (dispatcher_.allDone_) {
                dispatcher_.waitingWorkers_.fetch_sub(1, std::memory_order_relaxed);
                return false;
            }
            if (nowWaiting == dispatcher_.registeredWorkers_.load(std::memory_order_relaxed)) {
                // we are the last ones awake
                RuntimeLogDebug({ "balancing" }, "Worker has detected termination");
                dispatcher_.allDone_ = true;
                lock.unlock();
                // Wake every sleeping worker so they can observe allDone_ and return.
                dispatcher_.waitCV_.notify_all();
                dispatcher_.waitingWorkers_.fetch_sub(1, std::memory_order_relaxed);
                return false;
            }
            dispatcher_.waitCV_.wait(lock);
            dispatcher_.waitingWorkers_.fetch_sub(1, std::memory_order_relaxed);
            if (dispatcher_.allDone_) {
                return false;
            }
            RuntimeLogDebug({ "balancing" }, "Worker woke up");
            return true;
        }

        ParallelProcessor& dispatcher_;
        Batch batch_;           // current thread-local batch of work
        ListImpl overflowList_; // unbounded private backlog for when the shared pool is full
    };

    ParallelProcessor() = default;

    ~ParallelProcessor() {
        RuntimeAssert(waitingWorkers_.load() == 0, "All the workers must terminate before dispatcher destruction");
    }

    size_t registeredWorkers() {
        return registeredWorkers_.load(std::memory_order_relaxed);
    }

private:
    // Publishes a non-empty batch into the shared pool, waking one sleeper on success.
    bool releaseBatch(Batch&& batch) {
        RuntimeAssert(!batch.empty(), "A batch to release into shared pool must be non-empty");
        RuntimeLogDebug({ "balancing" }, "Releasing batch of %zu elements", batch.elementsCount());
        bool shared = sharedBatches_.enqueue(std::move(batch));
        if (shared) {
            if (waitingWorkers_.load(std::memory_order_relaxed) > 0) {
                waitCV_.notify_one();
            }
        }
        return shared;
    }

    // Takes one batch from the shared pool into `dst`; `false` if the pool is empty.
    bool acquireBatch(Batch& dst) {
        RuntimeAssert(dst.empty(), "Destination batch must be already depleted");
        auto acquired = sharedBatches_.dequeue();
        if (acquired) {
            dst = std::move(*acquired);
            RuntimeLogDebug({ "balancing" }, "Acquired a batch of %zu elements", dst.elementsCount());
            return true;
        }
        return false;
    }

    BoundedQueue<Batch, kBatchesPoolSize> sharedBatches_;
    std::atomic<size_t> registeredWorkers_ = 0;
    std::atomic<size_t> waitingWorkers_ = 0;
    std::atomic<bool> allDone_ = false;
    mutable std::mutex waitMutex_;
    mutable std::condition_variable waitCV_;
};
}
@@ -0,0 +1,115 @@
/*
* Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <list>
#include "IntrusiveList.hpp"
#include "ParallelProcessor.hpp"
#include "std_support/Vector.hpp"
#include "SingleThreadExecutor.hpp"
#include "TestSupport.hpp"
using ::testing::_;
using namespace kotlin;
namespace {
// A unit of work for the processor tests: an intrusive-list node that records
// whether it has been executed.
struct Task {
    // Drains `workList`, marking each popped task done exactly once.
    template <typename WorkList>
    static void workLoop(WorkList& workList) {
        Task* task;
        while ((task = workList.tryPop()) != nullptr) {
            RuntimeAssert(!task->done_.load(), "Tasks are not idempotent");
            task->done_ = true;
        }
    }

    // Intrusive-list hooks.
    Task* next() const noexcept { return next_; }
    void setNext(Task* next) noexcept {
        RuntimeAssert(next, "next cannot be nullptr");
        next_ = next;
    }
    bool trySetNext(Task* next) noexcept {
        RuntimeAssert(next, "next cannot be nullptr");
        if (next_ != nullptr) {
            return false;
        }
        next_ = next;
        return true;
    }

    std::atomic<bool> done_ = false;
    Task* next_ = nullptr;
};
// Creates `size` fresh tasks; std::list keeps node addresses stable, which the
// intrusive work list requires.
auto workBatch(std::size_t size) {
    std::list<Task> tasks(size);
    return tasks;
}
// Pushes every task of `batch` into the work list; each push is expected to succeed.
template <typename WorkList, typename Iterable>
void offerWork(WorkList& wl, Iterable& batch) {
    for (auto& task: batch) {
        bool accepted = wl.tryPush(task);
        RuntimeAssert(accepted, "Must be accepted");
    }
}
using ListImpl = intrusive_forward_list<Task>;
static constexpr auto kBatchSize = 256;
using Processor = ParallelProcessor<ListImpl, kBatchSize, 1024>;
using Worker = typename Processor::Worker;
} // namespace
// Workers registering from many threads at once must all be counted.
// (NOTE: "Conteded" is a typo for "Contended", kept to preserve the test name.)
TEST(ParallelProcessorTest, ContededRegistration) {
    Processor processor;
    std::vector<std::unique_ptr<Worker>> workers(kDefaultThreadCount);
    std::atomic<bool> start = false;
    std::list<ScopedThread> threads;
    for (int i = 0; i < kDefaultThreadCount; ++i) {
        threads.emplace_back([i, &start, &workers, &processor] {
            // Busy-wait so all threads register as simultaneously as possible.
            while (!start.load()) {}
            workers[i] = std::make_unique<Worker>(processor);
        });
    }
    start = true;
    for (auto& t : threads) {
        t.join();
    }
    EXPECT_THAT(processor.registeredWorkers(), kDefaultThreadCount);
    // Destroy the workers before the processor goes out of scope.
    workers.clear();
}
// Work pushed through one worker must become visible to another via the shared
// batch pool.
TEST(ParallelProcessorTest, Sharing) {
    Processor processor;
    Worker giver(processor);
    Worker taker(processor);
    // Two full batches: at least one is forced out into the shared pool.
    auto work = workBatch(kBatchSize * 2);
    offerWork(giver, work);
    EXPECT_TRUE(taker.localEmpty());
    // have to steal from giver
    EXPECT_NE(taker.tryPop(), nullptr);
    EXPECT_FALSE(taker.localEmpty());
}
@@ -56,6 +56,22 @@ protected:
~Pinned() = default;
};
// A helper that executes the action provided upon destruction of the ScopeGuard instance.
template<typename FinalAction>
class ScopeGuard : private Pinned {
public:
    // Runs `initAction` immediately; `finalAction` is deferred to destruction.
    template<typename InitAction>
    ScopeGuard(InitAction initAction, FinalAction finalAction) noexcept :
        // Move rather than copy the action (cast is std::move without requiring <utility>).
        finalAction_(static_cast<FinalAction&&>(finalAction)) {
        initAction();
    }

    // Defers `finalAction` to destruction.
    ScopeGuard(FinalAction finalAction) noexcept : finalAction_(static_cast<FinalAction&&>(finalAction)) {}

    // NOTE: noexcept — a throwing final action terminates the program.
    ~ScopeGuard() noexcept {
        finalAction_();
    }

private:
    FinalAction finalAction_;
};
// A helper that scopley assigns a value to a variable. The variable will
// be set to its original value upon destruction of the AutoReset instance.
// Note that an AutoReset instance must have a shorter lifetime than