swift combine swift-concurrency

Thread-safe Combine publisher to AsyncStream


I am trying to figure out whether this approach is thread-safe if getStream() and update(value:) are called on different threads simultaneously.

import Combine

final class SomeNotifier {

    static let shared = SomeNotifier()

    private let value = PassthroughSubject<String, Never>()
    private var cancellables: Set<AnyCancellable> = []

    private init() {}

    func getStream() -> AsyncStream<String> {
        return AsyncStream { [weak self] continuation in
            guard let self = self else { return }

            self.value.sink { completion in
                switch completion {
                case .finished:
                    continuation.finish()
                case .failure:
                    continuation.finish()
                }
            } receiveValue: { value in
                continuation.yield(value)
            }
            .store(in: &self.cancellables)
        }
    }

    func update(value: String) {
        self.value.send(value)
    }
}

I want a repository that can notify multiple observers about changes to its internal state.


Solution

  • No, this is not thread safe, for two reasons:

    1. The shared instance is not concurrency-safe because it represents unsynchronized mutable state. If you change the “Strict Concurrency Checking” build setting to “Complete”, you will receive a warning to this effect:

      Static property 'shared' is not concurrency-safe because it is not either conforming to 'Sendable' or isolated to a global actor; this is an error in Swift 6

    2. Furthermore, cancellables is a Set, which is not thread-safe. You need to synchronize access to it.
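
    To illustrate the second point, here is a minimal sketch of one way to synchronize access to the cancellables, assuming a plain lock is acceptable. SynchronizedCancellables is a hypothetical helper, not part of Combine; an actor, as used later in this answer, is another way to get the same protection.

    ```swift
    import Combine
    import Foundation

    /// Hypothetical helper: a lock-protected bag of cancellables.
    /// `@unchecked Sendable` is an assumption that holds only because
    /// every access to `storage` goes through `lock`.
    final class SynchronizedCancellables: @unchecked Sendable {
        private let lock = NSLock()
        private var storage: Set<AnyCancellable> = []

        /// Stores a cancellable; safe to call from any thread.
        func insert(_ cancellable: AnyCancellable) {
            lock.lock()
            defer { lock.unlock() }
            storage.insert(cancellable)
        }

        /// Number of cancellables currently held.
        var count: Int {
            lock.lock()
            defer { lock.unlock() }
            return storage.count
        }
    }
    ```

    In the question's getStream(), the result of sink would then be passed to insert(_:) rather than stored with store(in:). This addresses only the Set; the shared instance still has to be made Sendable or isolated.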


    Below I outline how to wrap a Publisher in an AsyncStream. This is no longer necessary as of iOS 15, macOS 12, etc. Publisher now exposes an asynchronous sequence, values, that you can use directly with any publisher (including a Subject):

    for await value in publisher.values {
        …
    }
    

    This is the modern alternative to manually wrapping a Publisher with an AsyncStream, shown below.
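
    As a small self-contained illustration of values, the sketch below iterates a finite publisher built from an array. collectValues is a hypothetical name chosen for this example. Note that a PassthroughSubject does not replay anything sent while it has no subscriber or no outstanding demand, so with a subject the consumer should already be iterating before send is called.

    ```swift
    import Combine

    /// Hypothetical helper: collects everything a finite, non-failing
    /// publisher emits by iterating its `values` sequence.
    func collectValues<P: Publisher>(from publisher: P) async -> [P.Output]
    where P.Failure == Never {
        var received: [P.Output] = []
        // The loop ends when the publisher sends its completion.
        for await value in publisher.values {
            received.append(value)
        }
        return received
    }
    ```

    For example, await collectValues(from: ["a", "b", "c"].publisher) iterates the three strings and returns once the publisher completes.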


    In a variation of Cy-4AH’s answer (subsequently deleted), I would suggest using an actor for synchronization. I would also add an onTermination handler to remove the associated continuation if the asynchronous sequence was canceled. E.g.:

    import Combine
    import Foundation

    actor Notifier<Element: Sendable> {
        typealias CancellableIdentifier = UUID
    
        private let valuesPublisher = PassthroughSubject<Element, Never>()
        private var cancellables: [CancellableIdentifier: AnyCancellable] = [:]
    
        func values() -> AsyncStream<Element> {
            .init { [valuesPublisher] continuation in
                let id = CancellableIdentifier()
    
                cancellables[id] = valuesPublisher.sink { completion in
                    continuation.finish()
                } receiveValue: { value in
                    continuation.yield(value)
                }
    
                continuation.onTermination = { _ in
                    Task { [weak self] in
                        await self?.removeCancellable(id: id)
                    }
                }
            }
        }
    
        func send(_ element: Element) {
            valuesPublisher.send(element)
        }
    }
    
    private extension Notifier {
        func removeCancellable(id: UUID) {
            cancellables.removeValue(forKey: id)
        }
    }
    

    There are tons of variations on the theme, but the details of the implementation matter less than the general observations of (a) the use of an actor; and (b) the use of the onTermination handler to clean up in case the notifier object might outlive the individual sequences.
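
    One such variation, sketched here under the assumption that Combine is not needed at all: the actor stores one AsyncStream continuation per observer, and onTermination removes the entry when that observer's stream ends. Broadcaster and its members are hypothetical names, not taken from the code above.

    ```swift
    import Foundation

    /// Hypothetical Combine-free variation: the actor keeps one
    /// AsyncStream continuation per observer.
    actor Broadcaster<Element: Sendable> {
        private var continuations: [UUID: AsyncStream<Element>.Continuation] = [:]

        /// Registers a new observer and returns its stream.
        func values() -> AsyncStream<Element> {
            AsyncStream { continuation in
                let id = UUID()
                continuations[id] = continuation

                // Runs when the stream finishes or its consumer is cancelled.
                continuation.onTermination = { [weak self] _ in
                    guard let self else { return }
                    Task { await self.remove(id: id) }
                }
            }
        }

        /// Delivers the element to every registered observer.
        func send(_ element: Element) {
            for continuation in continuations.values {
                continuation.yield(element)
            }
        }

        /// Ends every stream; each onTermination handler then removes its entry.
        func finish() {
            for continuation in continuations.values {
                continuation.finish()
            }
        }

        private func remove(id: UUID) {
            continuations[id] = nil
        }
    }
    ```

    Each call to values() returns an independent stream, so several tasks can each iterate their own stream and receive every element sent after they registered.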


    FWIW, if I really wanted to create a singleton for String notifications:

    actor StringNotifier {
        static let shared = Notifier<String>()
        private init() {}
    }
    

    As an aside, the Swift concurrency alternative to a Combine Subject is an AsyncChannel (from the Swift Async Algorithms package). But a channel does not broadcast to multiple observers (each value it sends goes to a single consumer), so you might have a collection of these channels:

    import AsyncAlgorithms
    import Foundation

    actor Notifier<Element: Sendable> {
        typealias ChannelIdentifier = UUID
    
        private var channels: [ChannelIdentifier: AsyncChannel<Element>] = [:]
    
        func values() -> AsyncStream<Element> {
            .init { continuation in
                let channel = AsyncChannel<Element>()
                let id = ChannelIdentifier()
                channels[id] = channel
    
                let task = Task {
                    for await value in channel {
                        continuation.yield(value)
                    }
                    continuation.finish()
                }
    
                continuation.onTermination = { state in
                    if case .cancelled = state { task.cancel() }
    
                    Task { [weak self] in
                        await self?.removeChannel(id: id)
                    }
                }
            }
        }
    
        func send(_ element: Element) async {
            await withDiscardingTaskGroup { group in
                for channel in channels.values {
                    group.addTask { await channel.send(element) }
                }
            }
        }
    }
    
    private extension Notifier {
        func removeChannel(id: ChannelIdentifier) {
            channels.removeValue(forKey: id)
        }
    }