Native: fix losing Worker.executeAfter jobs on scheduled time clash
The implementation was keeping delayed jobs in std::set sorted only by the scheduled execution time (in microseconds since epoch). So two jobs submitted to a worker and having the same scheduled time were considered equivalent by the set, and one of them got lost. Fix this by using std::multiset instead of std::set.
This commit is contained in:
committed by
Space
parent
95648f1a9e
commit
24e7a11abc
@@ -118,3 +118,60 @@ import kotlin.native.concurrent.*
|
||||
assertEquals(2, counter.value)
|
||||
}
|
||||
}
|
||||
|
||||
// This test checks that when multiple `executeAfter` jobs are submitted to `targetWorker` and have the
// same scheduled execution time (in micros since an epoch), neither of them gets lost.
@Test fun testExecuteAfterScheduledTimeClash() = withWorker {
    val targetWorker = this
    val mainWorker = Worker.current

    // Configuration of the test.
    val numberOfSubmitters = 2
    val numberOfTasks = 100
    val delayInMicroseconds = 100L

    val submitters = Array(numberOfSubmitters) { Worker.start() }
    try {
        val readySubmittersCounter = AtomicInt(0)
        val executedTasksCounter = AtomicInt(0)
        val finishedBatchesCounter = AtomicInt(0)

        submitters.forEach {
            it.executeAfter(0L, {
                readySubmittersCounter.increment()
                // Wait for other submitters, to make them all start at the same time:
                while (readySubmittersCounter.value != numberOfSubmitters) {}

                // Concurrently submit tasks with matching scheduled execution time:
                repeat(numberOfTasks) {
                    targetWorker.executeAfter(delayInMicroseconds, {
                        executedTasksCounter.increment()
                    }.freeze())
                }

                // Use larger delay for the task below, to make sure it gets executed after
                // the tasks above submitted by the same worker.
                // If the order is wrong, the test will fail as well.
                // NOTE: the code below was affected by the same problem with clashing times, so despite all the effort
                // the test still might hang without a fix.
                targetWorker.executeAfter(delayInMicroseconds + 1, {
                    mainWorker.executeAfter(0L, {
                        finishedBatchesCounter.increment()
                    }.freeze())
                }.freeze())
            }.freeze())
        }

        while (finishedBatchesCounter.value != numberOfSubmitters) {
            // Wait and allow processing the `finishedBatchesCounter.increment()` tasks above:
            Worker.current.park(delayInMicroseconds, process = true)
        }

        // Note: we could have just waited for the condition above to become true,
        // but this would mean that the test would hang in case of failure, which is not quite convenient.

        assertEquals(numberOfSubmitters * numberOfTasks, executedTasksCounter.value)
    } finally {
        submitters.forEach { it.requestTermination().result }
    }
}
|
||||
|
||||
@@ -77,7 +77,7 @@ using KStdUnorderedSet = std::unordered_set<Value,
                                            std::hash<Value>, std::equal_to<Value>,
                                            KonanAllocator<Value>>;
template<class Value, class Compare = std::less<Value>>
using KStdOrderedSet = std::set<Value, Compare, KonanAllocator<Value>>;
template<class Value, class Compare = std::less<Value>>
using KStdOrderedMultiset = std::multiset<Value, Compare, KonanAllocator<Value>>;
template<class Key, class Value, class Compare = std::less<Key>>
using KStdOrderedMap = std::map<Key, Value, Compare, KonanAllocator<std::pair<const Key, Value>>>;
template<class Value>
@@ -130,7 +130,10 @@ struct JobCompare {
  }
};

-typedef KStdOrderedSet<Job, JobCompare> DelayedJobSet;
+// Using multiset instead of regular set, because we compare the jobs only by `whenExecute`.
+// So if `whenExecute` of two different jobs is the same, the jobs are considered equivalent,
+// and set would simply drop one of them.
+typedef KStdOrderedMultiset<Job, JobCompare> DelayedJobSet;

} // namespace
@@ -966,6 +969,9 @@ KLong Worker::checkDelayedLocked() {
  RuntimeAssert(job.kind == JOB_EXECUTE_AFTER, "Must be delayed job");
  auto now = konan::getTimeMicros();
  if (job.executeAfter.whenExecute <= now) {
    // Note: `delayed_` is multiset sorted only by `whenExecute`.
    // So using erase(it) instead of erase(job) is crucial,
    // because the latter would remove all the jobs with the same `whenExecute`.
    delayed_.erase(it);
    queue_.push_back(job);
    return 0;
Reference in New Issue
Block a user