dart unit-testing async-await streaming

Unit testing a broadcast streaming class


I want to unit test a repo that exposes a broadcast stream. The repo yields a StateOk first, and then a StateError when onEvent() is called:

import 'dart:async';

import 'package:equatable/equatable.dart';

class Repo {
  final _controller = StreamController<State>.broadcast();

  Stream<State> get changes async* {
    yield StateOk();
    yield* _controller.stream;
  }

  Future<void> onEvent() async {
    _controller.add(StateError());
  }
}

abstract class State extends Equatable {
  @override
  List<Object?> get props => [];
}
class StateOk extends State {}
class StateError extends State {}

When I try to run a unit test like this:

import 'package:test/test.dart';

void main() {
  test('testError', () async {
    final Repo repo = Repo();
    expectLater(
        repo.changes,
        emitsInOrder([
          StateOk(),
          StateError(),
        ]));

    //await Future.delayed(const Duration(milliseconds: 1));
    repo.onEvent();
  });
}

the test runs indefinitely.

If I add the Future.delayed() before calling onEvent(), the test completes successfully. So I first assumed it was a timing issue: that we miss the initial StateOk element, and that this is why expectLater doesn't complete.

However, if I remove the StateError() from emitsInOrder(), the test passes without the additional wait. That tells me we are not missing the initial StateOk. What's going on here?


Solution

  • Your problem is that you are using a broadcast stream.

    When you get the changes stream and listen to it, there is first an asynchronous delay, because async* functions don't start running immediately when their stream is listened to. The function then emits a StateOk() event, and only after that event has been delivered, which can take some time, does it reach yield* _controller.stream.

    That means that between expectLater(repo.changes, ...) listening on the stream and the yield* being reached, there will be a number of asynchronous delays.
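
    The first of those delays can be observed in isolation. The following is a minimal sketch, not taken from the question: numbers and listenOrder are hypothetical names, and the log only records the order in which things happen.

    ```dart
    import 'dart:async';

    /// A generator that records when its body actually starts running.
    Stream<int> numbers(List<String> log) async* {
      log.add('body started');
      yield 1;
    }

    /// Listens to [numbers] and returns the order in which things happened.
    Future<List<String>> listenOrder() async {
      final log = <String>[];
      final subscription = numbers(log).listen((_) {});
      // listen() has returned, but the generator body has not run yet.
      log.add('listen returned');

      // Give the generator time to run to completion, then clean up.
      await Future<void>.delayed(Duration.zero);
      await subscription.cancel();
      return log;
    }

    Future<void> main() async {
      print(await listenOrder()); // [listen returned, body started]
    }
    ```

    Anything that happens synchronously right after listen(), such as a call to onEvent(), therefore runs before the async* body has executed a single statement.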

    If the StateError is emitted by _controller.stream before that yield* starts listening to it, then you'll never see the event. That's how broadcast streams work: if you don't listen in time, the event is broadcast into the void. It is also why the Future.delayed() helps: it gives the generator time to reach yield* before onEvent() is called.
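
    The "broadcast into the void" behaviour is easy to reproduce without any async* function. A small sketch, assuming nothing beyond dart:async (eventsSeenByLateListener is a hypothetical name used only for this illustration):

    ```dart
    import 'dart:async';

    /// Adds one event before anyone listens and one after, and returns what
    /// the late listener actually received.
    Future<List<String>> eventsSeenByLateListener() async {
      final controller = StreamController<String>.broadcast();
      final seen = <String>[];

      // No listener yet: a broadcast controller does not buffer, so this
      // event is dropped.
      controller.add('early');

      final subscription = controller.stream.listen(seen.add);
      controller.add('late');

      // Let the pending event be delivered, then clean up.
      await Future<void>.delayed(Duration.zero);
      await subscription.cancel();
      await controller.close();
      return seen;
    }

    Future<void> main() async {
      print(await eventsSeenByLateListener()); // [late]
    }
    ```

    In the question, the StateError plays the role of 'early': it is added while the generator has not yet subscribed to _controller.stream.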

    What I'd do is make changes react eagerly and synchronously to being listened to. I'd probably go with something like:

      late final Stream<State> changes = Stream.multi((controller) {
        // Called when stream listened to.
        // StateOk sent asynchronously.
        controller.add(StateOk()); 
        // Listen to _controller.stream *right now*! Events buffered if needed.
        controller.addStream(_controller.stream).whenComplete(controller.closeSync);
      });
    

    (This also closes the stream if the original stream closes, which is probably a good idea, but the test needs to expect it.)
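
    As a rough sketch of what "expecting it" might look like, here is the test from the question run against the Stream.multi version. The dispose() method is an assumption added for illustration (the original Repo has no way to close its controller), and the State classes are repeated only to keep the example self-contained.

    ```dart
    import 'dart:async';

    import 'package:equatable/equatable.dart';
    import 'package:test/test.dart';

    abstract class State extends Equatable {
      @override
      List<Object?> get props => [];
    }

    class StateOk extends State {}

    class StateError extends State {}

    class Repo {
      final _controller = StreamController<State>.broadcast();

      late final Stream<State> changes = Stream.multi((controller) {
        controller.add(StateOk());
        // Subscribes to _controller.stream as soon as changes is listened to.
        controller
            .addStream(_controller.stream)
            .whenComplete(controller.closeSync);
      });

      Future<void> onEvent() async {
        _controller.add(StateError());
      }

      // Hypothetical helper, not part of the original Repo.
      Future<void> dispose() => _controller.close();
    }

    void main() {
      test('emits StateOk, then StateError, then closes', () async {
        final repo = Repo();
        final expectation = expectLater(
          repo.changes,
          emitsInOrder([StateOk(), StateError(), emitsDone]),
        );

        await repo.onEvent();
        await repo.dispose();
        await expectation;
      });
    }
    ```

    Keeping the future returned by expectLater and awaiting it at the end means the test only finishes once the whole sequence, including the done event, has been matched.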