swift, async-await, concurrency, task, asyncstream

Merge 2 AsyncThrowingStream while keeping sorting


Context

I have 2 AsyncThrowingStream(s) which emit, respectively, the first 5 even and the first 5 odd non-negative integers.

var evens = stream(start: 0) // 0, 2, 4, 6, 8
var odds = stream(start: 1) // 1, 3, 5, 7, 9

Consecutive values from the same stream are pushed 1 to 3 seconds apart (a random delay).

Concurrency

Since each of the 2 streams needs some time to push its next value, I want to consume them concurrently.

E.g.

Task {
    let evens = stream(start: 0)
    for try await even in evens {
        
    }
}
Task {
    let odds = stream(start: 1)
    for try await odd in odds {
        
    }
}

Processing in order

Next, I have this function

func process(value: Int) {
    print(value)
}

Question

How can I invoke process(value:) for all the generated Int(s) in order?

The result invocations should look like this

process(value: 0)
process(value: 1)
process(value: 2)
process(value: 3)
process(value: 4)
process(value: 5)
process(value: 6)
process(value: 7)
process(value: 8)
process(value: 9)

More

For reference and testing I am adding a mock implementation to generate the 2 streams

func stream(start: Int) -> AsyncThrowingStream<Int, Error> {
    .init { continuation in
        Task {
            for index in stride(from: start, to: 10, by: 2) {
                continuation.yield(index)
                let random = UInt64.random(in: (1...3))
                try await Task.sleep(nanoseconds: random * 1_000_000_000)
            }
            continuation.finish()
        }
    }
}

Thanks.

Update 1: Example for sync context

To be clear, if this was a sync context, my solution would look like this

let evens = [0, 2, 4, 6, 8]
let odds = [1, 3, 5, 7, 9]
zip(evens, odds)
    .map { [$0, $1] }
    .flatMap { $0 }
    .forEach(process)

Output:
0
1
2
3
4
5
6
7
8
9

Update 2: Priority Queue

Another requirement that may not be obvious from the question is that process(value:) must be called as soon as the successor of the last processed integer is produced by either of the 2 streams.

In other words:

  1. process(value:) must be called with arguments following the natural order of the non-negative integers (0, 1, 2, 3, 4, ...).
  2. AND process(value:) must be called as soon as possible (as soon as the next integer has been produced by either of the streams).

Example

evens pushes 0
then process(value:0) is called

odds pushes 1
then process(value:1) is called

odds pushes 3
then 3 is put on hold

evens pushes 2
then process(value:2) is called
and process(value:3) is called

A Priority Queue sounds like a good fit for this case, but I am looking for the correct implementation compatible with AsyncThrowingStream and Swift Concurrency.


Solution

  • More generally, you can zip or merge asynchronous sequences using Apple’s Swift Async Algorithms package. For example, you said:

    To be clear, if this was a sync context, my solution would look like this

    let evens = [0, 2, 4, 6, 8]
    let odds = [1, 3, 5, 7, 9]
    zip(evens, odds)
        .map { [$0, $1] }
        .flatMap { $0 }
        .forEach(process)
    

    If you use Apple’s Swift Async Algorithms package, you can (a) turn an ordinary sequence into an asynchronous one by just appending .async; and (b) zip the two sequences into one. E.g.:

    let evens = stream(start: 0) // or [0, 2, 4, 6, 8].async
    let odds = stream(start: 1)  // or [1, 3, 5, 7, 9].async
    
    for try await (even, odd) in zip(evens, odds) {
        process(value: even)
        process(value: odd)
    }
    

    This will, for example, not process the third odd element until both the third even and the third odd elements have been emitted.

    So, imagine that these two sequences emitted the values in this order: [0, 1, 3, 2, 5, 7, 4, 9, 6, 8]. The zip approach will process them in order, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].


    On the other hand, if you wanted to process them as they come in, you might use merge:

    let evens = stream(start: 0)
    let odds = stream(start: 1)
    
    for try await element in merge(evens, odds) {
        process(value: element)
    }
    

    Considering the above example, this merge rendition will process them in the order that they were encountered, e.g., [0, 1, 3, 2, 5, 7, 4, 9, 6, 8].
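Neither operator on its own gives the behavior described in Update 2 of the question (strictly in order, yet as early as possible): zip waits for a full pair before releasing anything, and merge does not reorder. One possible way to combine the two ideas is to feed the merged sequence through a small reorder buffer that holds back a value until its predecessor has been seen. The following is only a minimal sketch, not part of the Swift Async Algorithms package: `ReorderBuffer` and `push(_:)` are hypothetical names, and it assumes the values are unique, consecutive integers starting at a known first value. To keep it runnable without the package, it replays the arrival order from the example above instead of iterating `merge(evens, odds)`.

```swift
// Hypothetical helper (not a library type): releases integers in ascending
// order, holding back any value whose predecessor has not arrived yet.
// Assumption: values are unique and consecutive, starting at `first`.
struct ReorderBuffer {
    private var next: Int
    private var pending: Set<Int> = []

    init(startingAt first: Int = 0) {
        next = first
    }

    /// Stores `value` and returns every value that can now be released in order.
    mutating func push(_ value: Int) -> [Int] {
        pending.insert(value)
        var ready: [Int] = []
        // Drain the run of consecutive values that begins at `next`.
        while pending.remove(next) != nil {
            ready.append(next)
            next += 1
        }
        return ready
    }
}

// Arrival order taken from the merge example above. In real use the loop
// would be `for try await element in merge(evens, odds)`.
var buffer = ReorderBuffer()
var processed: [Int] = []
for element in [0, 1, 3, 2, 5, 7, 4, 9, 6, 8] {
    for value in buffer.push(element) {
        processed.append(value) // stand-in for process(value:)
    }
}
print(processed) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because the values here are consecutive integers, a Set plus a counter is enough; for values with gaps or a custom ordering, a real priority queue (a heap) would be the more general choice.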


    To illustrate the difference between zip and merge, let me share some RxMarbles-like diagrams, which visually demonstrate these two sequence combining functions. In each diagram, the top two arrows are the timeline for the respective two input sequences, and the bottom arrow is the timeline for the resulting output sequence.

    E.g., zip:

    [Image: marble diagram of zip]

    As opposed to merge:

    [Image: marble diagram of merge]

    Hopefully, that helps illustrate what is going on.


    As an aside, I would advise always handling cancellation in your custom asynchronous streams. For example, in your stream(start:), we might add an onTermination closure:

    func stream(start: Int) -> AsyncThrowingStream<Int, Error> {
        AsyncThrowingStream<Int, Error> { continuation in
            let task = Task {
                do {
                    for index in stride(from: start, to: 10, by: 2) {
                        try await Task.sleep(for: .seconds(.random(in: 1...3)))
                        continuation.yield(index)
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
    
            continuation.onTermination = { @Sendable state in
                if case .cancelled = state {
                    task.cancel()
                }
            }
        }
    }
    

    You’re probably not contemplating the possibility of cancellation at this point, but it is one of those things one should always handle in any asynchronous stream (or, for that matter, any async function).
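To see why the onTermination handler matters, here is a small sketch of what cancelling the consumer looks like. It reuses the shape of the cancellable stream(start:) above, but with the delays scaled down to milliseconds (an assumption made only so the demo finishes quickly). It is written as top-level code for a main.swift and relies on Task.sleep(for:), so it needs a recent toolchain and OS. When the consuming task is cancelled, the stream should report `.cancelled` to onTermination, which in turn cancels the producer task rather than leaving it sleeping in the background.

```swift
// Same structure as the cancellable stream above; delays shortened for the demo.
func stream(start: Int) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream<Int, Error> { continuation in
        let task = Task {
            do {
                for index in stride(from: start, to: 10, by: 2) {
                    try await Task.sleep(for: .milliseconds(Int.random(in: 100...300)))
                    continuation.yield(index)
                }
                continuation.finish()
            } catch {
                // Task.sleep throws once the producer task is cancelled.
                continuation.finish(throwing: error)
            }
        }

        continuation.onTermination = { @Sendable state in
            if case .cancelled = state {
                task.cancel() // stop producing when the consumer goes away
            }
        }
    }
}

// The consumer collects whatever arrives before it is cancelled.
let consumer = Task {
    var received: [Int] = []
    for try await value in stream(start: 0) {
        received.append(value)
    }
    return received
}

try await Task.sleep(for: .milliseconds(450))
consumer.cancel() // ends the iteration and triggers onTermination(.cancelled)

let result = try await consumer.value
print(result) // some prefix of 0, 2, 4, 6, 8; the exact length depends on the random delays
```

Without the onTermination closure, the inner producer task would keep sleeping and yielding into a stream that nobody is reading any more.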