node.js · gulp · node-streams

Will Gulp wait for the async function inside the pipeline to complete before passing along the next file?


Because I have heard that simulated parallelism is a main feature of Gulp, I expected that even if there is an async function inside the Gulp pipeline, the processing of the next file would begin before this async function finishes dealing with the previous file.

Assume that we have 2 files, "Foo.txt" and "Bar.txt", where "Foo.txt" has entered the Gulp pipeline first. I expected that the processing of "Bar.txt" would begin before myAsyncFunction finishes its work with "Foo.txt".

const gulp = require('gulp');
const through = require('through2');
const PluginError = require('plugin-error');

// Define your async function that will be used inside the Transform stream
async function myAsyncFunction(data) {

  console.log("Checkpoint 4");
  // Simulate an asynchronous operation.
  // The 5-second delay is deliberately long so the serial vs. parallel
  // ordering of the checkpoints in the console output is unmistakable.
  await new Promise((resolve) => setTimeout(resolve, 5000));

  // Manipulate the data: return the upper-cased string form of the input.
  const transformedData = data.toString().toUpperCase();

  console.log("Checkpoint 5");

  return transformedData;
}

// Create a Transform stream using through2
// Create a Transform stream using through2.
// Each vinyl file is upper-cased via myAsyncFunction; because we `await`
// before calling callback(), files are processed strictly one at a time.
const myTransformStream = through.obj(async function (file, encoding, callback) {

  console.log("Checkpoint 1");

  if (file.isNull()) {
    console.log("Checkpoint 2-A");
    // Pass through if null
    return callback(null, file);
  }

  console.log("Checkpoint 2-B");

  // Fail fast on streaming contents instead of silently dropping the file
  // (the original fell through and called callback() with no output).
  if (file.isStream()) {
    return callback(new PluginError('gulp-my-task', 'Streaming contents are not supported'));
  }

  if (file.isBuffer()) {

    console.log("Checkpoint 3");

    try {
      // Convert the file contents to string and pass it to the async function
      const transformedData = await myAsyncFunction(file.contents.toString());

      console.log("Checkpoint 6");
      console.log("=============");

      // Create a new file with the transformed data
      const transformedFile = file.clone();
      transformedFile.contents = Buffer.from(transformedData);

      // Pass the transformed file to the next stream
      this.push(transformedFile);
    } catch (err) {
      // Report the failure through the callback — the idiomatic through2/gulp
      // way. The original emitted 'error' AND then fell through to callback(),
      // signalling both failure and successful completion for the same file.
      return callback(new PluginError('gulp-my-task', err));
    }
  }

  callback();
});

// Define your Gulp task that uses the Transform stream
gulp.task('my-task', function () {
  // Read every .txt under src/, run it through the transform, write to dist/.
  const source = gulp.src('src/*.txt');
  return source
      .pipe(myTransformStream)
      .pipe(gulp.dest('dist'));
});

However I got the output:

Checkpoint 1  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4  
Checkpoint 5
Checkpoint 6  
============= 
Checkpoint 1  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4  
Checkpoint 5
Checkpoint 6
=============

This means Gulp processed the first file through Checkpoint 6 and only then began dealing with the second one. That is not what I expected; I expected something like:

Checkpoint 1  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4
Checkpoint 1  // <- Begin the processing of the next file  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4    
Checkpoint 5
Checkpoint 6  
============= 
Checkpoint 5
Checkpoint 6
=============

Please explain the theory.


Solution

  • The series vs parallel nature of your code has 100% to do with your async function implementation and through2 usage...and nothing to do with gulp.

    through2 series vs parallel

    Here is an example of 2 different (but very similar) pipelines using through2. In one case you can see our stream.Transform is blocking (via await) until some async work is finished before signalling to through2 that we are done (by calling callback()). In the other case, we kick off some async work but DO NOT await, and tell through2 we are ready for more work (by calling callback()).

    As you can see from the code and output of the 2 cases, the behaviour you are seeing has nothing to do with gulp.

    For explanation of series/parallel task construction in gulp with example, read on.

    
    
    import through2 from 'through2';
    import stream from 'node:stream';
    
    const randMin = 1;
    const randMax = 10;

    // Random integer in the inclusive range [min, max].
    // Non-numeric (NaN-coercing) arguments fall back to randMin/randMax.
    function rand (min, max) {
        const lo = isNaN(min) ? randMin : min;
        const hi = isNaN(max) ? randMax : max;
        return lo + Math.floor(Math.random() * (hi - lo + 1));
    }
    
    // Shared monotonically-increasing call counter used by both transforms below.
    let counter = 0;
    
    // note through2.obj is alias for through2({objectMode: true})
    // Serial variant: callback() fires only after the awaited timer resolves,
    // so through2 hands us the next chunk strictly after the previous finishes.
    const streamTransformBlocking = through2.obj(async function (data, encoding, callback) {
        const callIdx = ++counter;
        const timeout = rand() * 100;

        console.log(`BLOCKING: Started (timeout=${timeout}) `, callIdx, data);
        // A plain setTimeout callback would work too; the promise + await keeps
        // this as close as possible to the code in the question.
        await new Promise((resolve) => {
            setTimeout(function () {
                console.log('BLOCKING: Finished ', callIdx, data);
                resolve();
            }, timeout);
        });

        // callback won't happen until the await is done — hence: series.
        callback();
    });
    
    // "Parallel" variant: we schedule the async work but invoke callback()
    // immediately, so through2 keeps feeding us chunks while timers run.
    const streamTransformNotBlocking = through2.obj(async function (data, encoding, callback) {
        const callIdx = ++counter;
        const timeout = rand() * 100;

        console.log(`NOBLOCK: Started (timeout=${timeout}) `, callIdx, data);
        // Kick off the async work for later — note: deliberately no await here.
        setTimeout(() => {
            console.log('NOBLOCK: Finished ', callIdx, data);
        }, timeout);

        // Tell through2 we are done even though the timer is still pending.
        callback();
    });
    
    // Feed each character of the test string through `processor` as a
    // {value} object, then signal end-of-stream.
    function run (processor) {
        // Readable.from() implements _read() and honours backpressure for us.
        // The previous version pushed into a bare `new stream.Readable()` —
        // which throws ERR_METHOD_NOT_IMPLEMENTED if _read() is ever invoked —
        // and discarded push()'s backpressure return value.
        const readable = stream.Readable.from(
            'abcdefghijklmnopqrs'.split('').map((data) => ({value: data})),
            {objectMode: true}
        );

        // setup pipe
        readable.pipe(processor);
    }
    
    // Demo 1: serial — each chunk's callback waits for its timer to resolve.
    console.log('Running with streamTransformBlocking');
    run(streamTransformBlocking);

    // Demo 2: "parallel" — callbacks fire immediately, timers finish out of order.
    // Note both demos start back-to-back since run() returns without waiting.
    console.log('Running with streamTransformNotBlocking');
    run(streamTransformNotBlocking);
    
    
    

    blocking case output

    Note everything happens in series.

    $ node through2-serial.js
    Running with streamTransformBlocking
    BLOCKING: Started (timeout=1000)  1 { value: 'a' }
    BLOCKING: Finished  1 { value: 'a' }
    BLOCKING: Started (timeout=1000)  2 { value: 'b' }
    BLOCKING: Finished  2 { value: 'b' }
    BLOCKING: Started (timeout=300)  3 { value: 'c' }
    BLOCKING: Finished  3 { value: 'c' }
    BLOCKING: Started (timeout=1000)  4 { value: 'd' }
    BLOCKING: Finished  4 { value: 'd' }
    BLOCKING: Started (timeout=800)  5 { value: 'e' }
    BLOCKING: Finished  5 { value: 'e' }
    BLOCKING: Started (timeout=600)  6 { value: 'f' }
    BLOCKING: Finished  6 { value: 'f' }
    BLOCKING: Started (timeout=200)  7 { value: 'g' }
    BLOCKING: Finished  7 { value: 'g' }
    BLOCKING: Started (timeout=600)  8 { value: 'h' }
    BLOCKING: Finished  8 { value: 'h' }
    BLOCKING: Started (timeout=600)  9 { value: 'i' }
    BLOCKING: Finished  9 { value: 'i' }
    BLOCKING: Started (timeout=600)  10 { value: 'j' }
    BLOCKING: Finished  10 { value: 'j' }
    BLOCKING: Started (timeout=900)  11 { value: 'k' }
    BLOCKING: Finished  11 { value: 'k' }
    BLOCKING: Started (timeout=600)  12 { value: 'l' }
    BLOCKING: Finished  12 { value: 'l' }
    BLOCKING: Started (timeout=500)  13 { value: 'm' }
    BLOCKING: Finished  13 { value: 'm' }
    BLOCKING: Started (timeout=600)  14 { value: 'n' }
    BLOCKING: Finished  14 { value: 'n' }
    BLOCKING: Started (timeout=100)  15 { value: 'o' }
    BLOCKING: Finished  15 { value: 'o' }
    BLOCKING: Started (timeout=900)  16 { value: 'p' }
    BLOCKING: Finished  16 { value: 'p' }
    BLOCKING: Started (timeout=200)  17 { value: 'q' }
    BLOCKING: Finished  17 { value: 'q' }
    BLOCKING: Started (timeout=600)  18 { value: 'r' }
    BLOCKING: Finished  18 { value: 'r' }
    BLOCKING: Started (timeout=700)  19 { value: 's' }
    BLOCKING: Finished  19 { value: 's' }
    

    Non-blocking case output

    Note everything happens in "parallel". We use random timeouts to highlight how things can finish out of order even though they start in order.

    $ node through2-serial.js
    Running with streamTransformNotBlocking
    NOBLOCK: Started (timeout=900)  1 { value: 'a' }
    NOBLOCK: Started (timeout=800)  2 { value: 'b' }
    NOBLOCK: Started (timeout=300)  3 { value: 'c' }
    NOBLOCK: Started (timeout=800)  4 { value: 'd' }
    NOBLOCK: Started (timeout=200)  5 { value: 'e' }
    NOBLOCK: Started (timeout=1000)  6 { value: 'f' }
    NOBLOCK: Started (timeout=1000)  7 { value: 'g' }
    NOBLOCK: Started (timeout=900)  8 { value: 'h' }
    NOBLOCK: Started (timeout=500)  9 { value: 'i' }
    NOBLOCK: Started (timeout=500)  10 { value: 'j' }
    NOBLOCK: Started (timeout=500)  11 { value: 'k' }
    NOBLOCK: Started (timeout=1000)  12 { value: 'l' }
    NOBLOCK: Started (timeout=700)  13 { value: 'm' }
    NOBLOCK: Started (timeout=1000)  14 { value: 'n' }
    NOBLOCK: Started (timeout=700)  15 { value: 'o' }
    NOBLOCK: Started (timeout=400)  16 { value: 'p' }
    NOBLOCK: Started (timeout=700)  17 { value: 'q' }
    NOBLOCK: Started (timeout=500)  18 { value: 'r' }
    NOBLOCK: Started (timeout=1000)  19 { value: 's' }
    NOBLOCK: Finished  5 { value: 'e' }
    NOBLOCK: Finished  3 { value: 'c' }
    NOBLOCK: Finished  16 { value: 'p' }
    NOBLOCK: Finished  9 { value: 'i' }
    NOBLOCK: Finished  10 { value: 'j' }
    NOBLOCK: Finished  11 { value: 'k' }
    NOBLOCK: Finished  18 { value: 'r' }
    NOBLOCK: Finished  13 { value: 'm' }
    NOBLOCK: Finished  15 { value: 'o' }
    NOBLOCK: Finished  17 { value: 'q' }
    NOBLOCK: Finished  2 { value: 'b' }
    NOBLOCK: Finished  4 { value: 'd' }
    NOBLOCK: Finished  1 { value: 'a' }
    NOBLOCK: Finished  8 { value: 'h' }
    NOBLOCK: Finished  6 { value: 'f' }
    NOBLOCK: Finished  7 { value: 'g' }
    NOBLOCK: Finished  12 { value: 'l' }
    NOBLOCK: Finished  14 { value: 'n' }
    NOBLOCK: Finished  19 { value: 's' }
    

    gulp

    gulp has a parallel and series API you can use to coordinate running tasks based on dependent order (parallel for not dependent, series for dependent) and these can be composed together from individual tasks to make more complicated flows and job wirings.

    In your code you don't use either of these APIs. You define a single gulp task pipeline with src, pipe and dest. The construction of your Transform that is being piped to by gulp.src().pipe(transform) is what causes each file in the pipeline to get processed one after the other (series). See previous section on through2 if this is still not clear.

    src takes a glob specifying input files and produces a stream of vinyl objects to be passed through the pipeline of transformations.

    You may find it useful to take a look at how node streams work, in particular object streams. You may also want to look at backpressure in the context of node streams/pipelines which prevents upstream writers from sending more data if you are busy downstream to keep memory use under control.

    By definition streams are serial in nature (one thing after the other). If you look at the source of gulp.src you see it is using vinyl-fs.src which starts with glob-stream converting a glob (pattern) into individual paths and emitting them one by one.

    So maybe the confusion here is you are thinking the stream API itself will consume a bunch of files in parallel.

    The streaming will happen in serial and will not continue if the pipeline is blocked due to back-pressure.

    You are awaiting the timeout and that means through2 doesn't get a callback and so as far as the pipeline knows you are still busy so you don't get any more work.

    You can have your code "lie" and say you are done when you are not done so that more work comes in, but that sort of defeats the purpose of backpressure to contain memory usage when processing large amounts of data through pipelines. Or maybe in some scenarios it would make sense to have your stream transformer actually be coordinating multiple parallel workers and handing off work when any pool worker is free creating some parallelism in your pipeline within your code. Could make sense for certain types of workloads on multicore machines.

    But back to the point...

    Here is a sample gulpfile for you that demos the series/parallel APIs in gulp for task sequencing.

    A task is just a function by the way. You don't need to use a string for a task name in gulp4 and it is preferred that you use vanilla functions and export them. Gulp will use the function name as the task name but you can override this for programmatically generated functions using displayName.

    
    import gulp from 'gulp';
    
    // Pull gulp's task-composition helpers off the default export.
    const {parallel, series} = gulp;
    
    const randMin = 1;
    const randMax = 10;

    /**
     * Random integer in the inclusive range [min, max].
     * Non-numeric (NaN-coercing) arguments fall back to randMin/randMax.
     */
    function rand (min, max) {
        const lo = isNaN(min) ? randMin : min;
        const hi = isNaN(max) ? randMax : max;
        const span = hi - lo + 1;
        return Math.floor(Math.random() * span) + lo;
    }
    
    
    // Factory producing uniquely named async demo tasks ("task1", "task2", ...).
    // Each task logs, sleeps a random number of seconds, then logs again.
    const makeTask = (() => {
        let taskIdx = 0;
        return () => {
            taskIdx += 1;
            const taskName = `task${taskIdx}`;
            const task = async () => {
                const timeout = rand();
                console.log(`Started: ${taskName} with random delay ${timeout}s `);
                // Simulate an asynchronous operation
                await new Promise((resolve) => setTimeout(resolve, timeout * 1000));
                console.log(`Finishing: ${taskName}`);
            };
            // gulp reports displayName for programmatically generated functions.
            task.displayName = taskName;
            return task;
        };
    })();
    
    // the gulp series/parallel usage
    // The gulp series/parallel usage: task1 runs alone first, then
    // task2/task3/task4 run concurrently, and task5 runs last.
    const first = makeTask();                                   // task1
    const middle = parallel(makeTask(), makeTask(), makeTask()); // task2..task4
    const last = makeTask();                                    // task5
    export const myTask = series(first, middle, last);
    
    
    

    Note with the above construction we expect task1 to always run to completion before we do task2/3/4/5. 2/3/4 all start at the same time and run in parallel. task5 only starts after 2/3/4 have all finished.

    You can save the above as filename.js and then run npm install gulp in your directory. You can then run gulp as follows...

    $ ./node_modules/.bin/gulp --gulpfile filename.js --tasks
    [18:26:27] └─┬ myTask
    [18:26:27]   └─┬ <series>
    [18:26:27]     ├── task1
    [18:26:27]     ├─┬ <parallel>
    [18:26:27]     │ ├── task2
    [18:26:27]     │ ├── task3
    [18:26:27]     │ └── task4
    [18:26:27]     └── task5
    
    

    or to run myTask you can do...

    $ ./node_modules/.bin/gulp --gulpfile filename.js myTask
    [18:22:54] Starting 'myTask'...
    [18:22:54] Starting 'task1'...
    Started: task1 with random delay 1s
    Finishing: task1
    [18:22:55] Finished 'task1' after 1 s
    [18:22:55] Starting 'task2'...
    [18:22:55] Starting 'task3'...
    [18:22:55] Starting 'task4'...
    Started: task2 with random delay 3s
    Started: task3 with random delay 2s
    Started: task4 with random delay 5s
    Finishing: task3
    [18:22:57] Finished 'task3' after 2.01 s
    Finishing: task2
    [18:22:58] Finished 'task2' after 3.01 s
    Finishing: task4
    [18:23:00] Finished 'task4' after 5 s
    [18:23:00] Starting 'task5'...
    Started: task5 with random delay 8s
    Finishing: task5
    [18:23:08] Finished 'task5' after 8.01 s
    [18:23:08] Finished 'myTask' after 14 s