I am trying to figure out whether this approach is thread-safe if getStream() and update(value:) are called on different threads simultaneously.
/// Singleton that republishes `String` values sent through `update(value:)`
/// to every stream handed out by `getStream()`.
///
/// Thread-safety caveat: `cancellables` is a plain `Set` that is mutated by
/// `getStream()` with no synchronization, so calling `getStream()` concurrently
/// from several threads is a data race on that set.
final class SomeNotifier {
    static let shared = SomeNotifier()

    /// Subject that carries every value passed to `update(value:)`.
    private let value = PassthroughSubject<String, Never>()
    /// Keeps the Combine subscriptions created by `getStream()` alive.
    private var cancellables: Set<AnyCancellable> = []

    private init() {}

    /// Returns a new `AsyncStream` that yields each value subsequently sent
    /// via `update(value:)` and finishes when the subject completes.
    func getStream() -> AsyncStream<String> {
        return AsyncStream { [weak self] continuation in
            guard let self = self else { return }
            self.value.sink { completion in
                switch completion {
                case .finished:
                    continuation.finish()
                case .failure:
                    continuation.finish()
                }
            } receiveValue: { value in
                continuation.yield(value)
            }
            // Explicit `self.` keeps this consistent with `self.value` above and
            // does not rely on implicit-self rules after a weak capture.
            .store(in: &self.cancellables)
        }
    }

    /// Sends `value` to all current subscribers of the subject.
    func update(value: String) {
        self.value.send(value)
    }
}
I want to have a repository that can notify different observers about changes to its internal state.
No, this is not thread safe, for two reasons:
The shared
instance is not concurrency-safe because it represents unsynchronized mutable state. If you change the “Strict Concurrency Checking” build setting to “Complete”, you will receive a warning to this effect:
Static property 'shared' is not concurrency-safe because it is not either conforming to 'Sendable' or isolated to a global actor; this is an error in Swift 6
Furthermore, the cancellables
is a Set
which is not thread-safe. You need to synchronize your access to it.
Below I outline how to wrap a Publisher
in an AsyncStream
. This is no longer necessary as of iOS 15, macOS 12, etc. Publisher
now exposes an asynchronous sequence, values
, that you can use directly with any publisher (including a Subject
):
// Iterate the publisher's `values` asynchronous sequence directly, instead of
// wrapping the publisher in a hand-written `AsyncStream`.
for await value in publisher.values {
…
}
This is the modern alternative to manually wrapping a Publisher
with an AsyncStream
, shown below.
In a variation of Cy-4AH’s answer (subsequently deleted), I would suggest using an actor
for synchronization. I would also add an onTermination
handler to remove the associated continuation if the asynchronous sequence was canceled. E.g.:
/// Actor that broadcasts elements passed to `send(_:)` to every stream
/// returned by `values()`, using a Combine `PassthroughSubject` internally.
actor Notifier<Element: Sendable> {
    typealias CancellableIdentifier = UUID

    /// Subject through which every element passed to `send(_:)` is published.
    private let valuesPublisher = PassthroughSubject<Element, Never>()

    /// Live subscriptions, keyed by the identifier created for each stream.
    private var cancellables: [CancellableIdentifier: AnyCancellable] = [:]

    /// Creates a new stream of elements.
    ///
    /// Each call subscribes to the subject under a fresh identifier. When the
    /// stream terminates, a task is spawned to remove that subscription.
    func values() -> AsyncStream<Element> {
        AsyncStream { [valuesPublisher] continuation in
            let id = CancellableIdentifier()

            let subscription = valuesPublisher.sink(
                receiveCompletion: { _ in continuation.finish() },
                receiveValue: { element in continuation.yield(element) }
            )
            cancellables[id] = subscription

            // Hop back onto the actor to drop the subscription once the
            // stream has terminated.
            continuation.onTermination = { _ in
                Task { [weak self] in
                    await self?.removeCancellable(id: id)
                }
            }
        }
    }

    /// Publishes `element` to all current subscribers.
    func send(_ element: Element) {
        valuesPublisher.send(element)
    }
}
private extension Notifier {
    /// Drops the subscription stored under `id`, if there is one.
    func removeCancellable(id: UUID) {
        cancellables[id] = nil
    }
}
There are tons of variations on the theme, but the details of the implementation matter less than the general observations of (a) the use of an actor
; and (b) the use of the onTermination
handler to clean up in case the notifier object might outlive the individual sequences.
FWIW, if I really wanted to create a singleton for String
notifications:
/// Holder for a single shared `Notifier<String>`.
actor StringNotifier {
    /// Private so that `StringNotifier` itself cannot be instantiated from outside.
    private init() {}

    /// The shared notifier for `String` elements.
    static let shared: Notifier<String> = .init()
}
As an aside, the Swift concurrency alternative to a Combine Subject
is an AsyncChannel
. But, a channel does not allow multiple observers, so you might have a collection of these channels:
/// Actor that broadcasts elements passed to `send(_:)` to every stream
/// returned by `values()`, keeping one `AsyncChannel` per stream.
actor Notifier<Element: Sendable> {
    typealias ChannelIdentifier = UUID

    /// One channel per active stream, keyed by the identifier created for it.
    private var channels: [ChannelIdentifier: AsyncChannel<Element>] = [:]

    /// Creates a new stream of elements.
    ///
    /// Each call registers a fresh channel and starts a task that forwards the
    /// channel's elements into the stream. When the stream terminates, a task
    /// is spawned to remove the channel.
    func values() -> AsyncStream<Element> {
        AsyncStream { continuation in
            let id = ChannelIdentifier()
            let channel = AsyncChannel<Element>()
            channels[id] = channel

            // Forward everything the channel delivers into the stream, then
            // finish the stream once the channel's sequence ends.
            let forwarder = Task {
                for await element in channel {
                    continuation.yield(element)
                }
                continuation.finish()
            }

            continuation.onTermination = { reason in
                // Only a cancelled stream needs its forwarding task cancelled.
                if case .cancelled = reason { forwarder.cancel() }
                Task { [weak self] in
                    await self?.removeChannel(id: id)
                }
            }
        }
    }

    /// Sends `element` to every registered channel, one child task per channel,
    /// and returns after all of those sends have returned.
    func send(_ element: Element) async {
        await withDiscardingTaskGroup { group in
            for (_, channel) in channels {
                group.addTask { await channel.send(element) }
            }
        }
    }
}
private extension Notifier {
    /// Removes the channel registered under `id`, if there is one.
    ///
    /// - Parameter id: The identifier assigned to the channel in `values()`.
    func removeChannel(id: ChannelIdentifier) {
        // The dictionary must be mutated through `removeValue(forKey:)`;
        // `channels(forKey:)` is not a valid call on a dictionary.
        channels.removeValue(forKey: id)
    }
}