javascriptnode.jsgeneratorbluebirdco

Node Coroutines Parallel Flow Control with Generators and Promise


I am trying to mimic the control flow of the async.js library with coroutines and promises,using both co and bluebird.js but I am running into some issues. My code is as follows, although this is mostly psuedo code, because actual code would've been very long, I can add the acutal code later on if needed...

 co(function*(){  
    var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
    var doc = yield People.findOne({email:  req.body.email}).exec();

    var filePath = path.join(__dirname, '../email-template.html');                         
    var html = yield fs.readFileAsync(filePath,'utf8');
    var emailsToSend = [];
    var emailStatuses = [];
    var validEmails = [];

    //make sure email is ok
    req.body.messagesToSend.forEach(function(message){
      if(message.email != null && re.test(message.email))
      { 
        validEmails.push(message);
      }else{
        // mark it as failed...
        emailStatuses.push({success : "FAILURE", email : message.email}); 
      }
    });

    yield Promise.all( validEmails, Promise.coroutine(function * (message){
      try{
        var person = yield People.findOne({email:  message.email }).exec();

        if(person){ 
          emailStatuses.push({status : "Already exists", email : message.email});
        }else{          
          emailsToSend.push({ email: message.email, message: message.text });
          }              
        }// else
      }catch(err){
        emailStatuses.push({status : "FAILURE", email : message.email}); 
      }//
    }));

    if( emailsToSend.length === 0){
      // no valid emails to process so just return              
      return res.status(200).json(emailStatuses);                     
    }// if no emails to send
    else{
      yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
        try{                   
          var newInvite =  new Invite();  
          newInvite.email = emailMessage.email;
          newInvite.message = emailMessage.message;
          var invite = yield Invite.save();

          // now try to send the email
          var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);  

          var sendmail              = new emailProvider.Email();
          sendmail.setTos(emailMessage.email);
          sendmail.setFrom(common.DEF_EMAIL_SENDER);
          sendmail.setSubject(common.EMAIL_SUBJECT);
          sendmail.setHtml(mailHTMl);

          var successMail = yield emailProvider.send(sendmail);
          emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
        }catch(err){
          //additional logging here which ive removed for purposes of brevity
          emailStatuses.push({status : "FAILURE", email : emailMessage.email});
        }           
      }));

      return res.status(200).json(emailStatuses);           
    }

  }).catch(function(err){
    //additional logging here which ive removed for purposes of brevity
    return res.status(500)
  });

The issue Im having is with the Promise.all, if i pass in an array, it only seems to process the first element, even though there is no rejection of the promise or any type of error.

This code works, if I use Promise.each, but then it is executed serially. What I want to achieve is basically have an async series with 2 async.foreach which will execute one after another and process each array item in parallel, but process each array sequentially, kind of like below:

async.series([
  async.foreach
  async.foreach
]);

However, I am not sure what I'm missing here in order to get it to execute in parallel, because it seems to work fine now if I use Promise.each and get serial execution for each array item.


Solution

  • So there are basically 2 ways to make this work, the first solution is to use the original code and just use Promise.map, which I'm not sure if it executes in parallel, but basically will not stop at the first array element.

    The second is a fairly simple change of mapping the array values to coroutine functions, and then doing a Promise.all on those as shown below:

    Although, I must note, this is visibly slower than using async.js. It would be helpful if anyone could explain why ?

      co(function*(){  
        var re = /^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$/;
        var doc = yield People.findOne({email:  req.body.email}).exec();
    
        var filePath = path.join(__dirname, '../email-template.html');                         
        var html = yield fs.readFileAsync(filePath,'utf8');
        var emailsToSend = [];
        var emailStatuses = [];
        var validEmails = [];
    
        //make sure email is ok
        req.body.messagesToSend.forEach(function(message){
          if(message.email != null && re.test(message.email))
          { 
            validEmails.push(message);
          }else{
            // mark it as failed...
            emailStatuses.push({success : "FAILURE", email : message.email}); 
          }
        });
    
        //yield Promise.all( validEmails, Promise.coroutine(function * (message){
        var firstPromises = validEmails.map(Promise.coroutine(function * (message){  
          try{
            var person = yield People.findOne({email:  message.email }).exec();
    
            if(person){ 
              emailStatuses.push({status : "Already exists", email : message.email});
            }else{          
              emailsToSend.push({ email: message.email, message: message.text });
              }              
            }// else
          }catch(err){
            emailStatuses.push({status : "FAILURE", email : message.email}); 
          }//
        }));
    
        yield Promise.all(firstPromises);
    
        if( emailsToSend.length === 0){
          // no valid emails to process so just return              
          return res.status(200).json(emailStatuses);                     
        }// if no emails to send
        else{
          //yield Promise.all(emailsToSend, Promise.coroutine(function * (emailMessage){
          var secondPromises = emailsToSend.map( Promise.coroutine(function * (emailMessage){
            try{                   
              var newInvite =  new Invite();  
              newInvite.email = emailMessage.email;
              newInvite.message = emailMessage.message;
              var invite = yield Invite.save();
    
              // now try to send the email
              var mailHTMl = html.replace( "{{EMAIL_PLACEHOLDER}}", req.body.registeredEmail);  
    
              var sendmail              = new emailProvider.Email();
              sendmail.setTos(emailMessage.email);
              sendmail.setFrom(common.DEF_EMAIL_SENDER);
              sendmail.setSubject(common.EMAIL_SUBJECT);
              sendmail.setHtml(mailHTMl);
    
              var successMail = yield emailProvider.send(sendmail);
              emailStatuses.push({status : "SUCCESS", email : emailMessage.email});
            }catch(err){
              //additional logging here which ive removed for purposes of brevity
              emailStatuses.push({status : "FAILURE", email : emailMessage.email});
            }           
          }));
    
          yield Promise.all(secondPromises);
    
          return res.status(200).json(emailStatuses);           
        }
    
      }).catch(function(err){
        //additional logging here which ive removed for purposes of brevity
        return res.status(500)
      });