swiftcombineswift-concurrency

Thread safe combine publisher to AsyncStream


I try to figure out is this approach thread safe if getStream() and update(value: ...) will be called on difference thread simultaneously?

final class SomeNotifier {

static let shared = SomeNotifier()

private let value = PassthroughSubject<String, Never>()
private var cancellables: Set<AnyCancellable> = []

private init() {}

func getStream() -> AsyncStream<String> {
    return AsyncStream { [weak self] continuation in
        guard let self = self else { return }

        self.value.sink { completion in
            switch completion {
            case .finished:
                continuation.finish()
            case .failure:
                continuation.finish()
            }
        } receiveValue: { value in
            continuation.yield(value)
        }
        .store(in: &cancellables)
    }
}

func update(value: String) {
    self.value.send(value)
}

I want to have some repository that can notify different observers about change of internal state


Solution

  • No, this is not thread safe, for two reasons:

    1. The shared instance is not concurrency-safe because it represents an unsynchronized mutable state. If you change the “Strict Concurrency Checking” build setting to “Complete”, you will receive a warning this this effect:

      Static property 'shared' is not concurrency-safe because it is not either conforming to 'Sendable' or isolated to a global actor; this is an error in Swift 6

    2. Furthermore, the cancellables is a Set which is not thread-safe. You need to synchronize your access to it.


    In a variation of Cy-4AH’s answer (subsequently deleted), I would suggest using an actor for synchronization. I would also add an onTermination handler to remove the associated continuation if the asynchronous sequence was canceled. E.g.:

    actor Notifier<Element: Sendable> {
        typealias CancellableIdentifier = UUID
    
        private let valuesPublisher = PassthroughSubject<Element, Never>()
        private var cancellables: [CancellableIdentifier: AnyCancellable] = [:]
    
        func values() -> AsyncStream<Element> {
            .init { [valuesPublisher] continuation in
                let id = CancellableIdentifier()
    
                cancellables[id] = valuesPublisher.sink { completion in
                    continuation.finish()
                } receiveValue: { value in
                    continuation.yield(value)
                }
    
                continuation.onTermination = { _ in
                    Task { [weak self] in
                        await self?.removeCancellable(id: id)
                    }
                }
            }
        }
    
        func send(_ element: Element) {
            valuesPublisher.send(element)
        }
    }
    
    private extension Notifier {
        func removeCancellable(id: UUID) {
            cancellables.removeValue(forKey: id)
        }
    }
    

    There are tons of variations on the theme, but the details of the implementation matter less than the general observations of (a) the use of an actor; and (b) the use of the onTermination handler to clean up in case the notifier object might outlive the individual sequences.


    FWIW, if I really wanted to create a singleton for String notifications:

    actor StringNotifier {
        static let shared = Notifier<String>()
        private init() {}
    }
    

    As an aside, the Swift concurrency alternative to a Combine Subject is an AsyncChannel. But, a channel does not allow multiple observers, so you might have a collection of these channels:

    actor Notifier<Element: Sendable> {
        typealias ChannelIdentifier = UUID
    
        private var channels: [ChannelIdentifier: AsyncChannel<Element>] = [:]
    
        func values() -> AsyncStream<Element> {
            .init { continuation in
                let channel = AsyncChannel<Element>()
                let id = ChannelIdentifier()
                channels[id] = channel
    
                let task = Task {
                    for await value in channel {
                        continuation.yield(value)
                    }
                    continuation.finish()
                }
    
                continuation.onTermination = { state in
                    if case .cancelled = state { task.cancel() }
    
                    Task { [weak self] in
                        await self?.removeChannel(id: id)
                    }
                }
            }
        }
    
        func send(_ element: Element) async {
            await withDiscardingTaskGroup { group in
                for channel in channels.values {
                    group.addTask { await channel.send(element) }
                }
            }
        }
    }
    
    private extension Notifier {
        func removeChannel(id: ChannelIdentifier) {
            channels(forKey: id)
        }
    }