swifttestingasync-awaitcombinepublisher

Is first-in-first-out guaranteed and safe in this Combine asyncMap implementation?


In our codebase, we were using quite often async Methods encapsulated in a Task inside a Combine pipeline.

After reading an article from swiftbysundell we implemented a Publisher extension with the asyncMap method in the article.

However, the way it was implemented we realized that it didn't guarantee that calls that came first in, came first out. After a bit of trying we figured that if we set maxPublisher in the flatMap to .max(1), we did seem to achieve a first-in-first-out. At least our tests seem to be running fine now.

However, we didn't see any implementation like this anywhere else or as a naive Swift feature, and ask ourselves if there are particular issues why this can't or shouldn't be implemented in a similar way.

We are wondering and asking if we did overlook something and maybe might run into some race conditions or unordered calls when running some async Backend calls that should be in order.

Here is the (not throwing) version of our extension:

import Foundation
import Combine

public extension Publisher {

    /// A publisher that transforms all elements from an upstream publisher using an async transform closure.
    /// Warning: The order of execution (FIFO) is only with `maxPublishers = .max(1)` guaranteed.
    func asyncMap<T>(maxPublishers: Subscribers.Demand = .max(1), _ transform: @escaping (Output) async -> T) -> AnyPublisher<T, Failure> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Failure> in
            Future { promise in
                Task {
                    let result = await transform(value)
                    promise(.success(result))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}

And here are the tests we are running, one when the order doesn't matter, and the second test where the order is checked:

class PublisherAsyncTests: XCTestCase {

    var cancellableSubscriber = Set<AnyCancellable>()

    override func setUp() {
        super.setUp()
        cancellableSubscriber = []
    }

    func testAsyncMap() async throws {

        let expectation = expectation(description: "testAsyncMap")

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap(maxPublishers: .unlimited) { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...20_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation])

        XCTAssertEqual(sequence, resultSequence.sorted())
    }

    func testAsyncFIFOMap() async throws {

        let expectation = expectation(description: "testAsyncMap")

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...40_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)

        XCTAssertEqual(sequence, resultSequence)
    }
}

If you need additional information or have questions please don't hesitate to ask and I can update my question.


Solution

  • The maxPublishers parameter in flatMap:

    Specifies the maximum number of concurrent publisher subscriptions, or unlimited if unspecified.

    So if you use .max(1), flatMap can only requests one element from its upstream at a time, wait for the Future to complete, publish that value, and then request another element.

    In other words, all the async operations you do in asyncMap will be carried out sequentially, not in parallel. This means that the order from the upstream is preserved, and there can be no race conditions.

    If the async operations are carried out in parallel, then whichever finishes first is first published by flatMap, and that's why the order is not preserved if you use the .unlimited default.

    For a simple demonstration, consider:

    let x = [1,2,3,4].publisher
    let cancellable = x.asyncMap(maxPublishers: .max(1)) { i in
        try! await Task.sleep(for: .seconds(1))
    }.sink { _ in
        print("Foo")
    }
    

    If you want it to run the async operations in parallel, and still have the order be preserved, that is a lot less trivial than this. I cannot think off the top of my head how you'd implement this.