I am trying to implement subscriptions with redux-logic middleware. The idea is following: when data is fetched from server, to call callback for each subscriber passing fetched data as arguments.
// logic/subscriptions.js
const fetchLatestLogic = createLogic({
type: FETCH_LATEST_DATA,
latest: true,
process({getState, action}, dispatch, done) {
const {seriesType, nextUpdateTime} = action.payload;
const callbacks = getState()[seriesType][nextUpdateTime].callbacks
apiFetch(seriesType)
.then(data => {
callbacks.forEach(callback => callback(seriesType, data));
done()
})
}
})
const subscribeLogic = createLogic({
type: SUBSCRIPTIONS_SUBSCRIBE,
cancelType: SUBSCRIPTIONS_REMOVE,
process({getState, action, cancelled$}, dispatch) {
const {seriesType, nextUpdateTime, updateInterval, subscriberId, callback} = action.payload;
const interval = setInterval(() => {
dispatch(fetchLatestData(seriesType, nextUpdateTime))
}, updateInterval);
cancelled$.subscribe(() => {
clearInterval(interval)
})
}
})
// reducers/subscriptions.js
import update from 'immutability-helper';
update.extend('$autoArray', (value, object) => (object ? update(object, value) : update([], value)));
const initialState = {
'SERIESTYPE1': {}
'SERIESTYPE2': {}
}
// state modifications using 'immutable-helpers'
const serieAddSubscriberForTime = (seriesSubscriptions, time, subscriber) =>
update(seriesSubscriptions, {
[time]: {
$autoArray: {
$push: [subscriber]
}
}
});
// state modifications using 'immutable-helpers'
const serieRemoveSubscriberForTime = (seriesSubscriptions, subscriptionTime, subscriber) => {
const subscriptions = seriesSubscriptions[subscriptionTime].filter(s => s.subscriberId !== subscriber.subscriberId);
if (subscriptions.length === 0) {
return update(seriesSubscriptions, { $unset: [subscriptionTime] });
}
return { ...seriesSubscriptions, ...{ [subscriptionTime]: subscriptions }
};
export default function reducer(state = initialState, action) {
switch (action.type) {
case SUBSCRIPTIONS_SUBSCRIBE: {
const { seriesType, nextUpdateTime, subscriber} = action.payload;
const newSubscriptionAdded = serieAddSubscriberForTime(state[seriesType], nextUpdateTime, subscriber);
const oldSubscriptionRemoved = serieRemoveSubscriberForTime(state[seriesType], nextUpdateTime, subscriber);
return update(state, { [seriesType]: { ...oldSubscriptionRemoved, ...newSubscriptionAdded } });
}
default:
return state;
}
}
How would it be possible to cancel running interval for given subscriber only? *(Without dispatching intervalID to reducer and saving it in state?)
Because by just dispatching action
cancelType: SUBSCRIPTIONS_REMOVE
will remove all intervals for all subscriptions with my current implementation.
UPDATE: actually there is much more smooth way to do the cancellation logic.
cancelled$
is an observable, and the RxJS .subscribe() accepts three functions as arguments:
[onNext] (Function): Function to invoke for each element in the observable sequence.
[onError] (Function): Function to invoke upon exceptional termination of the observable sequence.
[onCompleted] (Function): Function to invoke upon graceful termination of the observable sequence.
so the argument of onNext
function is an emited value, and since in our case its the SUBSCRIPTIONS_REMOVE
action, we can access its payload and do the cancellation depending on that payload:
cancelled$.subscribe(cancellAction => {
if (cancellAction.payload.subscriberId === subscriberId &&
cancellAction.payload.seriesType === seriesType) {
clearTimeout(runningTimeout);
}
})