I'm trying to run an observable repeatedly until some other observable is emitted. I don't want to stop currently unfinished observable, I only want it not to repeat again.
ts/rxjs
I tried using takeUntil, but this closes the stream$, without it actually finishing (it cuts it before 1000ms timer).
const stream$ = interval(1).pipe(takeUntil(timer(1000)));
stream$.pipe(repeat(), takeUntil(timer(1500)));
If we agree that stream$ runs for 1000ms before closing, what I expect to happen is that it should run 2 times 1000ms in this case. I need some sort of operator that would stop repeating the observable upon some event$.
Something like this:
const stream$ = interval(1).pipe(takeUntil(timer(1000)));
stream$.pipe(repeatUntil(timer(1500)));
We can use takeWhile
and wait for the zeroth element emission and then trigger the take condition.
This will generate the output, where we seem to receive an emission even after the set threshold time.
const { rxObserver } = require('api/v0.3');
const { timer, interval, Subject } = require('rxjs');
const { take, map, repeat, takeUntil, takeWhile, toArray} = require('rxjs/operators');
const msg = 'awesome';
let turnOff = true;
interval(33).pipe(
takeUntil(timer(100)),
repeat(),
takeWhile((data) => data === 0 ? turnOff : true),
).subscribe(rxObserver());
setTimeout(() => {
turnOff = false;
}, 134)
Try toggling the value of setTimeout between 134
and 133
.