From fd522ea75608c15542f643816f4a389d6cd63cea Mon Sep 17 00:00:00 2001 From: Alexander Shabalin Date: Tue, 18 Jul 2023 17:05:59 +0200 Subject: [PATCH] [K/N] Add condition_variable that spins on atomics --- .../src/main/cpp/ConditionVariable.hpp | 63 ++++ .../src/main/cpp/ConditionVariableTest.cpp | 353 ++++++++++++++++++ 2 files changed, 416 insertions(+) create mode 100644 kotlin-native/runtime/src/main/cpp/ConditionVariable.hpp create mode 100644 kotlin-native/runtime/src/main/cpp/ConditionVariableTest.cpp diff --git a/kotlin-native/runtime/src/main/cpp/ConditionVariable.hpp b/kotlin-native/runtime/src/main/cpp/ConditionVariable.hpp new file mode 100644 index 00000000000..5eb7c29a604 --- /dev/null +++ b/kotlin-native/runtime/src/main/cpp/ConditionVariable.hpp @@ -0,0 +1,63 @@ +/* + * Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#pragma once + +#include +#include +#include + +#include "Utils.hpp" + +namespace kotlin { + +// `std::condition_variable_any` implemented via spinning on atomics. +class ConditionVariableSpin : private Pinned { +public: + // Notify all threads waiting for `this. + // Exactly the same as `notify_all`. Providing this method to be fully compatible + // with `std::condition_variable_any`. + void notify_one() noexcept { + // Conditional variable does not protect the data: a mutex must be + // used to protect it, so we don't need synchronization. + epoch_.fetch_add(1, std::memory_order_relaxed); + } + + // Notify all threads waiting for `this. + void notify_all() noexcept { + // Conditional variable does not protect the data: a mutex must be + // used to protect it, so we don't need synchronization. + epoch_.fetch_add(1, std::memory_order_relaxed); + } + + // Wait until next call to `notify_*`. + template + void wait(Lock& lock) { + auto currentEpoch = epoch_.load(std::memory_order_relaxed); + lock.unlock(); + // Waiting for any change of the epoch. + while (epoch_.load(std::memory_order_relaxed) == currentEpoch) { + std::this_thread::yield(); + } + lock.lock(); + } + + // Waits until `stopWaiting()` starts returning true. Any change + // that leads to `stopWaiting()` must be followed by `notify_*` for + // this `wait` to stop waiting. + template + void wait(Lock& lock, Predicate stopWaiting) { + while (!stopWaiting()) { + wait(lock); + } + } + + // TODO: Implement wait_until and wait_for using kotlin::steady_clock. + +private: + std::atomic epoch_ = 0; +}; + +} // namespace kotlin diff --git a/kotlin-native/runtime/src/main/cpp/ConditionVariableTest.cpp b/kotlin-native/runtime/src/main/cpp/ConditionVariableTest.cpp new file mode 100644 index 00000000000..a94e93176fa --- /dev/null +++ b/kotlin-native/runtime/src/main/cpp/ConditionVariableTest.cpp @@ -0,0 +1,353 @@ +/* + * Copyright 2010-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the LICENSE file. + */ + +#include "ConditionVariable.hpp" + +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "ScopedThread.hpp" +#include "TestSupport.hpp" +#include "std_support/Vector.hpp" + +using namespace kotlin; + +template +class ConditionVariableTest : public testing::Test {}; + +using CVTypes = testing::Types< +#ifndef KONAN_WINDOWS // winpthreads are acting strange in our mingw toolchain. + std::condition_variable, + std::condition_variable_any, +#endif + ConditionVariableSpin>; +class CVNames { +public: + template + static std::string GetName(int) { + if constexpr (std::is_same_v) { + return "condition_variable"; + } else if constexpr (std::is_same_v) { + return "condition_variable_any"; + } else if constexpr (std::is_same_v) { + return "ConditionVariableSpin"; + } + } +}; +TYPED_TEST_SUITE(ConditionVariableTest, CVTypes, CVNames); + +TYPED_TEST(ConditionVariableTest, NotifyNobody) { + using CVUnderTest = TypeParam; + + CVUnderTest cv; + cv.notify_one(); + cv.notify_all(); +} + +TYPED_TEST(ConditionVariableTest, WaitOne) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = false; + ScopedThread thread([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.store(true, std::memory_order_relaxed); + while (!flag) { + cv.wait(guard); + } + }); + + while (!waiting.load(std::memory_order_relaxed)) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + } + cv.notify_one(); + + thread.join(); +} + +TYPED_TEST(ConditionVariableTest, WaitOneNotifyUnderLock) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = false; + ScopedThread thread([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.store(true, std::memory_order_relaxed); + while (!flag) { + cv.wait(guard); + } + }); + + while (!waiting.load(std::memory_order_relaxed)) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + cv.notify_one(); + } + + thread.join(); +} + +TYPED_TEST(ConditionVariableTest, WaitAll) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = 0; + std_support::vector threads; + for (int i = 0; i < kDefaultThreadCount; ++i) { + threads.emplace_back([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.fetch_add(1, std::memory_order_relaxed); + while (!flag) { + cv.wait(guard); + } + }); + } + + while (waiting.load(std::memory_order_relaxed) != threads.size()) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + } + cv.notify_all(); + + threads.clear(); +} + +TYPED_TEST(ConditionVariableTest, WaitAllNotifyUnderLock) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = 0; + std_support::vector threads; + for (int i = 0; i < kDefaultThreadCount; ++i) { + threads.emplace_back([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.fetch_add(1, std::memory_order_relaxed); + while (!flag) { + cv.wait(guard); + } + }); + } + + while (waiting.load(std::memory_order_relaxed) != threads.size()) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + cv.notify_all(); + } + + threads.clear(); +} + +TYPED_TEST(ConditionVariableTest, WaitPredicateOne) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = false; + ScopedThread thread([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.store(true, std::memory_order_relaxed); + cv.wait(guard, [&] { return flag; }); + EXPECT_TRUE(flag); + }); + + while (!waiting.load(std::memory_order_relaxed)) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + } + cv.notify_all(); + + thread.join(); +} + +TYPED_TEST(ConditionVariableTest, WaitPredicateOneNotifyUnderLock) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = 0; + ScopedThread thread([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.store(true, std::memory_order_relaxed); + cv.wait(guard, [&] { return flag; }); + EXPECT_TRUE(flag); + }); + + while (!waiting.load(std::memory_order_relaxed)) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + cv.notify_all(); + } + + thread.join(); +} + +TYPED_TEST(ConditionVariableTest, WaitPredicateAll) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = 0; + std_support::vector threads; + for (int i = 0; i < kDefaultThreadCount; ++i) { + threads.emplace_back([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.fetch_add(1, std::memory_order_relaxed); + cv.wait(guard, [&] { return flag; }); + EXPECT_TRUE(flag); + }); + } + + while (waiting.load(std::memory_order_relaxed) != threads.size()) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + } + cv.notify_all(); + + threads.clear(); +} + +TYPED_TEST(ConditionVariableTest, WaitPredicateAllNotifyUnderLock) { + using CVUnderTest = TypeParam; + + bool flag = false; + std::mutex m; + CVUnderTest cv; + + std::atomic waiting = 0; + std_support::vector threads; + for (int i = 0; i < kDefaultThreadCount; ++i) { + threads.emplace_back([&] { + std::unique_lock guard(m); + EXPECT_FALSE(flag); + waiting.fetch_add(1, std::memory_order_relaxed); + cv.wait(guard, [&] { return flag; }); + EXPECT_TRUE(flag); + }); + } + + while (waiting.load(std::memory_order_relaxed) != threads.size()) { + std::this_thread::yield(); + } + { + std::unique_lock guard(m); + flag = true; + cv.notify_all(); + } + + threads.clear(); +} + +TYPED_TEST(ConditionVariableTest, Checkpoint) { + constexpr uint64_t epochsCount = 1000; + using CVUnderTest = TypeParam; + + uint64_t epochScheduled = 0; + uint64_t epochStarted = 0; + uint64_t epochFinished = 0; + std::mutex m; + CVUnderTest cv; + + auto schedule = [&] { + std::unique_lock guard(m); + if (epochScheduled > epochStarted) { + return epochScheduled; + } + epochScheduled = epochStarted + 1; + return epochScheduled; + }; + + std_support::vector threads; + std::array, kDefaultThreadCount> checkpoints = {0}; + for (int i = 0; i < kDefaultThreadCount; ++i) { + threads.emplace_back([&, i] { + while (true) { + uint64_t epoch = schedule(); + if (epoch >= epochsCount) return; + { + std::unique_lock guard(m); + if (epochFinished < epoch) { + checkpoints[i].store(2 * epoch, std::memory_order_relaxed); + cv.wait(guard, [&] { return epochFinished >= epoch; }); + checkpoints[i].store(2 * epoch + 1, std::memory_order_relaxed); + } + } + } + }); + } + + while (epochStarted <= epochsCount) { + { + std::unique_lock guard(m); + ++epochStarted; + } + std::this_thread::yield(); + { + std::unique_lock guard(m); + epochFinished = epochStarted; + } + cv.notify_all(); + for (auto& checkpoint : checkpoints) { + while (true) { + auto value = checkpoint.load(std::memory_order_relaxed); + auto epoch = value / 2; + bool isWaiting = value % 2 == 0; + if (epoch > epochFinished) break; + if (!isWaiting) break; + std::this_thread::yield(); + } + } + } + threads.clear(); +}