swift, thread-safety, combine, subject

Combine Subjects, CurrentValueSubject or PassthroughSubject, lose values


I have been using Combine Subjects to convert data streams into Publishers, but I have encountered an inconsistency and am seeking insight into the issue.

Getting deterministic behavior from Subjects appears to be difficult because of how Combine is implemented. The following code, written in a playground (Swift 5.7), does not reliably deliver all of the expected values, [0, 1, 2, 3, 4], after calling .sink(receiveCompletion:receiveValue:).

import Combine
var cancellables = Set<AnyCancellable>()

func subject() -> CurrentValueSubject<Int, Never>
{
    let subject = CurrentValueSubject<Int, Never>(0)
    Task
    {
        subject.value = 1
        subject.value = 2
        subject.value = 3
        subject.value = 4
        subject.send(completion: .finished)
    }
    return subject
}

subject()
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
    .receive(on: DispatchQueue.main)
    .sink(
        receiveCompletion: { _ in print("Subscription completed.") },
        receiveValue: { value in print("💚 Received value: \(value)") }
    ).store(in: &cancellables)

We can get it to work by wrapping the value changes (or the send calls, when using a PassthroughSubject) inside DispatchQueue.main.async {} or Task.detached { @MainActor in }.
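As a rough sketch of that workaround, the producer can hop to the main queue before touching the subject. This assumes the subscription is set up synchronously on the main thread, as it is in a playground. The `received` and `finished` variables are hypothetical names added only to make the result observable; they are not part of the original code.

```swift
import Combine
import Foundation

var cancellables = Set<AnyCancellable>()
var received: [Int] = []   // hypothetical: collects delivered values
var finished = false       // hypothetical: set when the completion arrives

func subject() -> CurrentValueSubject<Int, Never> {
    let subject = CurrentValueSubject<Int, Never>(0)
    Task {
        // Hop to the main queue. The block cannot run until the main thread
        // has finished its current work, which includes attaching the
        // subscriber below, so no value is sent before a subscriber exists.
        DispatchQueue.main.async {
            subject.value = 1
            subject.value = 2
            subject.value = 3
            subject.value = 4
            subject.send(completion: .finished)
        }
    }
    return subject
}

subject()
    .sink(
        receiveCompletion: { _ in finished = true },
        receiveValue: { received.append($0) }
    )
    .store(in: &cancellables)
```

Once the main run loop has processed the queued block, `received` should hold [0, 1, 2, 3, 4]. This only removes the race in this particular arrangement; it is not a general thread-safety guarantee.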

Matt Gallagher discusses a lack of thread safety guarantees in Combine, and this simple example of a function returning a Subject while sending values within it aptly demonstrates the issue at hand.

How can we use Subjects to get consistent results without customizing and individually testing each use case? Using a .buffer or .receive(on:) at subscription has some effect, but the real problem is having consistent threading at the send site.


Solution

  • I do not believe this is fully solvable with PassthroughSubject or CurrentValueSubject. In both cases, values sent while there is no listener are dropped (a CurrentValueSubject keeps only its latest value). For a related discussion, see Collecting Publisher values with async. .buffer with .keepFull is not sufficient; something has to actually call .sink to create demand.

    You must make sure that there is a listener attached before you start calling send. In practice, this means keeping everything on one thread (usually the main one). Combine just doesn't offer a good tool for synchronizing cross-thread operations.

    The feature you want is called "back pressure," and adding back pressure to "a thing like Subject" is exactly what AsyncChannel (from the swift-async-algorithms package) is designed to do. That is of course a big change (moving to Structured Concurrency rather than Combine), but I've found AsyncChannel extremely robust for these kinds of problems.
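To make the dropping behavior concrete, here is a minimal single-threaded sketch. All variable names are hypothetical and chosen for illustration. It shows values being lost when nothing is subscribed yet, and then the same sends arriving intact once the subscriber is attached first.

```swift
import Combine

// 1. PassthroughSubject: a value sent before anyone subscribes is lost.
let early = PassthroughSubject<Int, Never>()
early.send(1)                                   // no subscriber yet: dropped
var earlyValues: [Int] = []
let earlyToken = early.sink { earlyValues.append($0) }
early.send(2)                                   // delivered
// earlyValues is now [2]

// 2. CurrentValueSubject: only the latest value survives until subscription.
let latest = CurrentValueSubject<Int, Never>(0)
latest.value = 1
latest.value = 2
var latestValues: [Int] = []
let latestToken = latest.sink { latestValues.append($0) }
// latestValues is now [2]; 0 and 1 were never seen by the subscriber

// 3. Subscribe first, then send, all on the same thread.
let ordered = CurrentValueSubject<Int, Never>(0)
var orderedValues: [Int] = []
var didFinish = false
let orderedToken = ordered.sink(
    receiveCompletion: { _ in didFinish = true },
    receiveValue: { orderedValues.append($0) }
)
for value in 1...4 { ordered.value = value }
ordered.send(completion: .finished)
// orderedValues is now [0, 1, 2, 3, 4] and didFinish is true
```

The third case works only because the subscription and the sends happen in a known order on one thread. Once the producer runs elsewhere (for example inside a Task), that ordering is no longer guaranteed.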