To forward an event from a publisher to a subject, I just figured out we have two ways:
publisher.sink { subject.send() }
publisher.subscribe(subject)
The first one is the standard way while the second looks more adorable to me. However there seems to be some devil behind the subscribe
operator that keeps the subscription differently. I am wondering what causes the difference.
Below is the minimal code that illustrates the difference:
func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
// Create an internal subject that never falls out of the method
let subject = PassthroughSubject<Void, Never>()
// Pipe the event from the publisher to the subject
return publisher.print().sink { subject.send() } // <--- the standard way to notify subject
}
// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
// Get the subscription
let cancellable = pipeToSubject(from: publisher)
// Cancel the subscription
cancellable.cancel()
Here is the printing output:
receive subscription: (Empty)
request unlimited
receive cancel
This is expected because the subject is internal and no one but sink
keeps the subject. Once the subscription gets cancelled, sink
gets released so does the subject.
Now when it comes to the other way to forward to the subject:
func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
// Create an internal subject that never falls out of the method
let subject = PassthroughSubject<Void, Never>()
// Pipe the event from the publisher to the subject
return publisher.print().subscribe(subject) // <--- notify subject by `subscribe`
}
// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
// Get the subscription
let cancellable = pipeToSubject(from: publisher)
// Cancel the subscription
cancellable.cancel()
Here is the printing output:
receive subscription: (Empty)
No cancel at all even if we cancel the subscription returned by subscribe(subject)
- obviously subject
gets retained by the subscription internally. But I don't understand subscribe(subject)
has already returned a cancellable, why it still keeps the reference inside? Is it by definition, or it's a bug?
I think the evidence as to what is going on here is in the output line:
request unlimited
Combine uses a pull model. A Publisher will only react, it will only run code, when a subscriber
has asked for values.
For each Subscriber
the Publisher
creates Subscription
. The Subscriber
issues "demand" (formally Subscribers.Demand
) through the Subscription
to the Publisher
. When the Publisher
receives that demand, it can begin an asynchronous loop to respond to new input, or life-cycle events of the Subscription
.
When sink
receives the Subscription
it issues .unlimited
demand from its Subscription
. The Publisher
creates code to actively monitor the Subscription
.
If you just use subscribe
, your subscription is inert, it doesn't demand anything of the Publisher
and no asynchronous code runs that could respond to lifecycle events. When you issue the cancel
, the Subscription
knows it has been cancelled, but the Publisher
has no chance to run to do something about it.
We can force the issue and give the Publisher
that chance:
func pipeToSubject(from publisher: AnyPublisher<Void, Never>) -> AnyCancellable {
// Create an internal subject that never falls out of the method
let subject = PassthroughSubject<Void, Never>()
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + .seconds(5)) {
subject.send()
}
// Pipe the event from the publisher to the subject
return publisher.print().subscribe(subject) // <--- notify subject by `subscribe`
}
// Create a never ending publisher
let publisher = Empty<Void, Never>(completeImmediately: false).eraseToAnyPublisher()
// Get the subscription
var cancellable = pipeToSubject(from: publisher)
// Cancel the subscription
cancellable.cancel()
Now, after 5 seconds, the fact that try to send something gives the Publisher
a chance to run. It has an opportunity to notice that one of its Subscriptions
has been cancelled and can do something about it. Now the output is:
receive subscription: (Empty)
receive cancel
In summary, sink
generates demand and causes the Publisher
to create active code to respond to that demand, and to lifecycle events.
subscribe
, in of itself, makes no demand of the Publisher
so it never begins running code for that Subscription
. You have to invoke the Publisher
somehow and give it a chance to run before it will notice, and respond, to the cancellation.
I think what you are seeing is not a bug. It's expected behavior based on the fact that Combine is a "pull" system - based in Demand, to allow for back pressure - not a push model.