Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 128 additions & 81 deletions Sources/Rownd/Models/Context/ReSwiftObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,49 @@ import Foundation
import ReSwift
import SwiftUI

// MARK: - Main Thread Dispatch Helper

/// Helper to centralize main-thread dispatch with weak self handling.
/// Reduces duplication and ensures consistent patterns across observable state types.
private func dispatchOnMain<T: AnyObject>(_ instance: T, execute work: @escaping (T) -> Void) {
// MARK: - Main Actor Dispatch Helper

/// Dispatches work to the MainActor from a nonisolated context.
///
/// Uses `DispatchQueue.main.async` to preserve FIFO ordering of state updates,
/// then hops into a `@MainActor` Task for proper isolation. This prevents the
/// ordering issues that can occur with unstructured Task spawning under
/// high-frequency state changes.
///
/// - Parameters:
/// - instance: The object to operate on (captured weakly)
/// - state: The state value to process
/// - work: The MainActor-isolated work to perform
private func dispatchToMainActor<T: AnyObject, S>(
_ instance: T,
state: S,
work: @escaping @MainActor (T, S) -> Void
) {
// Use DispatchQueue.main.async for FIFO ordering, then Task for @MainActor isolation
DispatchQueue.main.async { [weak instance] in
guard let instance = instance else { return }
work(instance)
Task { @MainActor in
work(instance, state)
}
}
}

// MARK: - ObservableState

/// Observable wrapper for ReSwift state slices that publishes changes to SwiftUI.
/// Uses @MainActor to ensure all @Published property access is thread-safe.
///
/// ## Thread Safety
/// ReSwift may call `newState(state:)` from any thread. This class uses @MainActor
/// isolation to ensure all @Published property access occurs on the main thread,
/// preventing crashes in swift_retain when accessing Combine's Published wrapper.
///
/// ## State Update Ordering
/// State updates are dispatched through DispatchQueue.main.async to maintain FIFO ordering,
/// then processed on the MainActor. While this preserves ordering of dispatch calls,
/// the actual property updates occur asynchronously. For most SwiftUI use cases this is
/// acceptable since SwiftUI will render the final state.
@MainActor
public class ObservableState<T: Hashable>: ObservableObject, StoreSubscriber, ObservableSubscription
{

Expand All @@ -42,43 +74,45 @@ public class ObservableState<T: Hashable>: ObservableObject, StoreSubscriber, Ob

public func subscribe() {
guard !isSubscribed else { return }
// Capture selector directly to avoid retaining self in the transform closure
let selector = self.selector
dispatchOnMain(self) { instance in
guard !instance.isSubscribed else { return }
Context.currentContext.store.subscribe(
instance, transform: { $0.select(selector) })
instance.isSubscribed = true
}
Context.currentContext.store.subscribe(self, transform: { $0.select(selector) })
isSubscribed = true
}

func unsubscribe() {
guard isSubscribed else { return }
dispatchOnMain(self) { instance in
guard instance.isSubscribed else { return }
Context.currentContext.store.unsubscribe(instance)
instance.isSubscribed = false
}
Context.currentContext.store.unsubscribe(self)
isSubscribed = false
}

deinit {
unsubscribe()
// Note: deinit is nonisolated even for @MainActor classes.
// ReSwift's SubscriptionBox holds a weak reference to subscribers,
// so cleanup happens automatically when this object is deallocated.
}

public func newState(state: T) {
// All @Published property access must happen on main thread
dispatchOnMain(self) { instance in
guard instance.current != state else { return }
let old = instance.current
if let animation = instance.animation {
withAnimation(animation) {
instance.current = state
}
} else {
instance.current = state
/// Called by ReSwift when state changes. This method is nonisolated because
/// ReSwift may call it from any thread. Updates are dispatched to MainActor
/// via DispatchQueue.main to maintain FIFO ordering.
nonisolated public func newState(state: T) {
dispatchToMainActor(self, state: state) { instance, newState in
instance.applyStateUpdate(newState)
}
}

/// Applies the state update on MainActor. Separated from newState to keep
/// the dispatch logic clean and enable subclass overrides.
fileprivate func applyStateUpdate(_ state: T) {
guard current != state else { return }
let old = current
if let animation = animation {
withAnimation(animation) {
current = state
}
instance.objectDidChange.send(DidChangeSubject(old: old, new: instance.current))
} else {
current = state
}
objectDidChange.send(DidChangeSubject(old: old, new: current))
}

public let objectDidChange = PassthroughSubject<DidChangeSubject<T>, Never>()
Expand All @@ -101,30 +135,36 @@ public class ObservableThrottledState<T: Hashable>: ObservableState<T> {

objectThrottled
.throttle(for: .milliseconds(throttleInMs), scheduler: DispatchQueue.main, latest: true)
.sink { [weak self] in self?.current = $0 }
.sink { [weak self] in
guard let self = self else { return }
let old = self.current
self.current = $0
self.objectDidChange.send(DidChangeSubject(old: old, new: self.current))
}
.store(in: &cancellables)
}

override public func newState(state: T) {
// All @Published property access must happen on main thread to avoid crashes
// in swift_retain when accessing Combine's Published wrapper from background threads
dispatchOnMain(self) { instance in
guard instance.current != state else { return }
let old = instance.current
if let animation = instance.animation {
withAnimation(animation) {
instance.objectThrottled.send(state)
}
} else {
instance.objectThrottled.send(state)
nonisolated override public func newState(state: T) {
dispatchToMainActor(self, state: state) { instance, newState in
instance.applyThrottledStateUpdate(newState)
}
}

fileprivate func applyThrottledStateUpdate(_ state: T) {
guard current != state else { return }
if let animation = animation {
withAnimation(animation) {
objectThrottled.send(state)
}
instance.objectDidChange.send(DidChangeSubject(old: old, new: instance.current))
} else {
objectThrottled.send(state)
}
}

private let objectThrottled = PassthroughSubject<T, Never>()
}

@MainActor
public class ObservableDerivedState<Original: Hashable, Derived: Hashable>: ObservableObject,
StoreSubscriber, ObservableSubscription
{
Expand All @@ -151,43 +191,41 @@ public class ObservableDerivedState<Original: Hashable, Derived: Hashable>: Obse

func subscribe() {
guard !isSubscribed else { return }
// Capture selector directly to avoid retaining self in the transform closure
let selector = self.selector
dispatchOnMain(self) { instance in
guard !instance.isSubscribed else { return }
Context.currentContext.store.subscribe(
instance, transform: { $0.select(selector) })
instance.isSubscribed = true
}
Context.currentContext.store.subscribe(self, transform: { $0.select(selector) })
isSubscribed = true
}

func unsubscribe() {
guard isSubscribed else { return }
dispatchOnMain(self) { instance in
guard instance.isSubscribed else { return }
Context.currentContext.store.unsubscribe(instance)
instance.isSubscribed = false
}
Context.currentContext.store.unsubscribe(self)
isSubscribed = false
}

deinit {
unsubscribe()
// Note: deinit is nonisolated even for @MainActor classes.
// ReSwift's SubscriptionBox holds a weak reference to subscribers,
// so cleanup happens automatically when this object is deallocated.
}

nonisolated public func newState(state original: Original) {
dispatchToMainActor(self, state: original) { instance, newState in
instance.applyStateUpdate(newState)
}
}

public func newState(state original: Original) {
dispatchOnMain(self) { instance in
let old = instance.current
instance.objectWillChange.send(ChangeSubject(old: old, new: instance.current))

if let animation = instance.animation {
withAnimation(animation) {
instance.current = instance.transform(original)
}
} else {
instance.current = instance.transform(original)
fileprivate func applyStateUpdate(_ original: Original) {
let old = current
objectWillChange.send(ChangeSubject(old: old, new: current))

if let animation = animation {
withAnimation(animation) {
current = transform(original)
}
instance.objectDidChange.send(ChangeSubject(old: old, new: instance.current))
} else {
current = transform(original)
}
objectDidChange.send(ChangeSubject(old: old, new: current))
}

public let objectWillChange = PassthroughSubject<ChangeSubject<Derived>, Never>()
Expand Down Expand Up @@ -215,22 +253,27 @@ public class ObservableDerivedThrottledState<Original: Hashable, Derived: Hashab
objectThrottled
.throttle(for: .milliseconds(throttleInMs), scheduler: DispatchQueue.main, latest: true)
.sink { [weak self] in
self?.current = transform($0)
guard let self = self else { return }
let old = self.current
self.current = transform($0)
self.objectDidChange.send(ChangeSubject(old: old, new: self.current))
}
.store(in: &cancellables)
}

override public func newState(state original: Original) {
dispatchOnMain(self) { instance in
let old = instance.current
if let animation = instance.animation {
withAnimation(animation) {
instance.objectThrottled.send(original)
}
} else {
instance.objectThrottled.send(original)
nonisolated override public func newState(state original: Original) {
dispatchToMainActor(self, state: original) { instance, newState in
instance.applyThrottledStateUpdate(newState)
}
}

fileprivate func applyThrottledStateUpdate(_ original: Original) {
if let animation = animation {
withAnimation(animation) {
objectThrottled.send(original)
}
instance.objectDidChange.send(ChangeSubject(old: old, new: instance.current))
} else {
objectThrottled.send(original)
}
}

Expand All @@ -239,26 +282,30 @@ public class ObservableDerivedThrottledState<Original: Hashable, Derived: Hashab

extension Store where State == RowndState {

@MainActor
public func subscribe<T>(
select selector: @escaping (RowndState) -> (T), animation: SwiftUI.Animation? = nil
) -> ObservableState<T> {
ObservableState(select: selector, animation: animation)
}

@MainActor
public func subscribe<Original, Derived>(
select selector: @escaping (RowndState) -> (Original),
transform: @escaping (Original) -> Derived, animation: SwiftUI.Animation? = nil
) -> ObservableDerivedState<Original, Derived> {
ObservableDerivedState(select: selector, transform: transform, animation: animation)
}

@MainActor
public func subscribeThrottled<T>(
select selector: @escaping (RowndState) -> (T), throttleInMs: Int = 350,
animation: SwiftUI.Animation? = nil
) -> ObservableThrottledState<T> {
ObservableThrottledState(select: selector, animation: animation, throttleInMs: throttleInMs)
}

@MainActor
public func subscribeThrottled<Original, Derived>(
select selector: @escaping (RowndState) -> (Original),
transform: @escaping (Original) -> Derived, throttleInMs: Int = 350,
Expand Down
6 changes: 3 additions & 3 deletions Sources/Rownd/Rownd.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ public class Rownd: NSObject {
store.dispatch(UserData.fetch())
store.dispatch(PasskeyData.fetchPasskeyRegistration())
}
}

InstantUsers(context: Context.currentContext)
Comment thread
mhamann marked this conversation as resolved.
.tmpForceInstantUserConversionIfRequested()
InstantUsers(context: Context.currentContext)
.tmpForceInstantUserConversionIfRequested()
}

return state
Comment thread
mhamann marked this conversation as resolved.
}
Expand Down
1 change: 1 addition & 0 deletions Sources/Rownd/framework/InstantUsers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import Combine

@MainActor
class InstantUsers {
private let context: Context
private var cancellables = Set<AnyCancellable>()
Expand Down
25 changes: 19 additions & 6 deletions Sources/Rownd/framework/RowndEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,36 @@ public protocol RowndEventHandlerDelegate: AnyObject {
func handleRowndEvent(_ event: RowndEvent)
}

@MainActor
class RowndEventEmitter {
static private var cancellables = Set<AnyCancellable>()
static func emit(_ event: RowndEvent) {
if event.event == .signInCompleted {
// Check if the access token is already valid — if so, fire immediately
// to avoid a race where the Combine subscription misses a value that's
// already settled before the sink is attached.
let authState = Context.currentContext.store.state.auth
if authState.isAccessTokenValid {
Self.notifyListeners(event)
return
}

// Token not yet valid — subscribe and wait for it
let subscription = Context.currentContext.store.subscribe { $0.auth.isAccessTokenValid }
subscription.$current.sink { isAccessTokenValid in
if isAccessTokenValid {
subscription.unsubscribe()
Context.currentContext.eventListeners.forEach { listener in
listener.handleRowndEvent(event)
}
Self.notifyListeners(event)
}
}.store(in: &Self.cancellables)
} else {
Context.currentContext.eventListeners.forEach { listener in
listener.handleRowndEvent(event)
}
Self.notifyListeners(event)
}
}

private static func notifyListeners(_ event: RowndEvent) {
Context.currentContext.eventListeners.forEach { listener in
listener.handleRowndEvent(event)
}
}
}
1 change: 1 addition & 0 deletions Tests/RowndTests/ObservableStateTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Testing

@testable import Rownd

@MainActor
struct ObservableStateTests {

/// Tests that ObservableState can handle newState being called from background threads.
Expand Down
1 change: 1 addition & 0 deletions Tests/RowndTests/SubscriberMutationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Testing

@testable import Rownd

@MainActor
struct SubscriberMutationTests {
@Test
func rapidClockSyncAndObserverChurnDoesNotCrash() async throws {
Expand Down
Loading