javascript rxjs observable

Is there a way to repeat an observable until another observable emits?


I'm trying to run an observable repeatedly until some other observable emits. I don't want to interrupt the run that is currently in progress; I only want it not to repeat again.


I tried using takeUntil, but this closes stream$ before it has actually finished (it cuts it off before the 1000 ms timer fires).

const stream$ = interval(1).pipe(takeUntil(timer(1000)));
stream$.pipe(repeat(), takeUntil(timer(1500)));

If we agree that stream$ runs for 1000 ms before completing, I expect it to run twice in this case, for 1000 ms each time. I need some sort of operator that stops the observable from repeating once some event$ emits.

Something like this:

const stream$ = interval(1).pipe(takeUntil(timer(1000)));
stream$.pipe(repeatUntil(timer(1500)));



Solution

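  • We can use takeWhile and check a flag only when the value 0 arrives. Because repeat() resubscribes to interval, every cycle starts again at 0, so a 0 marks the start of a new cycle.

    Since the flag is checked only at the start of a cycle, a cycle that has already begun keeps emitting after the threshold time has passed. The stream then completes at the first 0 that arrives once the flag is false.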

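    const { rxObserver } = require('api/v0.3'); // ThinkRx playground helper
    const { timer, interval } = require('rxjs');
    const { repeat, takeUntil, takeWhile } = require('rxjs/operators');

    // While this is true, a new cycle is allowed to start.
    let turnOff = true;

    interval(33).pipe(
      takeUntil(timer(100)), // one cycle lasts 100 ms
      repeat(),              // resubscribe each time a cycle completes
      // Check the flag only on the first value (0) of each cycle.
      takeWhile((data) => data === 0 ? turnOff : true),
    ).subscribe(rxObserver());

    // Disallow further repeats after 134 ms.
    setTimeout(() => {
      turnOff = false;
    }, 134);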
    

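    Try changing the delay passed to setTimeout between 134 and 133. The second cycle's first value arrives at about 133 ms, so this 1 ms difference can decide whether the second cycle gets to run.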

    To see the marble diagram, paste the code above into ThinkRx and run it.