Tags: swift, concurrency, task, combine, property-wrapper, published

Swift asynchronous sequence missing updates from @Published property


I’m encountering an issue in my app where a test occasionally fails. I’ve managed to narrow the problem down to the code snippet below, which contains two asynchronous sequences that both observe and modify the same @Published property, fire.

My expectation is that the asynchronous sequences will receive every update from the @Published property, so the test should always succeed. However, after some number of test runs (usually fewer than 100,000), the test hangs and never completes.

At first, I suspected a data race, but the problem persists even after annotating the class with @MainActor to ensure that the whole class runs on the main thread.

On successful tests the console prints:

createStream1(): true
createStream2(): true
createStream1(): false

On unsuccessful tests the console prints:

createStream1(): true
createStream2(): true
createStream1(): true

The code snippet:

final class Tests: XCTestCase {
    @MainActor func test_SUTStream1() async {
        await SUT().toTest()
    }
}

@MainActor
class SUT {
    @Published private(set) var fire = false

    init() {}

    func toTest() async {
        let cancelable = Task {
            await createStream2()
        }
        await createStream1()
        cancelable.cancel()
    }

    private func createStream1() async {
        fire = true

        for await didFire in $fire.values {
            print("\(#function): \(didFire)")
            if didFire == false {
                break
            }
        }
    }

    private func createStream2() async {
        for await didFire in $fire.values {
            print("\(#function): \(didFire)")
            if didFire == true {
                fire = false
                break
            }
        }
    }
}

Solution

  • There are two issues:

    1. The problem is that values does not buffer its values in the presence of back-pressure. So, if a subsequent value is published before the previous one has been consumed, the previous value is dropped.

    2. And even when isolating these to the same actor, there is a race between the continuations. As SE-0306 – Actors warns us (emphasis from original):

      Implementation note: At an implementation level, the messages are partial tasks (described by the Structured Concurrency proposal) for the asynchronous call, and each actor instance contains its own serial executor (also in the Structured Concurrency proposal). The default serial executor is responsible for running the partial tasks one-at-a-time. This is conceptually similar to a serial DispatchQueue, but with an important difference: tasks awaiting an actor are not guaranteed to be run in the same order they originally awaited that actor. Swift’s runtime system aims to avoid priority inversions whenever possible, using techniques like priority escalation. Thus, the runtime system considers a task’s priority when selecting the next task to run on the actor from its queue. This is in contrast with a serial DispatchQueue, which are strictly first-in-first-out. In addition, Swift’s actor runtime uses a lighter-weight queue implementation than Dispatch to take full advantage of Swift’s async functions.

      So, in general, tasks (and continuations) of the same priority tend to run FIFO, but not strictly so: You may occasionally see partial tasks not honor FIFO order.

    The combination of these two factors means that you may occasionally drop values.


    To focus on the first point, consider the following, in which I have added delays (to mitigate the race of my second point). This will consistently manifest the behavior:

    @MainActor
    class Foo {
        @Published private(set) var value = 0
    
        init() {}
    
        func experiment() async throws {
            let publishTask = Task {
                for i in 0 ..< 10 {
                    value = i
                    try await Task.sleep(for: .milliseconds(300))
                }
            }
            
            let consumeTask = Task {
                for await value in $value.values {
                    print(value)
                    if value == 9 { break }
                    try await Task.sleep(for: .seconds(2))
                }
            }
            
            try await withTaskCancellationHandler {
                try await publishTask.value
                try await consumeTask.value
            } onCancel: { 
                publishTask.cancel()
                consumeTask.cancel()
            }
            
            print("done")
        }
    }
    

    Calling experiment will produce something like:

    0
    6
    9
    

    If you do not want it to drop values, you can add a buffer to the consumeTask:

    let consumeTask = Task {
        let sequence = $value
            .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
            .values
    
        for await value in sequence {
            print(value)
            if value == 9 { break }
            try await Task.sleep(for: .seconds(2))
        }
    }
    

    That will capture all the values (up to the buffer size, of course).

    There are other patterns that one can employ to support different back-pressure behaviors, but hopefully this illustrates the fundamental issue.
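One such pattern, sketched below (this is not from the answer above), is to bypass the values property entirely and consume through an AsyncStream, whose buffering policy you choose up front — .unbounded, .bufferingNewest(n), or .bufferingOldest(n). In the original setup you would call continuation.yield from a Combine sink on $fire; here a plain loop stands in for the publisher so the sketch is self-contained:

```swift
// A minimal sketch of an alternative back-pressure pattern: AsyncStream
// with an explicit buffering policy. The producer loop below stands in
// for a Combine publisher feeding `continuation.yield`.
let (stream, continuation) = AsyncStream.makeStream(
    of: Int.self,
    bufferingPolicy: .unbounded   // or .bufferingNewest(10), etc.
)

// Fast producer: all ten values are yielded before the consumer starts.
for i in 0 ..< 10 {
    continuation.yield(i)
}
continuation.finish()

// Slow consumer: with .unbounded, nothing is dropped, even though every
// value was already published before iteration began.
var received: [Int] = []
for await value in stream {
    received.append(value)
}
print(received)  // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Swapping the policy to .bufferingNewest(3) in the same sketch would instead keep only the last three values, which mirrors the dropping behavior of values but with a bound you control.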