For example, take the code in this jsbin example:
const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000) // Asynchronously wait 1000ms then continue
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));
When I use takeUntil as above, after dispatching the CANCEL action and a delay of 1 second, the action never fires again. Why?
The problem is a subtle but critical misunderstanding of how RxJS works, but fear not: this is very common.
So given your example:
const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000)
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));
This epic's behavior can be described as filtering out all actions that don't match type PING. When an action matches, wait 1000ms and then map that action to a different action, { type: PONG }, which will be emitted and then dispatched by redux-observable. If at any time while the app is running someone dispatches an action of type CANCEL, then unsubscribe from the source, which means this entire chain will unsubscribe, terminating the epic.
It might be helpful to see how this looks if you did it imperatively:
const pingEpic = action$ => {
  return new Rx.Observable(observer => {
    console.log('[pingEpic] subscribe');
    let timer;
    const subscription = action$.subscribe(action => {
      console.log('[pingEpic] received action: ' + action.type);
      // When anyone dispatches CANCEL, we stop listening entirely!
      if (action.type === CANCEL) {
        observer.complete();
        return;
      }
      if (action.type === PING) {
        timer = setTimeout(() => {
          const output = { type: PONG };
          observer.next(output);
        }, 1000);
      }
    });
    return {
      unsubscribe() {
        console.log('[pingEpic] unsubscribe');
        clearTimeout(timer);
        subscription.unsubscribe();
      }
    };
  });
};
You can run this code with a fake store here: http://jsbin.com/zeqasih/edit?js,console
Instead, what you usually want to do is insulate the subscriber chain you want to be cancellable from the top-level chain that is supposed to listen indefinitely. Although your example (amended from the docs) is contrived, let's run through it first.
Here we use the mergeMap operator to let us take the matched action and map to another, separate observable chain.
Demo: http://jsbin.com/nofato/edit?js,output
const pingEpic = action$ =>
  action$.ofType(PING)
    .mergeMap(() =>
      Observable.timer(1000)
        .takeUntil(action$.ofType(CANCEL))
        .mapTo({ type: PONG })
    );
We use Observable.timer to wait 1000ms, then map the value it emits (which happens to be the number zero, but that's not important here) to our PONG action. We also say we want to "take" from the timer source until either it completes normally or we receive an action of type CANCEL.
This isolates the chains because mergeMap will stay subscribed to the observable you return until it errors or completes. But when that happens, it doesn't itself stop subscribing to the source you applied it to, which is action$.ofType(PING) in this example.
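To parallel the imperative version of the first epic, here is a rough imperative sketch of what the mergeMap version does. It doesn't use RxJS at all: createActionStream and createFakeTimer are hypothetical stand-ins invented for this illustration (the fake timer replaces the 1000ms wait so the sketch runs synchronously), and the real operators handle more cases than this.

```javascript
// Hypothetical stand-in for action$: a tiny synchronous subject, not the RxJS API.
function createActionStream() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener); // unsubscribe function
    },
    dispatch(action) {
      // Iterate over a copy so listeners may unsubscribe while we loop.
      [...listeners].forEach(listener => listener(action));
    }
  };
}

// Hypothetical fake timer: callbacks are queued and only run when flush()
// is called, which stands in for 1000ms passing.
function createFakeTimer() {
  let pending = [];
  return {
    schedule(callback) {
      pending.push(callback);
      return () => { pending = pending.filter(cb => cb !== callback); };
    },
    flush() {
      const due = pending;
      pending = [];
      due.forEach(callback => callback());
    }
  };
}

function pingEpic(actions, timer, emit) {
  // Outer subscription: never torn down, so every future PING is still seen.
  actions.subscribe(action => {
    if (action.type !== 'PING') return;

    // Inner chain: one per PING, with its own lifetime.
    let stopListeningForCancel = () => {};
    const cancelTimer = timer.schedule(() => {
      stopListeningForCancel(); // the timer completed normally
      emit({ type: 'PONG' });
    });
    stopListeningForCancel = actions.subscribe(inner => {
      if (inner.type === 'CANCEL') {
        cancelTimer();            // only this pending PONG is dropped
        stopListeningForCancel();
      }
    });
  });
}

const actions = createActionStream();
const timer = createFakeTimer();
const emitted = [];
pingEpic(actions, timer, action => emitted.push(action.type));

actions.dispatch({ type: 'PING' });
actions.dispatch({ type: 'CANCEL' }); // drops the pending PONG
timer.flush();                        // nothing is pending any more
actions.dispatch({ type: 'PING' });   // the epic is still listening
timer.flush();                        // "1000ms later"

console.log(emitted); // [ 'PONG' ]
```

The CANCEL only tears down the inner timer and its own CANCEL listener; the outer subscription is untouched, which is why the second PING still produces a PONG.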
A more real-world example is in the Cancellation section of the redux-observable docs.
Here we placed the .takeUntil() inside our .mergeMap(), but after our AJAX call; this is important because we want to cancel only the AJAX request, not stop the Epic from listening for any future actions.
const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(fetchUserFulfilled)
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );
This all may sound confusing, but like most powerful things, once you get it, it'll become intuitive. Ben Lesh does an excellent job of explaining how Observables work in his recent talk, including how operators are a chain of Observables and even how to isolate subscriber chains. Even though the talk is at AngularConnect, it's not Angular-specific.
As an aside, it's important to note that your epics do not swallow or otherwise prevent actions from reaching the reducers, e.g. when you map an incoming action to another, different action. In fact, when your epic receives an action, it has already been through your reducers. Think of your epics as sidecar processes that listen to a stream of your app's actions but can't prevent normal redux things from happening; they can only emit new actions.
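The ordering described above can be sketched without Redux at all. createTinyStore below is a hypothetical miniature store invented for this illustration, not the Redux or redux-observable API, and the epic is simplified to a plain function. It only shows that the reducer has already handled PING by the time the epic maps it to PONG.

```javascript
// Hypothetical miniature store, only to illustrate ordering; not the Redux API.
function createTinyStore(reducer, epic) {
  let state = reducer(undefined, { type: '@@INIT' });
  const log = [];
  const store = {
    getState: () => state,
    log,
    dispatch(action) {
      state = reducer(state, action);     // 1. the reducer always runs first
      log.push('reduced ' + action.type);
      const output = epic(action);        // 2. then the epic sees the action
      if (output) store.dispatch(output); // 3. the epic can only add actions
    }
  };
  return store;
}

const reducer = (state = { isPinging: false }, action) => {
  switch (action.type) {
    case 'PING': return { isPinging: true };
    case 'PONG': return { isPinging: false };
    default: return state;
  }
};

// Maps PING to PONG, but PING has already reached the reducer by then.
const pingEpic = action => (action.type === 'PING' ? { type: 'PONG' } : null);

const store = createTinyStore(reducer, pingEpic);
store.dispatch({ type: 'PING' });

console.log(store.log); // [ 'reduced PING', 'reduced PONG' ]
```

Both actions show up in the log: mapping PING to PONG did not stop PING from being reduced.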