Before asking this question, I searched Stack Overflow and other platforms extensively for similar questions. However, I could not find an answer to my problem.
I am trying to use Dart streams for event handling. I have written two similar (not identical) programs. The first one works as expected. The second one produces a "Bad state: Stream has already been listened to" error, and I cannot figure out why.
I would be most grateful for any insight.
1st Program (this works as expected)
import 'dart:async';

// Get a Stream as broadcast
StreamController<String> controller = StreamController<String>();
Stream<String> stream = controller.stream.asBroadcastStream();

void main() {
  // define listener 1
  StreamSubscription<String> subscriber = stream.listen((String data) {
    print('Receiver1: $data');
  },
  onError: (error) {
    print('Error occurred: $error');
  },
  onDone: () {
    print('Stream closed');
  });
  // define listener 2
  StreamSubscription<String> subscriber2 = stream.listen((String data) {
    print('Receiver2: $data');
  },
  onError: (error) {
    print('Error occurred: $error');
  },
  onDone: () {
    print('Stream closed');
  });
  // emit one event per second; when i == 3, emit an error instead
  for (int i = 0; i < 5; i++) {
    Timer(Duration(seconds: i), () {
      if (i == 3) {
        controller.sink.addError(ArgumentError('important error'));
      } else {
        controller.sink.add('Seconds passed: $i');
      }
    });
  }
  print('finished');
}
2nd Program: This one produces the error.
import 'dart:async';

// Intention-specific StreamController
class MyStream extends ControllerBase {}

// Just a StreamControllerBase, which centralizes the general idea
abstract class ControllerBase {
  final StreamController _streamController = StreamController();
  Stream<dynamic> get asBroadcastStream =>
      _streamController.stream.asBroadcastStream();
  StreamController get ctr => _streamController;
  void dispose() {
    _streamController.close();
  }
}

// General registry of StreamControllers
class ControllerProvider {
  static final ControllerProvider _controllerProvider =
      ControllerProvider._internal();
  // private constructor
  ControllerProvider._internal();
  // factory method
  factory ControllerProvider() {
    return _controllerProvider;
  }
  final Map<Type, dynamic> _registry = {};
  void register<T>(T controller) {
    _registry[T] = controller;
  }
  T get<T>() {
    return _registry[T] as T;
  }
}

// Just a class, which acts on MyStream
class B {
  B() {
    Stream stream = ControllerProvider().get<MyStream>().asBroadcastStream;
    stream.listen((data) {
      print('Aha! from class B: ${data as String}');
    });
  }
}

// Another simple class, which also acts on MyStream. This class has an
// instance of class B.
class A {
  B _b = B();
  A() {
    Stream stream = ControllerProvider().get<MyStream>().asBroadcastStream;
    stream.listen((data) {
      print('Gotcha from class A: ${data as String}');
    });
  }
}

void main() {
  // Get an instance of MyStream
  MyStream myStream = MyStream();
  // register myStream instance in ControllerProvider
  ControllerProvider cp = ControllerProvider();
  cp.register<MyStream>(myStream);
  // Instantiate class A (both class A and class B instances should listen now)
  A a = A();
  // emit an event
  myStream.ctr.sink.add('helloo ');
  print('finished');
}
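Whenever you call asBroadcastStream(), it creates a new broadcast stream on top of the same single-subscription stream. Your getter calls it on every access.
So when class B reads the getter and then class A reads it too, you get two different broadcast streams wrapping the same single-subscription stream. Each wrapper subscribes to the underlying stream when it gets its first listener, and a single-subscription stream accepts only one subscription, so the second listen throws the error.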
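To see this in isolation, here is a minimal sketch of the same situation without the registry classes. The function name secondWrapperThrows is made up for this illustration; it only uses StreamController, asBroadcastStream() and listen() from dart:async.

```dart
import 'dart:async';

/// Hypothetical helper for illustration: returns true if listening to a
/// second asBroadcastStream() wrapper of the same controller throws.
bool secondWrapperThrows() {
  final controller = StreamController<String>();

  // Two calls produce two distinct broadcast wrappers.
  final first = controller.stream.asBroadcastStream();
  final second = controller.stream.asBroadcastStream();

  // The first wrapper subscribes to the underlying stream here.
  first.listen((_) {});

  try {
    // The second wrapper tries to subscribe to the same underlying stream.
    second.listen((_) {});
    return false;
  } on StateError {
    // "Bad state: Stream has already been listened to."
    return true;
  }
}

void main() {
  print(secondWrapperThrows()); // prints: true
}
```

This is the same thing your classes B and A do, just one after the other in a single function.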
Change your ControllerBase like this:
abstract class ControllerBase {
  final StreamController _streamController = StreamController();
  late final Stream<dynamic> _broadcastStream =
      _streamController.stream.asBroadcastStream();
  Stream<dynamic> get asBroadcastStream => _broadcastStream;
  StreamController get ctr => _streamController;
  void dispose() {
    _streamController.close();
  }
}
Now the getter always returns the same broadcast stream instance, so A and B share one subscription to the underlying stream.
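If you do not need the single-subscription controller at all, another option might be to create the controller with StreamController.broadcast(), so that its stream can be listened to many times directly. The sketch below is only an illustration under that assumption; deliverToTwoListeners is a made-up name. Keep in mind that a broadcast controller does not buffer events added while nobody is listening, so it is not a drop-in replacement in every case.

```dart
import 'dart:async';

/// Hypothetical helper for illustration: sends one event to two listeners
/// of a broadcast controller and returns what the listeners received.
Future<List<String>> deliverToTwoListeners() async {
  final controller = StreamController<String>.broadcast();
  final received = <String>[];

  // Both listeners attach to the same broadcast stream; no wrapper needed.
  controller.stream.listen((data) => received.add('A: $data'));
  controller.stream.listen((data) => received.add('B: $data'));

  controller.add('hello');

  // Wait until the listeners have been sent the done event.
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await deliverToTwoListeners());
}
```

Both listeners should receive the 'hello' event here, with no "already been listened to" error.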