dartasynchronousstream

why am I getting "Bad state: Stream has already been listened to."?


Before asking this question, I extensively searched stackoverflow and other platforms for similar questions. However I could not find an answer to my problem.

So, basically, I am trying to use dart streams for event handling. I have written 2 similar (not identical) programs. First one works as expected. The second one produces Bad state: Stream has already been listened to" error. I can not figure out why.

If you provide an insight, I would be the most grateful.

1st Program (this works as expected)

import 'dart:async';

// Get a Stream as broadcast
StreamController<String> controller = StreamController<String>();
Stream<String> stream = controller.stream.asBroadcastStream();

void main(){
  // define listener 1
  StreamSubscription<String> subscriber = stream.listen((String data){
            print('Receiver1: $data');
      }, 
      onError: (error){
            print('Error occured: ${error}');
               },
      onDone: (){
            print('Stream closed');
  });

  // define listener 2
  StreamSubscription<String> subscriber2 = stream.listen((String data){
            print('Receiver2: $data');
      }, 
      onError: (error){
            print('Error occured: $error');
               },
      onDone: (){
            print('Stream closed');
  });

  // in every second emit an event. On 3rd second emit an error
  for(int i=0; i<5; i++){
    Timer(Duration(seconds: i), (){ 
      if (i == 3){
        controller.sink.addError(ArgumentError.new('important error'));
        } else {
          controller.sink.add('Seconds passed: $i');
        }
    });
  }
  print('finished');
}

2nd Program: This one produces the error.

import 'dart:async';

// Intention-specific StreamController
class MyStream  extends ControllerBase {}

// Just a StreamControllerBase, which centeralize the general idea
abstract class ControllerBase {
  final StreamController _streamController = StreamController();
  Stream<dynamic> get asBroadcastStream =>
      _streamController.stream.asBroadcastStream();
  StreamController get ctr => _streamController;

  void dispose() {
    _streamController.close();
  }
}

// General registry of StreamControllers
class ControllerProvider {
  static final ControllerProvider _controllerProvider =
      ControllerProvider._internal();

  // private constructor
  ControllerProvider._internal();

  // factory method
  factory ControllerProvider() {
    return _controllerProvider;
  }

  final Map<Type, dynamic> _registry = {};

  void register<T>(T controller) {
    _registry[T] = controller;
  }

  T get<T>() {
    return _registry[T] as T;
  }
}


// Just a class, which acts on MyStream
class B{
  B(){
    Stream stream = ControllerProvider().get<MyStream>().asBroadcastStream;
    stream.listen((data){
      print('Aha! from class B: ${data as String}');
    });
  }
}

// Another simple class, which also acts on MyStream. This class has an
// instance of class B. 
class A{
  B _b = B();
  A(){
    Stream stream = ControllerProvider().get<MyStream>().asBroadcastStream;
    stream.listen((data){
      print('Gotcha from class A: ${data as String}');
    });
  }
}


void main(){
  // Get an instance of MyStream
  MyStream myStream = MyStream();
  // register myStream instance in ControllerProvider
  ControllerProvider cp = ControllerProvider();
  cp.register<MyStream>(myStream);

  // Instantiate class A (both class A and class B instances should listen now)
  A a = A();

  // emit an event
  myStream.ctr.sink.add('helloo ');
  print('finished');
}


Solution

  • whenever you call asBroadcastStream, it creates a new broadcast stream on top of the same single subscription stream.

    so, if class B calls asBroadcastStream and then class A also calls it, you actually get two different broadcast streams, both trying to wrap the same single subscription stream which causes the error.

    Change your ControllerBase like this:

    abstract class ControllerBase {
      final StreamController _streamController = StreamController();
      late final Stream<dynamic> _broadcastStream =
          _streamController.stream.asBroadcastStream();
    
      Stream<dynamic> get asBroadcastStream => _broadcastStream;
      StreamController get ctr => _streamController;
    
      void dispose() {
        _streamController.close();
      }
    }
    

    now its always returns the same broadcast stream instance