Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
433 changes: 433 additions & 0 deletions Docs/test-determinism-executor-drain.md

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions Sources/SwiftModel/Internal/Cancellables.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,23 @@ extension TaskCancellable {
}
}

#if canImport(Dispatch)
// EXPERIMENTAL (executor-drain quiescence): in `.modelTesting`, run
// model task bodies on the per-test harness executor so the test can
// drive them to a fixpoint instead of waiting on the wall clock. The
// contextual `Task<Void, Error>` return type resolves the throwing
// `executorPreference` overload, exactly as the plain spawns below.
// (Name is dropped here only because the `name:`+`executorPreference:`
// combined initializer isn't available across all supported toolchains.)
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *),
let exec = _TestExecutorBox.current as? _DrainTestExecutor {
if isDetached {
return Task.detached(executorPreference: exec, priority: priority, operation: operation)
} else {
return Task(executorPreference: exec, priority: priority, operation: operation)
}
}
#endif
if isDetached {
return Task.detached(name: taskName, priority: priority, operation: operation)
} else {
Expand Down
8 changes: 8 additions & 0 deletions Sources/SwiftModel/Internal/TestAccess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -960,10 +960,18 @@ final class TestAccess<Root: Model>: ModelAccess, @unchecked Sendable {
/// Thread-safety: callable from any thread. Wakes continuations OUTSIDE
/// the lock so resumed Tasks can call back into `awaitPredicate` /
/// `awaitQuietWindow` without re-entering.
/// Monotonic-ns of the most recent model activity (write / event / probe /
/// task-body-start) seen by `_noteActivity`. The executor-drain grace window
/// debounces against this (and the executor's last enqueue), so any activity
/// — including a clock-suspended task's resume-and-write — resets the grace,
/// keeping a wait from declaring quiescence while work is still in flight.
var _lastActivityNs: UInt64 = 0

func _noteActivity() {
var wakes: [CheckedContinuation<PredicateOutcome, Never>] = []
lock {
let now = monotonicNanoseconds()
_lastActivityNs = now
// Iterate in reverse so removals don't shift indices we haven't
// visited yet.
for i in (0..<_pendingExpects.count).reversed() {
Expand Down
261 changes: 261 additions & 0 deletions Sources/SwiftModel/Internal/TestExecutorDrive.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import Foundation
#if canImport(Dispatch)
import Dispatch
#endif

// MARK: - Executor-drain quiescence (primary resolution for expect/settle/waitUntil)
//
// The durable fix for `.modelTesting` load-flakiness (see
// docs/test-determinism-executor-drain.md). Under `.modelTesting`, model tasks
// run on a per-test harness executor; the wait verbs resolve on the EXECUTOR
// DRAIN FIXPOINT — a load-independent, non-starvable "the model is quiescent"
// signal — rather than a wall-clock budget or a `.background`-QoS quiet-check
// (which macOS starves under parallel load — the disease this replaces).
//
// Semantics (the contract):
// • pass the instant the target is met (reactive `_noteActivity`, as before);
// • for `expect`/`waitUntil`: fail the instant the model is QUIESCENT with the
// target still unmet (fast interactively; "as long as necessary" under load);
// • `settle`: succeed when quiescent;
// • the wall clock is only a generous deadlock watchdog (`_executorHangNs`).
//
// "Quiescent" = a STABLE fixpoint: executor has no ready jobs AND bg pump idle
// AND MainActor queue idle AND no task pending its first run — observed across
// two consecutive checks (a task suspended mid-`await` re-enqueues between
// checks, so a single idle instant is not trusted; this 2-consecutive gate is
// what iteration 3 proved prevents premature fixpoints).

/// Per-test harness executor box (task-local), set by the `.modelTesting` scope.
/// `(any Sendable)?` so the declaration compiles where `TaskExecutor` is
/// unavailable (macOS < 15, WASM); readers downcast under `if #available`.
enum _TestExecutorBox {
@TaskLocal static var current: (any Sendable)?
}

/// Generous deadlock watchdog: how long a wait will drive toward a fixpoint
/// before giving up and reporting (a true deadlock / runaway never reaches a
/// fixpoint). NOT a per-wait budget — a healthy wait resolves at its fixpoint
/// long before this. **Tunable knob** (maintainer judgment): large enough to
/// never fire on a healthy wait under heavy parallel load; small enough that an
/// interactive deadlock surfaces in reasonable time.
func _executorHangDeadlineNs() -> UInt64 {
_drainMonotonicNs() &+ 120_000_000_000 // 120 s
}

private final class _GTSSleepState: @unchecked Sendable {
var cont: CheckedContinuation<Void, Never>?
var cancel: (@Sendable () -> Void)?
var done = false
}

@inline(__always) func _drainMonotonicNs() -> UInt64 {
#if canImport(Dispatch)
return DispatchTime.now().uptimeNanoseconds
#else
return UInt64(ProcessInfo.processInfo.systemUptime * 1_000_000_000)
#endif
}

/// Build a fresh per-test executor box, or `nil` where custom task executors
/// aren't available. Opt-in via `SWIFT_MODEL_EXPERIMENTAL_DRAIN=1`; inert
/// otherwise (every wait keeps its current path).
///
/// NOTE: cannot be on by default yet. The drive needs model tasks on this
/// executor, and routing them off the cooperative pool is slightly slower under
/// parallel load — enough to trip `expect`'s wall-clock budget in latency-
/// sensitive clock tests (`childTasksCompleteBeforeTeardown`, `testImmediateClock`
/// regress with it on). Enabling by default is unblocked only once `expect`/
/// `waitUntil` are also drive-primary (so they don't depend on a wall-clock
/// budget). Until then this stays opt-in. See docs/test-determinism-executor-drain.md.
func _makeTestExecutorBox() -> (any Sendable)? {
#if canImport(Dispatch)
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *),
ProcessInfo.processInfo.environment["SWIFT_MODEL_EXPERIMENTAL_DRAIN"] == "1" {
return _DrainTestExecutor()
}
#endif
return nil
}

#if canImport(Dispatch)
/// ONE process-wide concurrent queue backs every per-test executor — a per-test
/// `.concurrent` queue would explode GCD's worker pool under full-parallel test
/// runs. Each executor keeps its own `outstanding` counter, so per-test
/// quiescence detection stays isolated.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
private let _sharedDrainQueue = DispatchQueue(label: "swift-model.test-drain.shared", attributes: .concurrent)

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
final class _DrainTestExecutor: TaskExecutor, @unchecked Sendable {
private let lock = NSLock()
private var outstanding = 0
private var _lastEnqueueNs: UInt64 = 0
/// Closures that fire (at most once each) when `outstanding` hits 0.
private var idleWaiters: [(id: UInt64, fire: @Sendable () -> Void)] = []
private var nextWaiterId: UInt64 = 0

/// Monotonic-ns of the most recent `enqueue`. A task suspended mid-`await`
/// (e.g. on a clock) re-enqueues when it resumes, bumping this — so the drive
/// can require executor-idle to *persist* for a debounce window since the
/// last enqueue, bridging the suspend→resume gap that an instantaneous idle
/// check slips through (premature fixpoint).
var lastEnqueueNs: UInt64 { lock.withLock { _lastEnqueueNs } }

func enqueue(_ job: consuming ExecutorJob) {
let unowned = UnownedJob(job)
lock.withLock { outstanding += 1; _lastEnqueueNs = _drainMonotonicNs() }
_sharedDrainQueue.async {
unowned.runSynchronously(on: self.asUnownedTaskExecutor())
let toFire: [@Sendable () -> Void] = self.lock.withLock {
self.outstanding -= 1
guard self.outstanding == 0 else { return [] }
let fns = self.idleWaiters.map(\.fire)
self.idleWaiters.removeAll()
return fns
}
for f in toFire { f() }
}
}

/// No executor job is ready/running this instant.
var isExecutorIdle: Bool { lock.withLock { outstanding == 0 } }

/// Suspend until `outstanding == 0` (event-driven — fired from `enqueue`'s
/// completion, non-starvable, no QoS dependency), OR the GTS `deadlineNs`
/// fires (the deadlock watchdog — `.userInitiated`, also non-starvable), OR
/// the Task is cancelled. At-most-once resolution across all three.
func waitUntilIdleOrDeadline(_ deadlineNs: UInt64) async {
final class State: @unchecked Sendable {
var cont: CheckedContinuation<Void, Never>?
var timerCancel: (@Sendable () -> Void)?
var resumed = false
}
let state = State()
let id = lock.withLock { () -> UInt64 in nextWaiterId += 1; return nextWaiterId }

let resolve: @Sendable () -> Void = { [self] in
let cont: CheckedContinuation<Void, Never>? = lock.withLock {
guard !state.resumed else { return nil }
state.resumed = true
state.timerCancel?()
state.timerCancel = nil
idleWaiters.removeAll { $0.id == id }
let c = state.cont
state.cont = nil
return c
}
cont?.resume()
}

await withTaskCancellationHandler {
await withCheckedContinuation { (c: CheckedContinuation<Void, Never>) in
let immediate: Bool = lock.withLock {
if Task.isCancelled || outstanding == 0 { state.resumed = true; return true }
state.cont = c
idleWaiters.append((id, { resolve() }))
return false
}
if immediate { c.resume(); return }
let cancel = scheduleAfter(deadline: deadlineNs) { resolve() }
let stale = lock.withLock { () -> Bool in
if state.resumed { return true }
state.timerCancel = cancel
return false
}
if stale { cancel() }
}
} onCancel: {
resolve()
}
}
}
#endif

extension TestAccess {
/// True when a per-test harness executor is installed (flag on, macOS 15+).
var _isExecutorDriveActive: Bool {
#if canImport(Dispatch)
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) {
return _TestExecutorBox.current is _DrainTestExecutor
}
#endif
return false
}

/// Drive the model to a STABLE fixpoint (quiescent across two consecutive
/// checks). Returns `true` at the fixpoint, `false` if the deadlock watchdog
/// (`hangDeadlineNs`) fires or the Task is cancelled. No executor → `true`
/// (callers gate on `_isExecutorDriveActive`).
func _driveToStableFixpoint(hangDeadlineNs: UInt64, graceNs: UInt64) async -> Bool {
#if canImport(Dispatch)
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *),
let exec = _TestExecutorBox.current as? _DrainTestExecutor {
let bg = backgroundCall
// Quiescence = executor idle AND bg idle AND no task pending its first
// run AND NO ACTIVITY OF ANY KIND for a `graceNs` grace window. The
// grace debounces against the most recent of (a) any `_noteActivity`
// — model write / event / probe / task-body-start — and (b) any
// executor enqueue. So ANY activity (a clock-parked task resuming and
// writing, an enqueue, an event) resets the window: on a single test
// the system is quiet at once and the grace elapses fast; under stress
// every resuming task keeps resetting it, so the wait lasts as long as
// necessary and only declares quiescence once the system is genuinely
// done. NON-STARVABLE throughout (counter + GTS, never `.background`).
// `mainCall` is excluded — it's process-global, not per-test.
while !Task.isCancelled {
if _drainMonotonicNs() >= hangDeadlineNs { return false }
await exec.waitUntilIdleOrDeadline(hangDeadlineNs)
if !bg.isIdle { await bg.waitForCurrentItems(deadline: hangDeadlineNs) }
let idleNow = exec.isExecutorIdle && bg.isIdle && !self.context.hasPendingStartTask
if idleNow {
let lastActivity = max(self._lastActivityNs, exec.lastEnqueueNs)
let sinceActivity = _drainMonotonicNs() &- lastActivity
if sinceActivity >= graceNs {
return true // idle, and no activity of any kind for a full grace window
}
// Idle but recent activity — wait out the remainder of the
// grace (non-starvable), then re-check; a resuming task will
// have produced fresh activity and reset it by then.
await _gtsSleep(graceNs &- sinceActivity, hangDeadlineNs: hangDeadlineNs)
}
}
return false
}
#endif
return true
}

/// Grace window for `settle`'s fixpoint debounce. `settle` is forgiving (a
/// slightly early settle just means the next line re-settles), so a short
/// grace suffices. Tunable knob (runtime vs robustness).
static var _settleGraceNs: UInt64 { 30_000_000 } // 30 ms

/// Non-starvable sleep for `ns` (or until `hangDeadlineNs`), via GTS — used
/// by the drive's debounce. Cancellation-aware.
func _gtsSleep(_ ns: UInt64, hangDeadlineNs: UInt64) async {
#if canImport(Dispatch)
let target = min(_drainMonotonicNs() &+ ns, hangDeadlineNs)
let s = _GTSSleepState(); let lk = NSLock()
let fire: @Sendable () -> Void = {
let c: CheckedContinuation<Void, Never>? = lk.withLock {
guard !s.done else { return nil }
s.done = true; s.cancel?(); s.cancel = nil
let c = s.cont; s.cont = nil; return c
}
c?.resume()
}
await withTaskCancellationHandler {
await withCheckedContinuation { (c: CheckedContinuation<Void, Never>) in
let immediate = lk.withLock { () -> Bool in
if Task.isCancelled || s.done { return true }
s.cont = c; return false
}
if immediate { c.resume(); return }
let cancel = scheduleAfter(deadline: target) { fire() }
let stale = lk.withLock { () -> Bool in if s.done { return true }; s.cancel = cancel; return false }
if stale { cancel() }
}
} onCancel: { fire() }
#endif
}
}
15 changes: 15 additions & 0 deletions Sources/SwiftModel/Internal/TestWaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ extension TestAccess {
/// budget was exhausted.
@discardableResult
package func waitUntilSettled(cleanup: Bool = false, at fileAndLine: FileAndLine) async -> Bool {
// Executor-drain path (primary, when the per-test harness executor is
// active): resolve on the model's STABLE FIXPOINT — a non-starvable,
// load-independent quiescence signal — instead of the `.deferential`/
// `.background` quiet-check (which macOS starves under parallel load,
// the root cause of the false `settle() timed out`). Waits as long as
// necessary; only a genuine deadlock (no fixpoint within the watchdog)
// fails. See docs/test-determinism-executor-drain.md.
if _isExecutorDriveActive {
let reached = await _driveToStableFixpoint(hangDeadlineNs: _executorHangDeadlineNs(), graceNs: Self._settleGraceNs)
if !reached, !cleanup {
fail("settle() timed out: model never reached a fixpoint (deadlock or runaway).\n\(settleDiagnostics())", at: fileAndLine)
}
return reached
}

let window = cleanup ? Self.settleDebounceCleanupNs : Self.settleDebounceInTestNs
let totalBudgetNs = cleanup ? Self.settleCleanupTotalBudgetNs : Self.settleTotalBudgetNs
let startNs = _monotonicNs()
Expand Down
30 changes: 18 additions & 12 deletions Sources/SwiftModel/Testing/ModelTestingTrait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,15 @@ private func _withModelTestingImpl(
}
let pending = _PendingModelTestScope(exhaustivity: resolvedExhaustivity, dependencies: mergedDependencies)
let testQueue = BackgroundCallQueue()
let execBox = _makeTestExecutorBox()
try await _BackgroundCallLocals.$queue.withValue(testQueue) {
try await _ModelTestingLocals.$scope.withValue(pending) {
try await body()
}
if let concrete = pending.concrete, let fl = pending.registrationFileAndLine {
await concrete.checkExhaustion(at: fl)
try await _TestExecutorBox.$current.withValue(execBox) {
try await _ModelTestingLocals.$scope.withValue(pending) {
try await body()
}
if let concrete = pending.concrete, let fl = pending.registrationFileAndLine {
await concrete.checkExhaustion(at: fl)
}
}
}
}
Expand Down Expand Up @@ -492,6 +495,7 @@ extension ModelTestingTrait: TestScoping, TestTrait, SuiteTrait {
// each other's in-flight Observed pipeline updates.
let testQueue = BackgroundCallQueue()
let testTag = test.name
let execBox = _makeTestExecutorBox()
try await _BackgroundCallLocals.$queue.withValue(testQueue) {
// Per-test wall-clock cap.
//
Expand All @@ -507,13 +511,15 @@ extension ModelTestingTrait: TestScoping, TestTrait, SuiteTrait {
// three the test should fail explicitly rather than hang.
try await withoutActuallyEscaping(function) { escapingFunction in
try await _withTestTimeout(seconds: ModelTestingTraitOptions.testWallClockSeconds, testTag: testTag) {
try await _ModelTestingLocals.$scope.withValue(pending) {
try await escapingFunction()
}
// After the test body completes, run exhaustion check (still inside the
// test-local queue scope so any teardown backgroundCall work uses testQueue).
if let concrete = pending.concrete, let fl = pending.registrationFileAndLine {
await concrete.checkExhaustion(at: fl)
try await _TestExecutorBox.$current.withValue(execBox) {
try await _ModelTestingLocals.$scope.withValue(pending) {
try await escapingFunction()
}
// After the test body completes, run exhaustion check (still inside the
// test-local queue scope so any teardown backgroundCall work uses testQueue).
if let concrete = pending.concrete, let fl = pending.registrationFileAndLine {
await concrete.checkExhaustion(at: fl)
}
}
}
}
Expand Down
Loading