rxjsjasminerxjs-marblesrxjs-test-scheduler

expectObservable().toBe() only receives last value of stream


I want to test a service that uses internally a BehaviorSubject to hold the state and exposes Observables with a distinctUntilChanged() in the pipe. When I run the following test, than the actual steam that is compared with my expectations only 'contains' the last value. What do I have to understand to fix that?

import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

describe('My exposed stream', () => {
  let testScheduler;

  beforeEach(() => {
    testScheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('does not propagate if the current value equals the last one', () => {
    testScheduler.run(({ expectObservable }) => {
      const internalStream$ = new BehaviorSubject<string>(null);
      const exposedStream$ = internalStream$.pipe(distinctUntilChanged());

      expectObservable(exposedStream$).toBe('012', [null, 'foo', 'bar']);

      internalStream$.next('foo');
      internalStream$.next('foo');
      internalStream$.next('bar');
    });
  });
});

Result:

Expected $.length = 1 to equal 3.
Expected $[0].notification.value = 'bar' to equal null.
Expected $[1] = undefined to equal Object({ frame: 1, notification: Notification({ kind: 'N', value: 'foo', error: undefined, hasValue: true }) }).
Expected $[2] = undefined to equal Object({ frame: 2, notification: Notification({ kind: 'N', value: 'bar', error: undefined, hasValue: true }) }).

You can run and modify the code here: https://stackblitz.com/edit/typescript-moydq7?file=test.ts


Solution

  • My code had 2 issues:

    1. I needed to catch the events with a ReplySubject (Thank you, @AliF50)
    2. The correct marble notation to verify the behavior is (012) instead if 012, because the 3 events are sent within the same time frame synchronously.

    Here is my solution. Still very cumbersome and if you are aware of better ways, please let me know.

    import { BehaviorSubject, ReplaySubject } from 'rxjs';
    import { distinctUntilChanged } from 'rxjs/operators';
    import { TestScheduler } from 'rxjs/testing';
    
    describe('My exposed stream', () => {
      let testScheduler;
    
      beforeEach(() => {
        testScheduler = new TestScheduler((actual, expected) => {
          expect(actual).toEqual(expected);
        });
      });
    
      it('does not propagate if the current value equals the last one', () => {
        testScheduler.run(({ expectObservable }) => {
    
          // given
          const internalStream$ = new BehaviorSubject<string>(null);
          const exposedStream$ = internalStream$.pipe(distinctUntilChanged());
    
          const observedValues$ = new ReplaySubject<string>();
          exposedStream$.subscribe(observedValues$);
    
          // when
          internalStream$.next('foo');
          internalStream$.next('foo');
          internalStream$.next('bar');
    
          // then
          expectObservable(observedValues$).toBe('(012)', [null, 'foo', 'bar']);
        });
      });
    });
    

    You can play around with it here: https://stackblitz.com/edit/typescript-sa1n8v?file=test.ts