swift, swift-concurrency

Type erasing `AsyncMapSequence<AsyncThrowingStream<Element, Error>, Element>`


I'd like to type-erase `AsyncMapSequence<AsyncThrowingStream<Element, Error>, Element>`. My consumer doesn't care whether the sequence has been transformed, or how.

Ideally it could be erased back to the original stream type, `AsyncThrowingStream<Element, Error>`. Is that possible?

This is my attempt:

extension AsyncMapSequence {
    func eraseToAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
        AsyncThrowingStream { continuation in
            Task.detached {
                do {
                    var iterator = self.makeAsyncIterator()
                    while let element = try await iterator.next() {
                        continuation.yield(element)
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
        }
    }
}

But that crashes at runtime with this error:

Fatal error: attempt to await next() on more than one task

I'm also not sure why transforming a stream gives back a sequence type rather than another stream.


Solution

  • I believe this can be done more simply, and more generally, with `AsyncThrowingStream`'s `init(unfolding:)`. I haven't tested this much, but I don't see any reason it shouldn't work:

    extension AsyncSequence {
        func eraseToAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
            var iterator = self.makeAsyncIterator()
            return AsyncThrowingStream(unfolding: { try await iterator.next() })
        }
    }
    

    I recommend reading over the AnyAsyncSequence thread in the Swift forums for some further (and future) thoughts on the underlying question.
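
For what it's worth, here is a minimal sketch of how the extension above might be used from the consumer's side. The names `numbers`, `mapped`, `erased`, and `collect` are hypothetical and exist only for illustration. It assumes top-level code in a `main.swift` built in Swift 5 language mode; stricter concurrency checking may reject the captured mutable `iterator`, so treat it as a starting point rather than a definitive implementation.

```swift
// The extension from the answer, repeated so the sketch is self-contained.
extension AsyncSequence {
    func eraseToAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
        var iterator = self.makeAsyncIterator()
        return AsyncThrowingStream(unfolding: { try await iterator.next() })
    }
}

// Hypothetical source stream that yields 1, 2, 3 and then finishes.
let numbers = AsyncThrowingStream<Int, Error> { continuation in
    for n in 1...3 { continuation.yield(n) }
    continuation.finish()
}

// `map` is declared on AsyncSequence, so it returns a wrapper type
// rather than another AsyncThrowingStream.
let mapped: AsyncMapSequence<AsyncThrowingStream<Int, Error>, Int> =
    numbers.map { $0 * 2 }

// After erasure the consumer only sees the plain stream type.
let erased: AsyncThrowingStream<Int, Error> = mapped.eraseToAsyncThrowingStream()

// Hypothetical consumer that knows nothing about the transformation.
func collect(_ stream: AsyncThrowingStream<Int, Error>) async throws -> [Int] {
    var result: [Int] = []
    for try await value in stream {
        result.append(value)
    }
    return result
}

let values = try await collect(erased)
print(values)
```

Note that the erased stream, like the original, is single-pass: iterate it from one task only, which is also what the fatal error in the question is complaining about.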