dart dart-async

Can I know when a StreamSubscription is cancelled?


Is there anything like onCancel for a StreamSubscription?

Example:

var subscription = someStream.listen((item) => null);

subscription.cancel(); // does this trigger any event?

I ended up creating a _StreamSubscriptionDelegate that delegates all methods, so that I can run some logic when the subscription is cancelled. Maybe there is an easier solution, though.


Solution

  • The onCancel callback for a StreamSubscription lives on the StreamController that created the stream, if that is where the stream came from. There is nothing on the stream subscription itself. The only internal state it exposes is isPaused.

    That is, if the stream comes from a StreamController, then the controller is notified of the cancel through a call to its onCancel callback. The code calling listen is expected to keep track of its own subscription, so if one part of the client code needs to know that another part has cancelled the subscription, the two should share a boolean variable, and not just the stream subscription.
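
    A minimal sketch of that notification, assuming you own the controller. The helper name controllerSeesCancel and the notified flag are made up for illustration:

    ```dart
    import 'dart:async';

    /// Hypothetical helper: returns true if the controller's onCancel
    /// callback ran when the listener cancelled its subscription.
    Future<bool> controllerSeesCancel() async {
      var notified = false;
      // The controller owns the stream, so it is the one told about the cancel.
      final controller = StreamController<int>(
        onCancel: () {
          notified = true;
        },
      );
      final subscription = controller.stream.listen((_) {});
      await subscription.cancel(); // Invokes the controller's onCancel.
      return notified;
    }

    Future<void> main() async {
      print(await controllerSeesCancel());
    }
    ```

    The subscription itself learns nothing new here; only code that holds the controller can observe the cancel this way.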

    Alternatively, you can wrap the subscription in something that records that you cancelled it before sharing it with other code.
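
    One way such a wrapper might look, as a rough sketch. TrackedSubscription and isCancelled are hypothetical names, not part of dart:async, and the wrapper deliberately exposes only cancel rather than the full StreamSubscription interface:

    ```dart
    import 'dart:async';

    /// Hypothetical wrapper: remembers whether cancel() was called through it.
    class TrackedSubscription<T> {
      TrackedSubscription(this._inner);

      final StreamSubscription<T> _inner;
      bool _isCancelled = false;

      /// True once cancel() has been called on this wrapper.
      bool get isCancelled => _isCancelled;

      /// Records the cancel, then forwards it to the real subscription.
      Future<void> cancel() {
        _isCancelled = true;
        return _inner.cancel();
      }
    }

    Future<void> main() async {
      final tracked = TrackedSubscription<int>(
        Stream.fromIterable([1, 2, 3]).listen((_) {}),
      );
      print(tracked.isCancelled);
      await tracked.cancel();
      print(tracked.isCancelled);
    }
    ```

    This only sees cancels made through the wrapper. Any code that still holds the raw subscription can cancel it without the flag changing.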

    Or you could wrap the stream before letting anyone listen to it:

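    Stream<T> onCancel<T>(Stream<T> source, void Function() onCancel) async* {
      // Assume cancellation until the source is seen to complete normally.
      var isCancelled = true;
      try {
        await for (var event in source) {
          yield event; // Exits here if the listener cancels.
        }
        isCancelled = false; // Reached only when the source finishes normally.
      } finally {
        // Note: a source error also leaves the loop early, so onCancel runs then too.
        if (isCancelled) onCancel();
      }
    }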
    

    or

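    import 'dart:async';

    Stream<T> onCancel<T>(Stream<T> source, void Function() onCancel) {
      var sink = StreamController<T>();
      // Subscribe to the source lazily, only once someone listens to the wrapper.
      sink.onListen = () {
        // Forward errors with addError; StreamController has no onError member.
        var subscription =
            source.listen(sink.add, onError: sink.addError, onDone: sink.close);
        sink
          ..onPause = subscription.pause
          ..onResume = subscription.resume
          ..onCancel = () {
            var cancelled = subscription.cancel();
            onCancel();
            return cancelled; // Lets the listener's cancel() wait for the source.
          };
      };
      return sink.stream;
    }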