Experimenting with Swift's concurrency, I would like to have a clean API for exposing an async sequence of a given element type and a throttled version of the same:
var intStream: AsyncStream<Int> {
AsyncStream<Int>(Int.self, bufferingPolicy: .bufferingNewest(5)) { continuation in
Task.detached {
for _ in 0..<100 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(Int.random(in: 1...10))
}
continuation.finish()
}
}
}
var throttledIntStream: AsyncStream<Int> {
intStream.throttle(for: .seconds(2))
}
But this does not work as throttle returns its own type:
Error:
Cannot convert return expression of type 'AsyncThrottleSequence<AsyncStream<Int>, ContinuousClock, Int>' to return type 'AsyncStream<Int>'
To get type erasure I could do
var throttledIntStream: some AsyncSequence {
intStream.debounce(for: Duration.seconds(2))
}
but then I lose the element type information as well, which I would like to keep.
Any suggestions how to best solve that?
Edit: This is pointing to the solution I want, but I guess I will need to wait https://forums.swift.org/t/anyasyncsequence/50828/2
I know this does not help you in this old question, but for contemporary readers, it is worth noting that:
AsyncSequence
; andAsyncSequence.Failure
is a supported PAT.So, for contemporary OS versions, one can use type erasure to AsyncSequence
without resorting to the historical sleight of hand:
var integers: some AsyncSequence<Int, Never> {
AsyncStream(bufferingPolicy: .bufferingNewest(5)) { continuation in
Task {
defer { continuation.finish() }
for _ in 0 ..< 100 {
try await Task.sleep(for: .seconds(1))
continuation.yield(.random(in: 1...10))
}
}
}
}
var throttledIntegers: some AsyncSequence<Int, Never> {
integers._throttle(for: .seconds(2))
}
(Unrelated, but throttle
has temporarily been renamed _throttle
. It is a work in progress.)
As an aside, as a matter of best practice, I might also advise:
So, perhaps:
func integers() -> some AsyncSequence<Int, any Error> {
AsyncThrowingStream(bufferingPolicy: .bufferingNewest(5)) { continuation in
let task = Task {
do {
for _ in 0 ..< 100 {
try await Task.sleep(for: .seconds(1))
continuation.yield(.random(in: 1...10))
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
continuation.onTermination = { state in
if case .cancelled = state {
task.cancel()
}
}
}
}
func throttledIntegers(for duration: Duration = .seconds(2)) -> some AsyncSequence<Int, any Error> {
integers()._throttle(for: duration)
}