swiftasync-awaitcombine

Collecting Publisher values with async


I've been writing some unit tests of some Combine code we have. I've run in to some issues. I think I've simplified the various pieces in to this test. NB: This isn't a test – it's me trying to understand why one of the tests doesn't work!

func test_collectingPassthroughValues() async throws {
    // In the real test this is injected in to the unit under test.
    let subject = PassthroughSubject<Int, Never>()

    // I'm expecting this to only complete once `subject` finishes. I've used
    // `async let` so I can poke some data through `subject` and then later on
    // `await collectValues` to hopefully get back the stuff published by 
    // `subject`. In the real test this is a property from the unit under test
    // which runs various operators on `subject`.
    async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

    // Send some data through `subject` and then `.finish` it.
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // Await the values so we can check we got what's expected.
    let values = await collectValues

    // This fails…
    XCTAssertEqual(values, [10, 20])
}

The assertion fails with:

est_collectingPassthroughValues(): XCTAssertTrue failed - Found difference for 
Different count:
 |  Received: (0) []
 |  Expected: (2) [10, 20]

So subject.values seems to get nothing at all; I don't know why?

Thanks!


Solution

  • What is happening is fairly straightforward. How to write it correctly is much less clear, and my recommendation is "don't do this."

    First, a minor issue that isn't the problem:

    async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }
    

    You shouldn't use await here. That probably would be a problem if there weren't other problems.

    The fundamental problem is that PassthroughSubject drops messages if there's no subscriber. In your current code, that's absolutely going to happen, but it's also really hard to fix.

    // Taking out the extra `await`
    async let collectValues = subject.values.reduce(into: []) { $0.append($1) }
    
    // That line is pretty close to:
        let collectValues = Task {
            var values: [Int] = []
            for await value in subject.values {
                values.append(value)
            }
            return values
        }
    

    The problem is that this kicks off a task that may not start immediately. So your next line of code, subject.send(10) has no subscriber (it hasn't even gotten to the for-await line), and it's just thrown away.

    You can kind of fix it by throwing in a try await Task.sleep(for: .seconds(1)) after creating the Task, but it doesn't help much. There's no buffering on PassthroughSubject. While you're calling append, there is nothing listening. The value will be thrown away and you'll drop the 20.

    You can improve things by buffering, but you'll still need to sleep (which is unacceptable IMO). The following, nonetheless, is very reliable for me:

    func test_collectingPassthroughValues() async throws {
        // In the real test this is injected in to the unit under test.
        let subject = PassthroughSubject<Int, Never>()
        let readSubject = subject.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
    
        async let collectValues = readSubject.values.reduce(into: []) { $0.append($1) }
    
        try await Task.sleep(for: .seconds(1))
        subject.send(10)
        subject.send(20)
        subject.send(completion: .finished)
    
        // Await the values so we can check we got what's expected.
        let values = await collectValues
    
        XCTAssertEqual(values, [10, 20])
    }
    

    But IMO, this is a completely broken approach.

    I would not try to mix PassthroughSubject with .values. I just don't see any way to make it robust. More broadly, I recommend being very careful mixing Combine and Structured Concurrency. They tend to have very different ideas about how things are supposed to work.