I'm new to RxJS and am having trouble achieving this the "RxJS way":
An infinite stream `a$` emits a value `a` once in a while.
`async()` takes the `a` and performs an async operation.
If `a$` emits values while `async` is pending, only keep the latest one, `al`.
After the previous `async` completes, if there is an `al`, run `async(al)`.
And so on.
a$:----a1----------a2----a3-----------------------a4-----------
       async(a1):------------end                  async(a4):---
                             async(a3):-----end
Here is what I came up with, a bit nasty:
// Notifier used while no task is in flight. It is a BehaviorSubject, which
// (per RxJS semantics) replays its current value to every new subscriber.
var asyncIdle$ = new Rx.BehaviorSubject()
// Notifier used while a task is in flight; it is signalled via next() when
// the task started by async() finishes.
var asyncRunning$ = new Rx.Subject()
// The currently active notifier. async() switches it between the two
// subjects above, and the debounce duration selector reads it.
var async$ = asyncIdle$
/**
 * Simulates an asynchronous task for `val` that takes two seconds.
 *
 * While the task is in flight, `async$` points at `asyncRunning$`; once it
 * finishes, `async$` is switched back to `asyncIdle$` and `asyncRunning$`
 * is notified so that a value held back in the meantime can be released.
 *
 * @param {*} val - The value to process; only used for logging here.
 */
function async (val) {
  // Mark the task as running so new values are gated on asyncRunning$.
  async$ = asyncRunning$;
  // do something with val
  console.log(val + ' handling');
  setTimeout(() => {
    console.log(val + ' complete');
    // Return to idle BEFORE notifying. The notification can synchronously
    // release a pending value, which re-enters async() and sets async$ back
    // to asyncRunning$. Resetting afterwards would wrongly mark that new
    // task as idle and let later values bypass the gate.
    async$ = asyncIdle$;
    asyncRunning$.next();
  }, 2000);
}
// Stand-in for a$: every click on the document adds 1 to a running total,
// and each resulting total is logged as it is produced.
var a$ = Rx.Observable.fromEvent(document, 'click')
  .mapTo(1)
  .scan(function (total, step) { return total + step; })
  .do(function (count) { console.log('got ' + count); });

// Hold each value until the currently active notifier (async$) emits, then
// hand the most recent value to async().
a$.debounce(function () { return async$; })
  .subscribe(function (latest) { async(latest); });
You can use the `audit` operator to solve the problem, like this (the comments should explain how it works):
// Simulate the source from a table of [value, delayMs] pairs: each pair
// becomes a single-value observable delayed by delayMs, and the merged
// stream logs every value it emits.
const source = Rx.Observable.merge(
  ...[[1, 0], [2, 10], [3, 20], [4, 150], [5, 300]].map(
    ([emitted, delayMs]) => Rx.Observable.of(emitted).delay(delayMs)
  )
).do(function (emitted) { console.log("source", emitted); });
/**
 * Simulates the async task: builds an observable of `value` that logs
 * before and after a 100 ms delay.
 *
 * @param {*} value - The value the simulated task works on.
 * @returns The observable chain built from `Rx.Observable.of(value)`.
 */
function asyncTask(value) {
  const logBefore = input => console.log(" before async", input);
  const logAfter = output => console.log(" after async", output);
  const start$ = Rx.Observable.of(value);
  return start$.do(logBefore).delay(100).do(logAfter);
}
// `signal` is declared up front because the audit duration selector below
// closes over it, while its value can only be built from `audited` itself.
let signal;

// Derive the audited stream from the source:
//  - audit holds a value back until `signal` emits;
//  - mergeMap runs the async task for each value that gets through;
//  - share keeps the signal's subscription from triggering a second
//    subscription to the source.
const audited = source
  .audit(function () { return signal; })
  .mergeMap(function (value) { return asyncTask(value); })
  .share();

// The signal emits whenever the audited stream produces a task result.
// startWith(true) lets the first value pass the audit without waiting.
signal = audited.mapTo(true).startWith(true);

audited.subscribe(function (value) { console.log("output", value); });
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>