I'm new to RxJS and am having trouble achieving this the "RxJS way":
An infinite stream a$ emits a value a once in a while.
async takes the a and performs an async operation.
If a$ emits values while async is pending, only keep the latest one, al.
After the previous async completes, if there is an al, run async(al).
And so on.
async(a1):------------end async(a4):---
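To pin down the behaviour being asked for, here is a minimal sketch in plain JavaScript, without RxJS. The names createLatestOnlyRunner, push and done are hypothetical, introduced only for illustration, and the task is assumed to signal completion by calling the done callback it receives.

```javascript
// Hypothetical helper (not part of RxJS): runs `task` for one value at a time
// and keeps only the latest value that arrives while a task is pending.
function createLatestOnlyRunner(task) {
  let running = false;    // true while a task is pending
  let hasPending = false; // true if a value arrived while a task was pending
  let pending;            // the latest such value

  function run(value) {
    running = true;
    task(value, function done() {
      running = false;
      if (hasPending) {
        const next = pending;
        hasPending = false;
        pending = undefined;
        run(next); // start the task for the latest held value
      }
    });
  }

  return function push(value) {
    if (running) {
      // Overwrite any earlier held value: only the latest one survives.
      pending = value;
      hasPending = true;
    } else {
      run(value);
    }
  };
}

// Usage: each task completes only when its stored `done` callback is called.
const started = [];
const finishers = [];
const push = createLatestOnlyRunner((value, done) => {
  started.push(value);
  finishers.push(done);
});

push("a1");     // idle, so the task for a1 starts immediately
push("a2");     // a1 is pending: a2 is held
push("a3");     // replaces a2
push("a4");     // replaces a3
finishers[0](); // a1 completes, so the task for a4 starts
console.log(started); // [ 'a1', 'a4' ]
```

Completing the tasks by hand keeps the example deterministic; with a real task, done would be called from a timer or a promise callback instead.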
Here is what I came up with, a bit nasty:
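var asyncIdle$ = new Rx.BehaviorSubject()
var asyncRunning$ = new Rx.Subject()
var async$ = asyncIdle$

function async (val) {
  // While the task is pending, point async$ at a subject that never emits
  async$ = asyncRunning$
  // do something with val
  console.log(val + ' handling')
  setTimeout(() => {
    console.log(val + ' complete')
    // Back to the BehaviorSubject, which emits immediately on subscribe
    async$ = asyncIdle$
  }, 2000)
}

// simulate a$
var a$ = Rx.Observable.fromEvent(document, 'click')
  .mapTo(1) // assumed: map each click to 1 so that scan yields a running count
  .scan((acc, curr) => acc + curr)
  .do(val => console.log('got ' + val))

a$.debounce(() => async$)
  .subscribe(val => {
    async(val)
  })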
You can use the audit operator to solve the problem, like this (the comments should explain how it works):
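// Simulate the source.
// (The emission times below are assumed, for illustration only.)
const source = Rx.Observable.merge(
  Rx.Observable.of(1).delay(0),
  Rx.Observable.of(2).delay(10),
  Rx.Observable.of(3).delay(20)
).do(value => console.log("source", value));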
// Simulate the async task.
function asyncTask(value) {
  return Rx.Observable
    .of(value)
    .do(value => console.log(" before async", value))
    .delay(100) // assumed duration, standing in for the real async work
    .do(value => console.log(" after async", value));
}
// Compose an observable that's based on the source.
// Use audit to ensure a value is not emitted until
// the async task has been performed.
// Use share so that the signal does not effect a
// second subscription to the source.
let signal;
const audited = source
  .audit(() => signal)
  .mergeMap(value => asyncTask(value))
  .share();
// Compose a signal from the audited observable to
// which the async task is applied.
// Use startWith so that the first emitted value
// passes the audit.
signal = audited
  .startWith(true); // assumed value; only the emission matters to audit
audited.subscribe(value => console.log("output", value));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>