I have 2 AsyncThrowingStream(s) which push, respectively, the first 5 even and odd non-negative integers.
var evens = stream(start: 0) // 0, 2, 4, 6, 8
var odds = stream(start: 1) // 1, 3, 5, 7, 9
Every value is pushed after 1...3 seconds.
Since the 2 streams need some time to push the next value, I want to run them concurrently.
E.g.
Task {
    let evens = stream(start: 0)
    for try await even in evens {
    }
}
Task {
    let odds = stream(start: 1)
    for try await odd in odds {
    }
}
Next, I have this function
func process(value: Int) {
    print(value)
}
How can I invoke process(value:) for all the generated Int(s) in order?
The resulting invocations should look like this
process(value: 0)
process(value: 1)
process(value: 2)
process(value: 3)
process(value: 4)
process(value: 5)
process(value: 6)
process(value: 7)
process(value: 8)
process(value: 9)
For reference and testing I am adding a mock implementation to generate the 2 streams
func stream(start: Int) -> AsyncThrowingStream<Int, Error> {
    .init { continuation in
        Task {
            for index in stride(from: start, to: 10, by: 2) {
                continuation.yield(index)
                let random = UInt64.random(in: (1...3))
                try await Task.sleep(nanoseconds: random * 1_000_000_000)
            }
            continuation.finish()
        }
    }
}
Thanks.
To be clear, if this was a sync context, my solution would look like this
let evens = [0, 2, 4, 6, 8]
let odds = [1, 3, 5, 7, 9]
zip(evens, odds)
    .map { [$0, $1] }
    .flatMap { $0 }
    .forEach(process)
Output:
0
1
2
3
4
5
6
7
8
9
Another requirement that may not be obvious from the question is that process(value:) must be called as soon as the successor of the last processed integer is produced by either of the 2 streams.
In other words:
process(value:) must be called with arguments following the natural order of the non-negative integers (0, 1, 2, 3, 4, ...).
process(value:) must be called as soon as possible (as soon as the next integer has been produced by either of the streams).
Example:
evens pushes 0
then process(value: 0) is called
odds pushes 1
then process(value: 1) is called
odds pushes 3
then 3 is put on hold
evens pushes 2
then process(value: 2) is called
and process(value: 3) is called
A Priority Queue sounds like a good fit for this case, but I am looking for the correct implementation compatible with AsyncThrowingStream and Swift Concurrency.
As a broader observation, you can zip or merge sequences using Apple’s Swift Async Algorithms package. For example, you said:
To be clear, if this was a sync context, my solution would look like this
let evens = [0, 2, 4, 6, 8]
let odds = [1, 3, 5, 7, 9]
zip(evens, odds)
    .map { [$0, $1] }
    .flatMap { $0 }
    .forEach(process)
If you use Apple’s Swift Async Algorithms package, you can (a) turn your arrays into async sequences by just adding .async; and (b) zip the two sequences into one. E.g.:
let evens = stream(start: 0) // or [0, 2, 4, 6, 8].async
let odds = stream(start: 1) // or [1, 3, 5, 7, 9].async
for try await (even, odd) in zip(evens, odds) {
    process(value: even)
    process(value: odd)
}
This will, for example, not process the third odd element until both the third even and odd elements were emitted.
So, imagine that these two sequences emitted the values in this order: [0, 1, 3, 2, 5, 7, 4, 9, 6, 8]. The zip approach will process them in order, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].
On the other hand, if you wanted to process them as they come in, you might use merge:
let evens = stream(start: 0)
let odds = stream(start: 1)
for try await element in merge(evens, odds) {
    process(value: element)
}
Considering the above example, this merge rendition will process them in the order that they were encountered, e.g., [0, 1, 3, 2, 5, 7, 4, 9, 6, 8].
To illustrate the difference between zip and merge, let me share some RxMarbles-like diagrams, which visually demonstrate these two sequence combining functions. In each diagram, the top two arrows are the timelines for the two input sequences, and the bottom arrow is the timeline for the resulting output sequence.
E.g., zip: [marble diagram for zip]
As opposed to merge: [marble diagram for merge]
Hopefully, that helps illustrate what is going on.
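Coming back to the requirement in the question (call process(value:) in natural order, but as soon as the next integer is available): one possible approach is to consume the merged sequence and hold back any value that arrives early. The following is only a minimal sketch of that idea. ReorderBuffer is a hypothetical helper, not part of Swift Async Algorithms, and it assumes the values are consecutive integers starting at a known number, so a Set is enough and a full priority queue is not needed. It is fed with the arrival order from the merge example above.

```swift
/// Hypothetical helper (an assumption for illustration, not from any library):
/// holds back values that arrive early and releases them once every
/// smaller value has been seen.
struct ReorderBuffer {
    private var next: Int              // the value we are waiting for
    private var pending: Set<Int> = [] // values that arrived too early

    init(startingAt first: Int = 0) {
        next = first
    }

    /// Records `value` and returns the values that are now ready,
    /// in ascending order (possibly empty).
    mutating func insert(_ value: Int) -> [Int] {
        pending.insert(value)
        var ready: [Int] = []
        // Drain every consecutive value starting from `next`.
        while pending.remove(next) != nil {
            ready.append(next)
            next += 1
        }
        return ready
    }
}

// Arrival order taken from the merge example above.
let arrivals = [0, 1, 3, 2, 5, 7, 4, 9, 6, 8]
var buffer = ReorderBuffer()
var processed: [Int] = []
for value in arrivals {
    processed.append(contentsOf: buffer.insert(value))
}
print(processed) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

// With the async sequences, the same buffer could be driven like this
// (assuming `merge` from Swift Async Algorithms and `process(value:)`
// from the question):
//
//     for try await element in merge(evens, odds) {
//         for ready in buffer.insert(element) {
//             process(value: ready)
//         }
//     }
```

Unlike zip, this does not wait for a complete (even, odd) pair: 3 is held only until 2 shows up, and a value that is already next in line is processed immediately.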
As an aside, I would advise always handling cancellation in your custom asynchronous streams. For example, in your stream(start:), we might add an onTermination closure:
func stream(start: Int) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream<Int, Error> { continuation in
        let task = Task {
            do {
                for index in stride(from: start, to: 10, by: 2) {
                    try await Task.sleep(for: .seconds(.random(in: 1...3)))
                    continuation.yield(index)
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        continuation.onTermination = { @Sendable state in
            if case .cancelled = state {
                task.cancel()
            }
        }
    }
}
You’re probably not contemplating the possibility of cancellation at this point, but it is one of those things one should always handle in any asynchronous stream (or, for that matter, any async function).
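To see why the onTermination closure matters, here is a small sketch in which the consuming task is cancelled part-way through. The ticker() function is a hypothetical stand-in for stream(start:) with a short fixed delay so that the example finishes quickly; the delays and counts are assumptions chosen purely for illustration. When the consumer is cancelled, the stream is terminated with .cancelled, which in turn cancels the producing task instead of leaving it running in the background.

```swift
// Hypothetical stand-in for stream(start:): emits 0, 1, 2, ... every 10 ms.
func ticker() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream<Int, Error> { continuation in
        let task = Task {
            do {
                for index in 0..<1_000 {
                    try await Task.sleep(nanoseconds: 10_000_000)
                    continuation.yield(index)
                }
                continuation.finish()
            } catch {
                // Task.sleep throws once the producing task is cancelled.
                continuation.finish(throwing: error)
            }
        }
        // Stop the producer when the consumer goes away.
        continuation.onTermination = { @Sendable state in
            if case .cancelled = state {
                task.cancel()
            }
        }
    }
}

// Consume the stream in a task and count what was received.
let consumer = Task { () -> Int in
    var count = 0
    for try await _ in ticker() {
        count += 1
    }
    return count
}

// Let it run briefly, then cancel the consumer.
try await Task.sleep(nanoseconds: 100_000_000)
consumer.cancel()

let received = try await consumer.value
print("Received \(received) values before cancellation")
```

The exact number of values received depends on timing, but the loop should end well before all 1,000 values are produced.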