diff --git a/Sources/Rownd/Models/Context/ReSwiftObserver.swift b/Sources/Rownd/Models/Context/ReSwiftObserver.swift index ae86824..f34fa9a 100644 --- a/Sources/Rownd/Models/Context/ReSwiftObserver.swift +++ b/Sources/Rownd/Models/Context/ReSwiftObserver.swift @@ -10,17 +10,49 @@ import Foundation import ReSwift import SwiftUI -// MARK: - Main Thread Dispatch Helper - -/// Helper to centralize main-thread dispatch with weak self handling. -/// Reduces duplication and ensures consistent patterns across observable state types. -private func dispatchOnMain(_ instance: T, execute work: @escaping (T) -> Void) { +// MARK: - Main Actor Dispatch Helper + +/// Dispatches work to the MainActor from a nonisolated context. +/// +/// Uses `DispatchQueue.main.async` to preserve FIFO ordering of state updates, +/// then hops into a `@MainActor` Task for proper isolation. This prevents the +/// ordering issues that can occur with unstructured Task spawning under +/// high-frequency state changes. +/// +/// - Parameters: +/// - instance: The object to operate on (captured weakly) +/// - state: The state value to process +/// - work: The MainActor-isolated work to perform +private func dispatchToMainActor( + _ instance: T, + state: S, + work: @escaping @MainActor (T, S) -> Void +) { + // Use DispatchQueue.main.async for FIFO ordering, then Task for @MainActor isolation DispatchQueue.main.async { [weak instance] in guard let instance = instance else { return } - work(instance) + Task { @MainActor in + work(instance, state) + } } } +// MARK: - ObservableState + +/// Observable wrapper for ReSwift state slices that publishes changes to SwiftUI. +/// Uses @MainActor to ensure all @Published property access is thread-safe. +/// +/// ## Thread Safety +/// ReSwift may call `newState(state:)` from any thread. This class uses @MainActor +/// isolation to ensure all @Published property access occurs on the main thread, +/// preventing crashes in swift_retain when accessing Combine's Published wrapper. +/// +/// ## State Update Ordering +/// State updates are dispatched through DispatchQueue.main.async to maintain FIFO ordering, +/// then processed on the MainActor. While this preserves ordering of dispatch calls, +/// the actual property updates occur asynchronously. For most SwiftUI use cases this is +/// acceptable since SwiftUI will render the final state. +@MainActor public class ObservableState: ObservableObject, StoreSubscriber, ObservableSubscription { @@ -42,43 +74,45 @@ public class ObservableState: ObservableObject, StoreSubscriber, Ob public func subscribe() { guard !isSubscribed else { return } - // Capture selector directly to avoid retaining self in the transform closure let selector = self.selector - dispatchOnMain(self) { instance in - guard !instance.isSubscribed else { return } - Context.currentContext.store.subscribe( - instance, transform: { $0.select(selector) }) - instance.isSubscribed = true - } + Context.currentContext.store.subscribe(self, transform: { $0.select(selector) }) + isSubscribed = true } func unsubscribe() { guard isSubscribed else { return } - dispatchOnMain(self) { instance in - guard instance.isSubscribed else { return } - Context.currentContext.store.unsubscribe(instance) - instance.isSubscribed = false - } + Context.currentContext.store.unsubscribe(self) + isSubscribed = false } deinit { - unsubscribe() + // Note: deinit is nonisolated even for @MainActor classes. + // ReSwift's SubscriptionBox holds a weak reference to subscribers, + // so cleanup happens automatically when this object is deallocated. } - public func newState(state: T) { - // All @Published property access must happen on main thread - dispatchOnMain(self) { instance in - guard instance.current != state else { return } - let old = instance.current - if let animation = instance.animation { - withAnimation(animation) { - instance.current = state - } - } else { - instance.current = state + /// Called by ReSwift when state changes. This method is nonisolated because + /// ReSwift may call it from any thread. Updates are dispatched to MainActor + /// via DispatchQueue.main to maintain FIFO ordering. + nonisolated public func newState(state: T) { + dispatchToMainActor(self, state: state) { instance, newState in + instance.applyStateUpdate(newState) + } + } + + /// Applies the state update on MainActor. Separated from newState to keep + /// the dispatch logic clean and enable subclass overrides. + fileprivate func applyStateUpdate(_ state: T) { + guard current != state else { return } + let old = current + if let animation = animation { + withAnimation(animation) { + current = state } - instance.objectDidChange.send(DidChangeSubject(old: old, new: instance.current)) + } else { + current = state } + objectDidChange.send(DidChangeSubject(old: old, new: current)) } public let objectDidChange = PassthroughSubject, Never>() @@ -101,30 +135,36 @@ public class ObservableThrottledState: ObservableState { objectThrottled .throttle(for: .milliseconds(throttleInMs), scheduler: DispatchQueue.main, latest: true) - .sink { [weak self] in self?.current = $0 } + .sink { [weak self] in + guard let self = self else { return } + let old = self.current + self.current = $0 + self.objectDidChange.send(DidChangeSubject(old: old, new: self.current)) + } .store(in: &cancellables) } - override public func newState(state: T) { - // All @Published property access must happen on main thread to avoid crashes - // in swift_retain when accessing Combine's Published wrapper from background threads - dispatchOnMain(self) { instance in - guard instance.current != state else { return } - let old = instance.current - if let animation = instance.animation { - withAnimation(animation) { - instance.objectThrottled.send(state) - } - } else { - instance.objectThrottled.send(state) + nonisolated override public func newState(state: T) { + dispatchToMainActor(self, state: state) { instance, newState in + instance.applyThrottledStateUpdate(newState) + } + } + + fileprivate func applyThrottledStateUpdate(_ state: T) { + guard current != state else { return } + if let animation = animation { + withAnimation(animation) { + objectThrottled.send(state) } - instance.objectDidChange.send(DidChangeSubject(old: old, new: instance.current)) + } else { + objectThrottled.send(state) } } private let objectThrottled = PassthroughSubject() } +@MainActor public class ObservableDerivedState: ObservableObject, StoreSubscriber, ObservableSubscription { @@ -151,43 +191,41 @@ public class ObservableDerivedState: Obse func subscribe() { guard !isSubscribed else { return } - // Capture selector directly to avoid retaining self in the transform closure let selector = self.selector - dispatchOnMain(self) { instance in - guard !instance.isSubscribed else { return } - Context.currentContext.store.subscribe( - instance, transform: { $0.select(selector) }) - instance.isSubscribed = true - } + Context.currentContext.store.subscribe(self, transform: { $0.select(selector) }) + isSubscribed = true } func unsubscribe() { guard isSubscribed else { return } - dispatchOnMain(self) { instance in - guard instance.isSubscribed else { return } - Context.currentContext.store.unsubscribe(instance) - instance.isSubscribed = false - } + Context.currentContext.store.unsubscribe(self) + isSubscribed = false } deinit { - unsubscribe() + // Note: deinit is nonisolated even for @MainActor classes. + // ReSwift's SubscriptionBox holds a weak reference to subscribers, + // so cleanup happens automatically when this object is deallocated. + } + + nonisolated public func newState(state original: Original) { + dispatchToMainActor(self, state: original) { instance, newState in + instance.applyStateUpdate(newState) + } } - public func newState(state original: Original) { - dispatchOnMain(self) { instance in - let old = instance.current - instance.objectWillChange.send(ChangeSubject(old: old, new: instance.current)) - - if let animation = instance.animation { - withAnimation(animation) { - instance.current = instance.transform(original) - } - } else { - instance.current = instance.transform(original) + fileprivate func applyStateUpdate(_ original: Original) { + let old = current + objectWillChange.send(ChangeSubject(old: old, new: current)) + + if let animation = animation { + withAnimation(animation) { + current = transform(original) } - instance.objectDidChange.send(ChangeSubject(old: old, new: instance.current)) + } else { + current = transform(original) } + objectDidChange.send(ChangeSubject(old: old, new: current)) } public let objectWillChange = PassthroughSubject, Never>() @@ -215,22 +253,27 @@ public class ObservableDerivedThrottledState( select selector: @escaping (RowndState) -> (T), animation: SwiftUI.Animation? = nil ) -> ObservableState { ObservableState(select: selector, animation: animation) } + @MainActor public func subscribe( select selector: @escaping (RowndState) -> (Original), transform: @escaping (Original) -> Derived, animation: SwiftUI.Animation? = nil @@ -252,6 +297,7 @@ extension Store where State == RowndState { ObservableDerivedState(select: selector, transform: transform, animation: animation) } + @MainActor public func subscribeThrottled( select selector: @escaping (RowndState) -> (T), throttleInMs: Int = 350, animation: SwiftUI.Animation? = nil @@ -259,6 +305,7 @@ extension Store where State == RowndState { ObservableThrottledState(select: selector, animation: animation, throttleInMs: throttleInMs) } + @MainActor public func subscribeThrottled( select selector: @escaping (RowndState) -> (Original), transform: @escaping (Original) -> Derived, throttleInMs: Int = 350, diff --git a/Sources/Rownd/Rownd.swift b/Sources/Rownd/Rownd.swift index 719bef2..4196afe 100644 --- a/Sources/Rownd/Rownd.swift +++ b/Sources/Rownd/Rownd.swift @@ -95,10 +95,10 @@ public class Rownd: NSObject { store.dispatch(UserData.fetch()) store.dispatch(PasskeyData.fetchPasskeyRegistration()) } - } - InstantUsers(context: Context.currentContext) - .tmpForceInstantUserConversionIfRequested() + InstantUsers(context: Context.currentContext) + .tmpForceInstantUserConversionIfRequested() + } return state } diff --git a/Sources/Rownd/framework/InstantUsers.swift b/Sources/Rownd/framework/InstantUsers.swift index f2c0fd8..63ca434 100644 --- a/Sources/Rownd/framework/InstantUsers.swift +++ b/Sources/Rownd/framework/InstantUsers.swift @@ -7,6 +7,7 @@ import Combine +@MainActor class InstantUsers { private let context: Context private var cancellables = Set() diff --git a/Sources/Rownd/framework/RowndEvent.swift b/Sources/Rownd/framework/RowndEvent.swift index b1c83ba..87be59b 100644 --- a/Sources/Rownd/framework/RowndEvent.swift +++ b/Sources/Rownd/framework/RowndEvent.swift @@ -30,23 +30,36 @@ public protocol RowndEventHandlerDelegate: AnyObject { func handleRowndEvent(_ event: RowndEvent) } +@MainActor class RowndEventEmitter { static private var cancellables = Set() static func emit(_ event: RowndEvent) { if event.event == .signInCompleted { + // Check if the access token is already valid — if so, fire immediately + // to avoid a race where the Combine subscription misses a value that's + // already settled before the sink is attached. + let authState = Context.currentContext.store.state.auth + if authState.isAccessTokenValid { + Self.notifyListeners(event) + return + } + + // Token not yet valid — subscribe and wait for it let subscription = Context.currentContext.store.subscribe { $0.auth.isAccessTokenValid } subscription.$current.sink { isAccessTokenValid in if isAccessTokenValid { subscription.unsubscribe() - Context.currentContext.eventListeners.forEach { listener in - listener.handleRowndEvent(event) - } + Self.notifyListeners(event) } }.store(in: &Self.cancellables) } else { - Context.currentContext.eventListeners.forEach { listener in - listener.handleRowndEvent(event) - } + Self.notifyListeners(event) + } + } + + private static func notifyListeners(_ event: RowndEvent) { + Context.currentContext.eventListeners.forEach { listener in + listener.handleRowndEvent(event) } } } diff --git a/Tests/RowndTests/ObservableStateTests.swift b/Tests/RowndTests/ObservableStateTests.swift index 8b8466b..121a02d 100644 --- a/Tests/RowndTests/ObservableStateTests.swift +++ b/Tests/RowndTests/ObservableStateTests.swift @@ -14,6 +14,7 @@ import Testing @testable import Rownd +@MainActor struct ObservableStateTests { /// Tests that ObservableState can handle newState being called from background threads. diff --git a/Tests/RowndTests/SubscriberMutationTests.swift b/Tests/RowndTests/SubscriberMutationTests.swift index 186c479..ccff0c9 100644 --- a/Tests/RowndTests/SubscriberMutationTests.swift +++ b/Tests/RowndTests/SubscriberMutationTests.swift @@ -5,6 +5,7 @@ import Testing @testable import Rownd +@MainActor struct SubscriberMutationTests { @Test func rapidClockSyncAndObserverChurnDoesNotCrash() async throws {