swiftswift-concurrency

Type erasure in Swift Concurrency AsyncStream


Experimenting with Swift's concurrency, I would like to have a clean API for exposing an async sequence of a given element type and a throttled version of the same:

  var intStream: AsyncStream<Int> {
    AsyncStream<Int>(Int.self, bufferingPolicy: .bufferingNewest(5)) { continuation in
      Task.detached {
        for _ in 0..<100 {
          try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
          continuation.yield(Int.random(in: 1...10))
        }
        continuation.finish()
      }
    }
  }
  
  var throttledIntStream: AsyncStream<Int> {
    intStream.throttle(for: .seconds(2))
  }

But this does not work as throttle returns its own type: Error: Cannot convert return expression of type 'AsyncThrottleSequence<AsyncStream<Int>, ContinuousClock, Int>' to return type 'AsyncStream<Int>'

To get type erasure I could do

var throttledIntStream: some AsyncSequence {
  intStream.debounce(for: Duration.seconds(2))
}

but then I lose the element type information as well, which I would like to keep.

Any suggestions how to best solve that?

Edit: This is pointing to the solution I want, but I guess I will need to wait https://forums.swift.org/t/anyasyncsequence/50828/2


Solution

  • I know this does not help you in this old question, but for contemporary readers, it is worth noting that:

    So, for contemporary OS versions, one can use type erasure to AsyncSequence without resorting to the historical sleight of hand:

    var integers: some AsyncSequence<Int, Never> {
        AsyncStream(bufferingPolicy: .bufferingNewest(5)) { continuation in
            Task {
                defer { continuation.finish() }
    
                for _ in 0 ..< 100 {
                    try await Task.sleep(for: .seconds(1))
                    continuation.yield(.random(in: 1...10))
                }
            }
        }
    }
    
    var throttledIntegers: some AsyncSequence<Int, Never> {
        integers._throttle(for: .seconds(2))
    }
    

    (Unrelated, but throttle has temporarily been renamed _throttle. It is a work in progress.)


    As an aside, as a matter of best practice, I might also advise:

    So, perhaps:

    func integers() -> some AsyncSequence<Int, any Error> {
        AsyncThrowingStream(bufferingPolicy: .bufferingNewest(5)) { continuation in
            let task = Task {
                do {
                    for _ in 0 ..< 100 {
                        try await Task.sleep(for: .seconds(1))
                        continuation.yield(.random(in: 1...10))
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
    
            continuation.onTermination = { state in
                if case .cancelled = state {
                    task.cancel()
                }
            }
        }
    }
    
    func throttledIntegers(for duration: Duration = .seconds(2)) -> some AsyncSequence<Int, any Error> {
        integers()._throttle(for: duration)
    }