I need to store the AsyncIterator of a ThrowingTaskGroup for later use. I'm creating this iterator inside an async function inside an actor. However, as the resulting iterator is a value type, I cannot store it and access it later outside the scope of the current function.
Is there something inherently wrong with my approach?
Here's some code to illustrate the issue:
actor Foo {
    private var iterator: ThrowingTaskGroup<String, any Error>.Iterator?

    func bar() async throws {
        let workItems = ["A", "B", "C", "D"]
        try await withThrowingTaskGroup(of: String.self) { group in
            workItems.forEach { item in
                group.addTask { [unowned self] in
                    return try await doWork(on: item)
                }
            }
            iterator = group.makeAsyncIterator()
            guard let firstItem = try await iterator?.next() else { // Cannot call mutating async function 'next()' on actor-isolated property 'iterator'
                throw "An error"
            }
            // do some work based on first item
        }
    }

    private func atALaterPointInTime() async throws {
        while let item = try await iterator?.next() { // Cannot call mutating async function 'next()' on
            await doMoreWork(on: item)
        }
    }

    private func doWork(on item: String) async throws -> String {
        return item
    }

    private func doMoreWork(on item: String) async {
        // ...
    }
}
Task groups are not supposed to escape from the task that created them, so this approach is not going to work.
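To make that constraint concrete, here is a minimal sketch in which every use of the group, including pulling the first result and iterating over the rest, stays inside the closure passed to withThrowingTaskGroup. It is not taken from the question: the function name completionOrderResults and the "double the input" work are made up for illustration.

```swift
// Hypothetical helper for illustration only; not part of the question's code.
// The group (and any iterator made from it) is used only inside the closure.
func completionOrderResults(of items: [Int]) async throws -> [Int] {
    try await withThrowingTaskGroup(of: Int.self, returning: [Int].self) { group in
        for item in items {
            // Placeholder work: each child task just doubles its input.
            group.addTask { item * 2 }
        }

        var results: [Int] = []

        // Take the first result that becomes available...
        if let first = try await group.next() {
            results.append(first)
        }

        // ...then drain the remaining results, still inside the closure.
        for try await value in group {
            results.append(value)
        }

        // Only plain values leave the closure, never the group itself.
        return results
    }
}
```

The results arrive in completion order, which is not necessarily the order in which the tasks were added.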
Assuming you want bar to return once the first task in the task group completes, I would use a top-level task to run the task group and feed the values it produces into an AsyncThrowingStream, which can then be consumed by atALaterPointInTime.
Here is an example with a task group of Ints. I've used Ints here so that each subtask can take a different amount of time to complete, by sleeping for a different number of seconds.
actor Foo {
    private var stream: AsyncThrowingStream<Int, Error>?

    func bar() async throws {
        let workItems = [1, 2, 3, 4]
        let stream = AsyncThrowingStream { continuation in
            Task {
                do {
                    try await withThrowingTaskGroup(of: Int.self) { group in
                        workItems.forEach { item in
                            group.addTask { [unowned self] in
                                return try await doWork(on: item)
                            }
                        }
                        for try await item in group {
                            continuation.yield(item)
                        }
                        continuation.finish()
                    }
                } catch {
                    continuation.finish(throwing: error)
                }
            }
        }
        self.stream = stream
        var iter = stream.makeAsyncIterator()
        guard let firstItem = try await iter.next() else {
            throw SomeError()
        }
        print("bar is doing some work with firstItem")
    }

    func atALaterPointInTime() async throws {
        guard let stream else { return }
        for try await item in stream {
            await doMoreWork(on: item)
        }
    }

    private func doWork(on item: Int) async throws -> Int {
        print("Doing work on \(item)")
        try await Task.sleep(for: .seconds(item))
        print("Work on \(item) is done")
        return item
    }

    private func doMoreWork(on item: Int) async {
        print("Doing more work with \(item)")
    }
}
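One thing the unstructured Task does not get for free is cancellation: if nobody finishes consuming the stream, the task group keeps running. A possible refinement, sketched here as a standalone function rather than as part of Foo, is to cancel the task from the continuation's onTermination handler. This assumes Swift 5.9 or later for AsyncThrowingStream.makeStream; the function name resultsStream and the short sleeps are placeholders of mine.

```swift
// Hypothetical standalone helper; the name and the sleep durations are
// illustrative assumptions, not part of the answer's Foo actor.
func resultsStream(for workItems: [Int]) -> AsyncThrowingStream<Int, Error> {
    // makeStream (Swift 5.9+) returns the stream and its continuation together.
    let (stream, continuation) = AsyncThrowingStream<Int, Error>.makeStream()

    // Top-level task that owns the task group for its whole lifetime.
    let task = Task {
        do {
            try await withThrowingTaskGroup(of: Int.self) { group in
                for item in workItems {
                    group.addTask {
                        // Placeholder work: sleep proportionally to the item.
                        try await Task.sleep(nanoseconds: UInt64(item) * 10_000_000)
                        return item
                    }
                }
                // Forward each result to the stream as it completes.
                for try await item in group {
                    continuation.yield(item)
                }
            }
            continuation.finish()
        } catch {
            continuation.finish(throwing: error)
        }
    }

    // When the stream terminates (finished or consumer cancelled),
    // cancel the producing task so the child tasks stop as well.
    continuation.onTermination = { _ in task.cancel() }

    return stream
}
```

Consuming it looks the same as before, for example `for try await item in resultsStream(for: [3, 1, 2]) { ... }`. Cancelling a task that has already finished is harmless, so the handler does not need to distinguish between the termination reasons.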
Note that in Swift 5.10 this will produce a warning on the try await iter.next() call. As far as I understand from SE-0421, this warning is a false positive and will be fixed in Swift 6.
Here is a simple SwiftUI view to call Foo.bar and Foo.atALaterPointInTime:
struct ContentView: View {
    @State var foo = Foo()

    var body: some View {
        Button("Do More Work") {
            Task {
                do {
                    try await foo.atALaterPointInTime()
                } catch {
                    print(error)
                }
            }
        }
        .task {
            try! await foo.bar()
        }
    }
}