swift, swift-concurrency

Swift: Map AsyncStream into another AsyncStream


Update

The accepted answer did not directly answer the original question, but it helped resolve the underlying issue I was trying to solve: I wanted to map an AsyncStream (which is an AsyncSequence) into another AsyncSequence with element type T2. I added some details in this comment.

Original question

I would like to map an AsyncStream into another AsyncStream. I wonder if there is a .map that can be used just like for arrays.

Quoting from Apple documentation:

Creates an asynchronous sequence that maps the given closure over the asynchronous sequence’s elements.

The code below produces this error:

Cannot convert value of type 'AsyncMapSequence<AsyncStream<Int>, Int>' to specified type 'AsyncStream<Int>'

As I understand it, this is because .map here returns AsyncMapSequence<...> rather than AsyncStream<Int>.

Is there a way to just map an AsyncStream<T1> into an AsyncStream<T2> with a transform function T1 → T2, as it works for mapping Array<T1> into Array<T2>?

Thank you in advance!

import SwiftUI

@main
struct MacosPlaygroundApp: App {
    var body: some Scene {
        WindowGroup("Playground") {
            Text("Hello World")
                .padding(100)
                .onAppear {
                    Task {
                        let numStream: AsyncStream<Int> = AsyncStream { continuation in
                            Task {
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(0)
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(1)
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(2)
                                continuation.finish()
                            }
                        }

                        let doubleNumStream: AsyncStream<Int> = numStream.map { num in
                            return 2 * num
                        }

                        for await doubleNum in doubleNumStream {
                            print("Next num is \(doubleNum)")
                        }

                    }
                }
        }
    }
}


Solution

  • You said:

    Let's say I have a function, input is some async sequence of data of a certain type T, and for each such T item, it does something with it... For this function, it doesn't matter how that async sequence was calculated, e.g. whether it was mapped from another sequence or not. Ideally I would type it as AsyncSequence<T> (T being a specific type in my actual code), but AsyncSequence doesn't take type parameters.

    I would suggest making this function generic over AsyncSequence. For example, here is a function that prints the values of the sequence:

    func printSequence<S: AsyncSequence>(_ sequence: S) async throws where S.Element == Int {
        for try await value in sequence {
            print("Next num is \(value)")
        }
        print("done")
    }
    

    This will work with any AsyncSequence of Int, either the original numStream or the mapped doubleNumStream.
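On the point that AsyncSequence doesn't take type parameters: as far as I know, this changed with Swift 6, where AsyncSequence gained primary associated types for its element and failure types. The sketch below assumes a Swift 6 toolchain and a recent OS (macOS 15 / iOS 18 or later); `collectInts` is a hypothetical name, and it collects the values into an array instead of printing them so the result is easy to check.

```swift
// A sketch assuming Swift 6 and macOS 15 / iOS 18 or later.
// `collectInts` is a hypothetical helper, not part of any library.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
func collectInts(_ sequence: some AsyncSequence<Int, Never>) async -> [Int] {
    var values: [Int] = []
    // No `try` is needed because the failure type is Never.
    for await value in sequence {
        values.append(value)
    }
    return values
}
```

With this signature the function accepts both an AsyncStream<Int> and the result of calling map on one, without spelling out a generic parameter and a where clause. On older toolchains or deployment targets, the generic version above remains the way to go.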

    Then, as Sweeper said, you can just use the existing map of AsyncSequence:

    Task {
        let numStream = AsyncStream<Int> { continuation in
            Task {
                try await Task.sleep(nanoseconds: 1_000_000_000)
                continuation.yield(0)
                try await Task.sleep(nanoseconds: 1_000_000_000)
                continuation.yield(1)
                try await Task.sleep(nanoseconds: 1_000_000_000)
                continuation.yield(2)
                continuation.finish()
            }
        }
    
        let doubleNumStream = numStream.map { num in             // let it just infer the type for you
            return 2 * num
        }
    
        try await printSequence(doubleNumStream)
    }
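If you really do need an AsyncStream<T2> as the result (the literal question), one option is to re-wrap the mapped sequence in a new stream. This is only a sketch: `eraseToStream` and `makeDoubledStream` are hypothetical names, not standard library API. It assumes the sequence and its elements are Sendable, and it ends the stream silently if the underlying sequence throws.

```swift
extension AsyncSequence where Self: Sendable, Element: Sendable {
    /// Hypothetical helper: forwards every element of this sequence
    /// into a new AsyncStream. An error from the underlying sequence
    /// simply finishes the stream.
    func eraseToStream() -> AsyncStream<Element> {
        AsyncStream { continuation in
            let task = Task {
                do {
                    for try await element in self {
                        continuation.yield(element)
                    }
                } catch {
                    // Errors are dropped; the stream just ends below.
                }
                continuation.finish()
            }
            // Stop forwarding when the consumer stops iterating.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}

// Usage: the explicit AsyncStream<Int> annotation now type-checks.
func makeDoubledStream() -> AsyncStream<Int> {
    let numStream = AsyncStream<Int> { continuation in
        for n in 0..<3 { continuation.yield(n) }
        continuation.finish()
    }
    let doubleNumStream: AsyncStream<Int> = numStream.map { 2 * $0 }.eraseToStream()
    return doubleNumStream
}
```

The trade-off is an extra Task and an extra buffer per wrapped stream, which is why accepting any AsyncSequence, as shown above, is usually the simpler choice.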