I'm pretty new to reactive programming but already in love with it. However, it is still hard to switch my brain over to it. I'm trying to follow all the recommendations, such as "Avoid using subjects", "Avoid impure functions" and, of course, "Avoid imperative code".
What I'm finding hard to achieve is simple cross-module communication, where one module can register an "action"/observable and another can subscribe and react to it. A simple message bus would probably work, but it would force the use of Subjects and an imperative code style, which I'm trying to avoid.
So here is a simple starting point I'm playing with:
// some sandbox
/**
 * Small registry through which modules share named observables ("actions").
 */
class Api {
  constructor() {
    // Action name -> shared observable stored by registerAction().
    this.actions = {};
  }

  /**
   * Stores `action` under `actionName` after wrapping it with
   * `publishReplay(10).refCount()`.
   *
   * NOTE: with refCount() the wrapped source starts as soon as anyone
   * subscribes, so this wrapping is the part that likely needs to change.
   *
   * @param {string} actionName - key used to look the action up later.
   * @param {object} action - observable exposing `publishReplay`.
   */
  registerAction(actionName, action) {
    const shared = action.publishReplay(10).refCount();
    this.actions[actionName] = shared;
  }

  /**
   * @param {string} actionName - key passed to registerAction().
   * @returns {object|undefined} the stored observable, or `undefined`
   *   when nothing was registered under that name.
   */
  getAction(actionName) {
    const { actions } = this;
    return actions[actionName];
  }
}
// Registry instance used by both modules below.
const api = new Api();
// -------------------------------------------------------------------
// module 1
// Source observable: logs when its subscribe logic runs, emits a single
// timestamped value and then completes.
const myAction = Rx.Observable.create((observer) => {
  console.log("EXECUTING");
  const stamped = `42 ${Date.now()}`;
  observer.next(stamped);
  observer.complete();
});
api.registerAction("myAction", myAction);

// Two ticks, one second apart; every tick is mapped to the registered action.
const myTrigger = Rx.Observable.interval(1000).take(2);
const executedAction = myTrigger
  .flatMap(() => api.getAction("myAction"))
  .subscribe(
    (value) => {
      console.log(`executed action: ${value}`);
    },
    // The error callback is a no-op.
    () => {},
    () => {
      console.log("completed");
    }
  );
// -------------------------------------------------------------------
// module 2
// Second consumer of the same registered action; it subscribes right away.
api.getAction("myAction").subscribe(
  (value) => {
    console.log(`SECOND executed action: ${value}`);
  },
  // The error callback is a no-op.
  () => {},
  () => {
    console.log("SECOND completed");
  }
);
Currently, the moment the second module subscribes, it "triggers" the "myAction" Observable, and in a real-life scenario that could be an AJAX call. Is there any way to make all subscribers wait until "myAction" is called properly from module 1? Again, it's easy to do this using subjects, but I'm trying to follow the recommended practices.
So here is a much simpler solution than the one I originally had in mind, simply using two observables. A similar effect could be achieved with schedulers and subscribeOn.
// some sandbox
/**
 * A registered action: its name, the source observable that performs the
 * work, and a ReplaySubject (buffer of 10) exposed to other subscribers.
 */
class Action {
  /**
   * @param {string} name - name the action is registered under.
   * @param {object} observable - source observable for the action.
   */
  constructor(name, observable) {
    const replayBufferSize = 10;
    Object.assign(this, {
      name,
      observable,
      replay: new Rx.ReplaySubject(replayBufferSize),
    });
  }
}
/**
 * Builds a cold observable that, on subscription, runs `action.observable`
 * and mirrors every emitted value both to the direct subscriber and to
 * `action.replay`.
 *
 * Only values are pushed into `action.replay`; this function never completes
 * or errors it, so one run finishing does not terminate the shared subject.
 *
 * @param {{observable: object, replay: object}} action - registered action.
 * @param {*} [param] - trigger payload; accepted for callers but currently
 *   not used by the implementation.
 * @returns {object} observable created through `Rx.Observable.create`.
 */
function actionFactory(action, param) {
  // Returning the inner subscription hands it to the outer observable as
  // teardown, so unsubscribing the result also cancels the source.
  return Rx.Observable.create((obs) =>
    action.observable.subscribe(
      (value) => {
        obs.next(value);
        action.replay.next(value);
      },
      // Propagate failures instead of swallowing them; otherwise the
      // subscriber would neither complete nor error.
      (err) => obs.error(err),
      // complete must be *called*; merely referencing it never completes.
      () => obs.complete()
    )
  );
}
/**
 * Registry of named actions. Registering returns a function that runs the
 * action; other modules observe results through the action's replay subject.
 */
class Api {
  constructor() {
    // Action name -> Action instance.
    this.actions = {};
  }

  /**
   * Wraps `action` in an Action, stores it under `actionName`, and returns
   * `actionFactory` pre-bound to that Action.
   *
   * @param {string} actionName - key to register under.
   * @param {object} action - source observable for the action.
   * @returns {Function} bound factory; extra arguments are forwarded to
   *   `actionFactory` after the Action.
   */
  registerAction(actionName, action) {
    const entry = new Action(actionName, action);
    this.actions[actionName] = entry;
    return actionFactory.bind(null, entry);
  }

  /**
   * @param {string} actionName - key passed to registerAction().
   * @returns {object} the `replay` subject of the registered Action.
   * @throws {TypeError} when no action is registered under `actionName`.
   */
  getAction(actionName) {
    const { replay } = this.actions[actionName];
    return replay;
  }
}
// Registry instance used by both modules below.
const api = new Api();
// -------------------------------------------------------------------
// module 1
// Source observable: emits a single timestamped value and then completes.
const myAction = Rx.Observable.create((observer) => {
  const stamped = `42 ${Date.now()}`;
  observer.next(stamped);
  observer.complete();
});
// registerAction returns the function that actually runs the action.
const myRegisteredAction$ = api.registerAction("myAction", myAction);

// One tick from a 1000 ms interval, delayed by a further 1000 ms.
const myTrigger = Rx.Observable.interval(1000).take(1).delay(1000);
const executedAction = myTrigger
  .map((tick) => ({ someValue: tick }))
  .concatMap((param) => myRegisteredAction$(param))
  .subscribe(
    (value) => {
      console.log(`MAIN: ${value}`);
    },
    (err) => {
      console.log("error", err);
    },
    () => {
      console.log("MAIN: completed");
    }
  );
// -------------------------------------------------------------------
// module 2
// Module 2 listens on the action's replay subject, so subscribing here does
// not run the action itself; values arrive when module 1 executes it.
// `const` replaces `var` to match the let/const declarations used elsewhere.
const sub = api.getAction("myAction").subscribe(
  (value) => {
    console.log(`SECOND: ${value}`);
  },
  (err) => {
    console.log("error : " + err);
  },
  () => {
    console.log("SECOND: completed");
  }
);