
Difference between `subscribe` and `sink` when piping a publisher to a subject in Combine?


To forward an event from a publisher to a subject, I just figured out we have two ways:

  1. publisher.sink { subject.send() }
  2. publisher.subscribe(subject)

The first is the standard way, while the second looks more elegant to me. However, the subscribe operator seems to hold on to the subscription differently, and I am wondering what causes the difference.

Below is the minimal code that illustrates the difference:

import Combine

func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
    // Create an internal subject that never escapes the method
    let subject = PassthroughSubject<Void, Never>()

    // Pipe the event from the publisher to the subject
    return publisher.print().sink { subject.send() } // <--- the standard way to notify subject
}

// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()

// Get the subscription
let cancellable = pipeToSubject(from: publisher)

// Cancel the subscription
cancellable.cancel()

Here is the printed output:

receive subscription: (Empty)
request unlimited
receive cancel

This is expected: the subject is internal, and nothing but sink holds a reference to it. Once the subscription is cancelled, sink is released, and so is the subject.

Now when it comes to the other way to forward to the subject:

import Combine

func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
    // Create an internal subject that never escapes the method
    let subject = PassthroughSubject<Void, Never>()

    // Pipe the event from the publisher to the subject
    return publisher.print().subscribe(subject) // <--- notify subject by `subscribe`
}

// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()

// Get the subscription
let cancellable = pipeToSubject(from: publisher)

// Cancel the subscription
cancellable.cancel()

Here is the printed output:

receive subscription: (Empty)

No cancel at all, even though we cancel the subscription returned by subscribe(subject). Evidently the subject is retained by the subscription internally. But I don't understand: subscribe(subject) has already returned a cancellable, so why does it still keep a reference inside? Is this by design, or is it a bug?


Solution

  • I think the clue to what is going on here is this output line:

    request unlimited

    Combine uses a pull model. A Publisher only reacts, that is, only runs code, once a subscriber has asked for values.

    For each Subscriber, the Publisher creates a Subscription. The Subscriber issues "demand" (formally Subscribers.Demand) through the Subscription to the Publisher. When the Publisher receives that demand, it can begin an asynchronous loop to respond to new input or to life-cycle events of the Subscription.
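
    To make the demand step concrete, here is a minimal sketch of a hand-written subscriber. LoggingSubscriber and its log property are hypothetical names used only for illustration; the point is that values arrive only after the subscriber calls request on the Subscription it was handed:

    ```swift
    import Combine

    // Hypothetical subscriber, for illustration only: it records what happens
    // and asks for a fixed number of values through the Subscription.
    final class LoggingSubscriber: Subscriber {
        typealias Input = Int
        typealias Failure = Never

        private(set) var log: [String] = []

        func receive(subscription: Subscription) {
            log.append("received subscription")
            // Nothing flows until demand is issued through the subscription.
            subscription.request(.max(2))
        }

        func receive(_ input: Int) -> Subscribers.Demand {
            log.append("value \(input)")
            return .none // no additional demand beyond the initial request
        }

        func receive(completion: Subscribers.Completion<Never>) {
            log.append("completed")
        }
    }

    let subscriber = LoggingSubscriber()
    [1, 2, 3].publisher.subscribe(subscriber)
    // subscriber.log is now ["received subscription", "value 1", "value 2"]
    ```

    With a demand of .max(2), the third element is never delivered and the subscriber sees no completion, because it never asked for more.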

    When sink receives the Subscription, it issues .unlimited demand through that Subscription. The Publisher creates code to actively monitor the Subscription.

    If you just use subscribe, your subscription is inert: it doesn't demand anything of the Publisher, and no asynchronous code runs that could respond to lifecycle events. When you issue the cancel, the Subscription knows it has been cancelled, but the Publisher has no chance to run and do something about it.

    We can force the issue and give the Publisher that chance:

    import Combine
    import Foundation

    func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
        // Create an internal subject that never escapes the method
        let subject = PassthroughSubject<Void, Never>()

        // After 5 seconds, send a value so the pipeline gets a chance to run
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(5)) {
            subject.send()
        }
    
        // Pipe the event from the publisher to the subject
        return publisher.print().subscribe(subject) // <--- notify subject by `subscribe`
    }
    
    // Create a never ending publisher
    let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
    
    // Get the subscription
    let cancellable = pipeToSubject(from: publisher)
    
    // Cancel the subscription
    cancellable.cancel()
    

    Now, after 5 seconds, the attempt to send something gives the Publisher a chance to run. It has an opportunity to notice that one of its Subscriptions has been cancelled and can do something about it. Now the output is:

    receive subscription: (Empty)
    receive cancel
    

    In summary, sink generates demand and causes the Publisher to create active code to respond to that demand, and to lifecycle events.
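
    As a rough way to observe this for sink, handleEvents can record the same lifecycle events that print() shows. EventLog is a hypothetical helper introduced only for this sketch:

    ```swift
    import Combine

    // Hypothetical helper that collects lifecycle events for inspection.
    final class EventLog {
        var events: [String] = []
    }

    let log = EventLog()

    // The same never-completing publisher as in the question.
    let upstream = Empty<Void, Never>(completeImmediately: false)

    let cancellable = upstream
        .handleEvents(
            receiveSubscription: { _ in log.events.append("subscription") },
            receiveCancel: { log.events.append("cancel") },
            receiveRequest: { demand in
                log.events.append(demand == .unlimited ? "request unlimited" : "request limited")
            }
        )
        .sink { _ in }

    cancellable.cancel()
    // log.events is now ["subscription", "request unlimited", "cancel"]
    ```

    The recorded events mirror the print() output from the sink version in the question: sink requests unlimited demand as soon as it receives the subscription, and the cancellation is delivered upstream.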

    subscribe, in and of itself, makes no demand of the Publisher, so the Publisher never begins running code for that Subscription. You have to invoke the Publisher somehow and give it a chance to run before it will notice, and respond to, the cancellation.

    I think what you are seeing is not a bug. It is expected behavior, given that Combine is a "pull" system, built on demand to allow for back pressure, rather than a push model.