javascriptloopbackjsevent-stream

loopback event stream causing race condition


My overall goal is to watch my database for any changes and automatically broadcast those changes to any user connected to my website. The problem that I am seeing is that I have an action triggering a post request to my database and then the event stream is triggered at the same time because the model that I am watching has changed. Which results in the initial action fulfilling and the action triggered by the event stream is being interrupted before it is fulfilled.

this is the first action that is triggered to create a new blog post entry in my database

export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
    return {
        type: 'TOPIC_SUBMIT',
        payload: axios({
            method: 'post',
            url: `/api/blogPosts`,
            data: {
                "blogTitle": newTopic,
                "blogBody": newTopicBody,
                "date": date,
                "upVotes": 0,
                "numComments": 0,
                "voteNames": [],
                "memberId": memberId,
                "steamNameId": name
            }
        })
            .then(response => {
                return response.data
            })
            .catch(err => err)
    }
}

// this is the boot script that creates the change stream

var es = require('event-stream');
module.exports = function (app) {
    console.log('realtime boot script')
    var BlogPost = app.models.BlogPost;
    BlogPost.createChangeStream(function (err, changes) {
        changes.pipe(es.stringify()).pipe(process.stdout);
    });
}

// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end

componentDidMount() {
        const { dispatch } = this.props;
          let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
          let src = new EventSource(urlToChangeStream);
          src.addEventListener('data', function (msg) {
              let data = JSON.parse(msg.data);
          dispatch(liveChangeBlogs(data))
          });

I am expecting that the 'TOPIC_SUBMIT' action should return fulfilled before the 'liveChangeBlogs' action is dispatched by the event listener Here is the documentation that I found on loopback event stream https://loopback.io/doc/en/lb3/Realtime-server-sent-events.html


Solution

  • I ended up solving this issue using Redux Thunk, adding a setTimeout and a closure to my componentDidMount. The topicSubmit action and the boot script did not change. Not sure if the setTimeout is the proper way to go, but it was the only way I could think of to get around the race case.

     componentDidMount() {
            const { dispatch } = this.props;
            const newBlog = this.handleNewBlog;
            let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
            let src = new EventSource(urlToChangeStream);
            src.addEventListener('data', function (msg) {
                newBlog(msg)
            });
    
            const newThread = this.handleNewThread;
            let urlToChangeStream2 = '/api/threads/change-stream?_format=event-stream';
            let src2 = new EventSource(urlToChangeStream2);
            src2.addEventListener('data', function (msg) {
                newThread(msg)
            });
            dispatch(getBlogs());
        }
    
      handleNewBlog(msg) {
            const { dispatch } = this.props;
            let data = JSON.parse(msg.data);
            if(data.data == undefined) {
                setTimeout(() => {
                    dispatch(getBlogs());
                }, 1000);
            } else {
                setTimeout(() => {
                    dispatch(liveChangeBlogs(data));
                }, 1000);
            }
        }
    
     handleNewThread(msg) {
            const { dispatch, viewingThreadId } = this.props;
            let data2 = JSON.parse(msg.data);
            console.log('data2: ', data2)
            if (data2.type == 'remove') {
                return dispatch(getThreadsById(viewingThreadId))
            }
            let id = data2.data.blogPostId
            setTimeout(() => {
                if (viewingThreadId === id) {
                    dispatch(getThreadsById(id));
                } else {
                    return
                }
            }, 1000);
        }