swift combine backpressure

Why does `Publishers.Map` consume upstream values eagerly?


Suppose I have a custom subscriber that requests one value on subscription and then an additional value three seconds after it receives the previous value:

import Combine
import Foundation  // for DispatchQueue

class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        print("Subscribed")

        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")

        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            self.subscription?.request(.max(1))
        }

        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }
}

If I use this to subscribe to an infinite range publisher, back pressure is handled gracefully. The publisher waits about 3 seconds each time, until it receives the next demand, before sending a value:

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
//     Subscribed
//     Value: 1
//     Value: 2
//     Value: 3
//     ...

But if I add a `map` operator, `MySubscriber` never even receives a subscription. `map` appears to have synchronously requested `Subscribers.Demand.unlimited` upon receiving its subscription, and the app spins forever as `map` tries to exhaust the infinite range:

(1...).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// The `map` transform is executed infinitely with no delay:
//
//     Map: 1
//     Map: 2
//     Map: 3
//     ...

My question is, why does map behave this way? I would have expected map to just pass its downstream demand to the upstream. Since map is supposed to be for transformation rather than side effects, I don't understand what the use case is for its current behavior.

EDIT

I implemented a version of map to show how I think it ought to work:

extension Publishers {
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        let upstream: Upstream
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
            upstream.receive(subscriber: mapSubscriber)
        }
    }
}

extension Subscribers {
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        let downstream: DownstreamSubscriber
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

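        func receive(subscription: Subscription) {
            // Hand the upstream subscription straight to the downstream subscriber,
            // so every demand it requests goes directly to the upstream.
            downstream.receive(subscription: subscription)
        }

        func receive(_ input: Input) -> Subscribers.Demand {
            // Transform only when a value actually arrives, and return
            // whatever additional demand the downstream asks for.
            downstream.receive(transform(input))
        }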

        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

extension Publisher {
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
    }
}

Using this operator, MySubscriber receives the subscription immediately and the mapLazily transform is only executed when there is demand:

(1...).publisher
    .mapLazily { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
//     Subscribed
//     Map: 1
//     Value: 2
//     Map: 2
//     Value: 4
//     Map: 3
//     Value: 6
//     Map: 4
//     Value: 8

My guess is that the particular overload of `map` defined for `Publishers.Sequence` takes some kind of shortcut to improve performance. This breaks for infinite sequences, but even for finite sequences, eagerly exhausting the sequence regardless of downstream demand is counterintuitive. In my view, the following code:

(1...3).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

ought to print:

Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete

but instead prints:

Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete

Solution

  • Here's a simpler test that doesn't involve any custom subscribers:

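    // Assumed declaration: the original snippet uses `storage` without showing it.
    var storage = Set<AnyCancellable>()

    (1...).publisher
        //.map { $0 }
        .flatMap(maxPublishers: .max(1)) { (i: Int) -> AnyPublisher<Int, Never> in
            Just<Int>(i)
                .delay(for: 3, scheduler: DispatchQueue.main)
                .eraseToAnyPublisher()
        }
        .sink { print($0) }
        .store(in: &storage)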
    

    It works as expected: one value is printed every 3 seconds. But if you uncomment the `.map`, nothing is ever printed, because the `.map` operator keeps consuming the infinite upstream sequence and never publishes anything.

    On the basis of your hypothesis that map is somehow optimizing for a preceding sequence publisher, I tried this workaround:

    (1...).publisher.eraseToAnyPublisher()
        .map { $0 }
        // ...
    

    And sure enough, it fixed the problem! By hiding the sequence publisher from the map operator, we prevent the optimization.
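
One way to see why the workaround helps is to look at the static types involved. `Publishers.Sequence` declares its own `map` overload that returns another `Publishers.Sequence` over an array, and its closure parameter is non-escaping, so the transform has to run at the call site. After `eraseToAnyPublisher()`, only the generic `Publisher.map` is visible, which returns a `Publishers.Map`. The sketch below is a minimal illustration under those assumptions; the `log` array and its message strings are made up for the example, and a finite range is used so that it terminates.

```swift
import Combine

// Hypothetical helper for this example: records the order of events.
var log: [String] = []

// Directly on a sequence publisher. The type annotation compiles only because
// `map` resolves to the overload declared on Publishers.Sequence, whose
// non-escaping closure runs right here, before any subscriber exists.
let direct: Publishers.Sequence<[Int], Never> = (1...3).publisher.map { value in
    log.append("direct map \(value)")
    return value * 2
}
log.append("direct built")

// After type erasure only the generic Publisher.map is available, so the
// closure is stored and runs later, once per value delivered from upstream.
let erased: Publishers.Map<AnyPublisher<Int, Never>, Int> =
    (1...3).publisher.eraseToAnyPublisher().map { value in
        log.append("erased map \(value)")
        return value * 2
    }
log.append("erased built")

// Subscribing to the erased pipeline is what finally triggers its transform.
let cancellable = erased.sink { log.append("value \($0)") }

log.forEach { print($0) }
```

The "direct map" entries appear in the log before "direct built", whereas the "erased map" entries appear only after the `sink` subscribes. That matches the output in the question, where every `Map:` line is printed before `Subscribed`.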