diff --git a/kotlin-native/backend.native/tests/runtime/workers/worker4.kt b/kotlin-native/backend.native/tests/runtime/workers/worker4.kt index a6a20b207de..cdcba55d7fc 100644 --- a/kotlin-native/backend.native/tests/runtime/workers/worker4.kt +++ b/kotlin-native/backend.native/tests/runtime/workers/worker4.kt @@ -118,3 +118,60 @@ import kotlin.native.concurrent.* assertEquals(2, counter.value) } } + +// This test checks that when multiple `executeAfter` jobs are submitted to `targetWorker` and have the +// same scheduled execution time (in micros since an epoch), nether of them gets lost. +@Test fun testExecuteAfterScheduledTimeClash() = withWorker { + val targetWorker = this + val mainWorker = Worker.current + + // Configuration of the test. + val numberOfSubmitters = 2 + val numberOfTasks = 100 + val delayInMicroseconds = 100L + + val submitters = Array(numberOfSubmitters) { Worker.start() } + try { + val readySubmittersCounter = AtomicInt(0) + val executedTasksCounter = AtomicInt(0) + val finishedBatchesCounter = AtomicInt(0) + + submitters.forEach { + it.executeAfter(0L, { + readySubmittersCounter.increment() + // Wait for other submitters, to make them all start at the same time: + while (readySubmittersCounter.value != numberOfSubmitters) {} + + // Concurrently submit tasks with matching scheduled execution time: + repeat(numberOfTasks) { + targetWorker.executeAfter(delayInMicroseconds, { + executedTasksCounter.increment() + }.freeze()) + } + + // Use larger delay for the task below, to make sure it gets executed after + // the tasks above submitted by the same worker. + // If the order is wrong, the test will fail as well. + // NOTE: the code below was affected by the same problem with clashing times, so despite all the effort + // the test still might hang without a fix. + targetWorker.executeAfter(delayInMicroseconds + 1, { + mainWorker.executeAfter(0L, { + finishedBatchesCounter.increment() + }.freeze()) + }.freeze()) + }.freeze()) + } + + while (finishedBatchesCounter.value != numberOfSubmitters) { + // Wait and allow processing the `finishedBatchesCounter.increment()` tasks above: + Worker.current.park(delayInMicroseconds, process = true) + } + + // Note: we could have just waited for the condition above to become true, + // but this would mean that the test would hang in case of failure, which is not quite convenient. + + assertEquals(numberOfSubmitters * numberOfTasks, executedTasksCounter.value) + } finally { + submitters.forEach { it.requestTermination().result } + } +} diff --git a/kotlin-native/runtime/src/main/cpp/Types.h b/kotlin-native/runtime/src/main/cpp/Types.h index bcdcdde20c4..51d09cb4adc 100644 --- a/kotlin-native/runtime/src/main/cpp/Types.h +++ b/kotlin-native/runtime/src/main/cpp/Types.h @@ -77,7 +77,7 @@ using KStdUnorderedSet = std::unordered_set, std::equal_to, KonanAllocator>; template> -using KStdOrderedSet = std::set>; +using KStdOrderedMultiset = std::multiset>; template> using KStdOrderedMap = std::map>>; template diff --git a/kotlin-native/runtime/src/main/cpp/Worker.cpp b/kotlin-native/runtime/src/main/cpp/Worker.cpp index b659835ae26..2f717bb951a 100644 --- a/kotlin-native/runtime/src/main/cpp/Worker.cpp +++ b/kotlin-native/runtime/src/main/cpp/Worker.cpp @@ -130,7 +130,10 @@ struct JobCompare { } }; -typedef KStdOrderedSet DelayedJobSet; +// Using multiset instead of regular set, because we compare the jobs only by `whenExecute`. +// So if `whenExecute` of two different jobs is the same, the jobs are considered equivalent, +// and set would simply drop one of them. +typedef KStdOrderedMultiset DelayedJobSet; } // namespace @@ -966,6 +969,9 @@ KLong Worker::checkDelayedLocked() { RuntimeAssert(job.kind == JOB_EXECUTE_AFTER, "Must be delayed job"); auto now = konan::getTimeMicros(); if (job.executeAfter.whenExecute <= now) { + // Note: `delayed_` is multiset sorted only by `whenExecute`. + // So using erase(it) instead of erase(job) is crucial, + // because the latter would remove all the jobs with the same `whenExecute`. delayed_.erase(it); queue_.push_back(job); return 0;