In our codebase we quite often call `async` methods wrapped in a `Task` inside a Combine pipeline.
After reading an article on Swift by Sundell, we implemented a `Publisher` extension with the `asyncMap` method from that article.
However, we realized that, as implemented, it did not guarantee that calls coming in first would also come out first. After some experimenting we found that if we set `maxPublishers` in the `flatMap` to `.max(1)`, we did seem to get first-in-first-out behavior. At least our tests now seem to pass.
However, we haven't seen an implementation like this anywhere else, nor as a native Swift feature, so we are asking ourselves whether there are particular reasons why it can't or shouldn't be implemented this way. Did we overlook something? Could we run into race conditions or out-of-order calls when making async backend calls that need to happen in order?
Here is the (non-throwing) version of our extension:
```swift
import Foundation
import Combine

public extension Publisher {
    /// A publisher that transforms all elements from an upstream publisher using an async transform closure.
    /// Warning: FIFO order of execution is only guaranteed with `maxPublishers = .max(1)`.
    func asyncMap<T>(maxPublishers: Subscribers.Demand = .max(1), _ transform: @escaping (Output) async -> T) -> AnyPublisher<T, Failure> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Failure> in
            Future { promise in
                // Future runs this closure immediately; the Task bridges into async code.
                Task {
                    let result = await transform(value)
                    promise(.success(result))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
```
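The throwing version mentioned above is not shown. One possible shape for it, assuming the upstream failure type is `Error` so that errors thrown by `transform` can be forwarded as failures, is sketched below. The name `tryAsyncMap` is hypothetical and this is not the original code; it keeps the same `flatMap` plus `Future` plus `Task` structure, so the same `.max(1)` ordering argument applies.

```swift
import Combine
import Foundation

public extension Publisher where Failure == Error {
    /// Hypothetical throwing counterpart of `asyncMap`.
    /// An error thrown by `transform` fails the inner Future, which in turn fails the whole pipeline.
    func tryAsyncMap<T>(
        maxPublishers: Subscribers.Demand = .max(1),
        _ transform: @escaping (Output) async throws -> T
    ) -> AnyPublisher<T, Error> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Error> in
            Future { promise in
                Task {
                    do {
                        promise(.success(try await transform(value)))
                    } catch {
                        promise(.failure(error))
                    }
                }
            }
        }
        .eraseToAnyPublisher()
    }
}
```

A publisher that never fails, such as `[1, 2, 3].publisher`, needs `.setFailureType(to: Error.self)` before it can use this sketch.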
And here are the tests we are running: the first one for when the order doesn't matter, the second one checking the order:
```swift
import XCTest
import Combine

class PublisherAsyncTests: XCTestCase {
    var cancellableSubscriber = Set<AnyCancellable>()

    override func setUp() {
        super.setUp()
        cancellableSubscriber = []
    }

    // Order is not checked: with .unlimited the transforms run concurrently.
    func testAsyncMap() async throws {
        let expectation = self.expectation(description: "testAsyncMap")
        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap(maxPublishers: .unlimited) { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...20_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)
        XCTAssertEqual(sequence, resultSequence.sorted())
    }

    // Order is checked: the default .max(1) runs the transforms one at a time.
    func testAsyncFIFOMap() async throws {
        let expectation = self.expectation(description: "testAsyncFIFOMap")
        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...40_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)
        XCTAssertEqual(sequence, resultSequence)
    }
}
```
If you need additional information or have questions, please don't hesitate to ask and I will update my question.
The `maxPublishers` parameter of `flatMap` is documented as:

"Specifies the maximum number of concurrent publisher subscriptions, or `unlimited` if unspecified."
So if you use `.max(1)`, `flatMap` can only request one element from its upstream at a time: it waits for the `Future` to complete, publishes that value, and only then requests the next element.
In other words, all the async operations you do in `asyncMap` are carried out sequentially, not in parallel. This means that the order from the upstream is preserved, and since no two transforms ever run at the same time, they cannot race with each other.
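Outside Combine, the same sequential behavior can be sketched as a plain `async` loop. This is only an analogy, not what `flatMap` does internally, and `sequentialMap` is a hypothetical helper name: each `await` completes before the next element is touched, so the output order always matches the input order.

```swift
// Hypothetical helper: an async loop analogous to asyncMap with maxPublishers: .max(1).
func sequentialMap<T, U>(_ values: [T], _ transform: (T) async -> U) async -> [U] {
    var results: [U] = []
    results.reserveCapacity(values.count)
    for value in values {
        // The next transform does not start until this one has returned.
        results.append(await transform(value))
    }
    return results
}
```

Even if the transform for the first element is the slowest one, it is still the first result, because nothing else starts until it finishes.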
If the async operations are carried out in parallel, then whichever finishes first is published first by `flatMap`, and that's why the order is not preserved if you use the `.unlimited` default of `flatMap`.
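The `.unlimited` behavior can also be reproduced without Combine. The sketch below assumes Swift 5.7+ structured concurrency, and `completionOrderMap` is a hypothetical name. It starts every transform at once in a task group and appends results as they finish, which mirrors how `flatMap` forwards whichever `Future` completes first.

```swift
// Hypothetical helper: every transform runs concurrently and results are
// collected in completion order, not input order.
func completionOrderMap<T: Sendable, U: Sendable>(
    _ values: [T],
    _ transform: @escaping @Sendable (T) async -> U
) async -> [U] {
    await withTaskGroup(of: U.self, returning: [U].self) { group in
        for value in values {
            group.addTask { await transform(value) }
        }
        var results: [U] = []
        for await result in group {
            // Appended as soon as any child task finishes.
            results.append(result)
        }
        return results
    }
}
```

Calling it with `[3, 1, 2]` and a transform that sleeps `v * 100` ms will typically return `[1, 2, 3]`, the completion order rather than the input order.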
For a simple demonstration, consider:
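```swift
// Uses the asyncMap extension from the question. Task.sleep(for:) needs iOS 16 / macOS 13 or later.
let x = [1, 2, 3, 4].publisher
let cancellable = x.asyncMap(maxPublishers: .max(1)) { i in
    try! await Task.sleep(for: .seconds(1))
}.sink { _ in
    print("Foo")
}
```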
This will print four `Foo`s with a 1-second delay between each one.

If it were `maxPublishers: .max(2)`, it would print two `Foo`s after 1 second, and another two `Foo`s after another second.

If it were `maxPublishers: .unlimited`, it would print four `Foo`s after 1 second.
If it were `maxPublishers: .none`, it would print nothing and the publisher would never complete.
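The `.max(2)` case corresponds to a concurrency limit. A Combine-free sketch of the same idea, assuming Swift 5.7+ task groups and using the hypothetical helper name `boundedMap`, keeps at most `limit` child tasks in flight and starts the next element only when one finishes:

```swift
// Hypothetical analogue of flatMap(maxPublishers: .max(limit)): at most `limit`
// transforms run at once, and results arrive in completion order.
func boundedMap<T: Sendable, U: Sendable>(
    _ values: [T],
    limit: Int,
    _ transform: @escaping @Sendable (T) async -> U
) async -> [U] {
    precondition(limit > 0, "limit must be positive")
    return await withTaskGroup(of: U.self, returning: [U].self) { group in
        var iterator = values.makeIterator()
        // Start the first `limit` transforms.
        for _ in 0..<limit {
            guard let value = iterator.next() else { break }
            group.addTask { await transform(value) }
        }
        var results: [U] = []
        // Each time one finishes, start the next pending element, if any.
        while let result = await group.next() {
            results.append(result)
            if let value = iterator.next() {
                group.addTask { await transform(value) }
            }
        }
        return results
    }
}
```

With `limit: 1` it runs one transform at a time, as `.max(1)` does, and with a limit at least as large as the input it behaves like `.unlimited`.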
If you want the async operations to run in parallel and still preserve the order, that is a lot less trivial than this. Off the top of my head, I can't think of how you'd implement it.
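This does not solve the streaming Combine case, but the narrower case where the whole input is a finite array known up front is simpler. The sketch below assumes Swift 5.7+ task groups, and `orderedConcurrentMap` is a hypothetical name. It runs every transform concurrently, tags each result with its input index, and writes results back into their original slots.

```swift
// Hypothetical helper for a finite array only (not a live publisher):
// transforms run concurrently, but the output keeps the input order.
func orderedConcurrentMap<T: Sendable, U: Sendable>(
    _ values: [T],
    _ transform: @escaping @Sendable (T) async -> U
) async -> [U] {
    await withTaskGroup(of: (Int, U).self, returning: [U].self) { group in
        for (index, value) in values.enumerated() {
            // Each child task remembers which input position it belongs to.
            group.addTask { (index, await transform(value)) }
        }
        var slots = [U?](repeating: nil, count: values.count)
        for await pair in group {
            slots[pair.0] = pair.1
        }
        // Every index was filled exactly once, so force-unwrapping is safe here.
        return slots.map { $0! }
    }
}
```

The trade-off is that nothing can be emitted until every result is in, which is exactly what a streaming `asyncMap` would want to avoid.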