Tags: javascript, redux, subscription, cancellation, redux-logic

Redux-logic subscription cancel for given subscriber


I am trying to implement subscriptions with the redux-logic middleware. The idea is the following: when data is fetched from the server, a callback is called for each subscriber, with the fetched data passed as an argument.

// logic/subscriptions.js

const fetchLatestLogic = createLogic({
  type: FETCH_LATEST_DATA,
  latest: true,

  process({getState, action}, dispatch, done) {
    const {seriesType, nextUpdateTime} = action.payload;
    const callbacks = getState()[seriesType][nextUpdateTime].callbacks
    apiFetch(seriesType)
      .then(data => {
        callbacks.forEach(callback => callback(seriesType, data));
        done();
      });
  }
})

const subscribeLogic = createLogic({
  type: SUBSCRIPTIONS_SUBSCRIBE,
  cancelType: SUBSCRIPTIONS_REMOVE,

  process({getState, action, cancelled$}, dispatch) {
    const {seriesType, nextUpdateTime, updateInterval, subscriberId, callback} = action.payload;
    const interval = setInterval(() => {
      dispatch(fetchLatestData(seriesType, nextUpdateTime))
    }, updateInterval);

    cancelled$.subscribe(() => {
      clearInterval(interval);
    });
  }
})

// reducers/subscriptions.js

import update from 'immutability-helper';

update.extend('$autoArray', (value, object) => (object ? update(object, value) : update([], value)));

const initialState = {
  'SERIESTYPE1': {},
  'SERIESTYPE2': {}
};

// state modifications using 'immutability-helper'
const serieAddSubscriberForTime = (seriesSubscriptions, time, subscriber) =>
  update(seriesSubscriptions, {
    [time]: {
      $autoArray: {
        $push: [subscriber]
      }
    }
  });

// state modifications using 'immutability-helper'
const serieRemoveSubscriberForTime = (seriesSubscriptions, subscriptionTime, subscriber) => {
  // Fall back to an empty list when nothing is registered for this time yet.
  const subscriptions = (seriesSubscriptions[subscriptionTime] || []).filter(s => s.subscriberId !== subscriber.subscriberId);
  if (subscriptions.length === 0) {
    return update(seriesSubscriptions, { $unset: [subscriptionTime] });
  }
  return { ...seriesSubscriptions, [subscriptionTime]: subscriptions };
};

export default function reducer(state = initialState, action) {
  switch (action.type) {
    case SUBSCRIPTIONS_SUBSCRIBE: {
        const { seriesType, nextUpdateTime, subscriber} = action.payload;
        const newSubscriptionAdded = serieAddSubscriberForTime(state[seriesType], nextUpdateTime, subscriber);
        const oldSubscriptionRemoved = serieRemoveSubscriberForTime(state[seriesType], nextUpdateTime, subscriber);
        return update(state, { [seriesType]: { ...oldSubscriptionRemoved, ...newSubscriptionAdded } });
    }
    default:
      return state;
  }
}

How can I cancel the running interval for a given subscriber only (without dispatching the interval ID to the reducer and saving it in state)?

With my current implementation, dispatching the action named in

cancelType: SUBSCRIPTIONS_REMOVE

clears the intervals of all subscriptions.


Solution

  • UPDATE: there is actually a much smoother way to do the cancellation logic.

    cancelled$ is an observable, and the RxJS .subscribe() method accepts three functions as arguments:

    [onNext] (Function): Function to invoke for each element in the observable sequence.
    [onError] (Function): Function to invoke upon exceptional termination of the observable sequence.
    [onCompleted] (Function): Function to invoke upon graceful termination of the observable sequence.
    

    The argument of the onNext function is the emitted value. Since in our case that value is the SUBSCRIPTIONS_REMOVE action, we can access its payload and cancel depending on that payload:

    cancelled$.subscribe(cancelAction => {
      // Stop the timer only when the cancel action targets this subscriber and series.
      if (cancelAction.payload.subscriberId === subscriberId &&
          cancelAction.payload.seriesType === seriesType) {
        clearInterval(interval);
      }
    });
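
To see the payload check in isolation, here is a minimal sketch that runs without redux-logic or RxJS. Everything in it is an assumption made for illustration: `createCancelStream` is a hand-rolled stand-in for `cancelled$` that supports only `subscribe(onNext)` and `next(value)`, and `isCancelFor` and `startSubscription` are hypothetical helper names. In redux-logic each logic invocation receives its own `cancelled$`; the sketch shares a single stream to imitate every running instance seeing the same cancel action.

```javascript
// Hypothetical stand-in for cancelled$ (NOT the redux-logic or RxJS API):
// it only supports subscribe(onNext) and next(value).
function createCancelStream() {
  const listeners = [];
  return {
    subscribe(onNext) {
      listeners.push(onNext);
    },
    next(value) {
      // Copy the list so listeners added during emission are not called in this round.
      listeners.slice().forEach(listener => listener(value));
    },
  };
}

// True when the cancel action targets exactly this subscriber and series.
function isCancelFor(cancelAction, { subscriberId, seriesType }) {
  const payload = (cancelAction && cancelAction.payload) || {};
  return payload.subscriberId === subscriberId && payload.seriesType === seriesType;
}

// Starts the timer for one subscription and stops it only on a matching cancel action.
// Returns a small status object so callers can inspect whether the timer is still running.
function startSubscription(cancelled$, subscription, tick) {
  const interval = setInterval(tick, subscription.updateInterval);
  const status = { active: true };
  cancelled$.subscribe(cancelAction => {
    if (status.active && isCancelFor(cancelAction, subscription)) {
      clearInterval(interval);
      status.active = false;
    }
  });
  return status;
}
```

With two subscriptions started on the same stream, emitting `{ type: 'SUBSCRIPTIONS_REMOVE', payload: { subscriberId: 'a', seriesType: 'SERIESTYPE1' } }` through `next` should stop only the timer that was started for subscriber `a`, leaving the other one running.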