Because I had heard that parallel processing is the main feature of Gulp, I expected that even if there is an async function inside the Gulp pipeline, the processing of the next file would begin before this async function finishes dealing with the previous file.
Assume that we have 2 files, "Foo.txt" and "Bar.txt", where "Foo.txt" entered the Gulp pipeline first. I expected that the processing of "Bar.txt" would begin before myAsyncFunction
finishes its work with "Foo.txt".
const gulp = require('gulp');
const through = require('through2');
const PluginError = require('plugin-error');
// Define your async function that will be used inside the Transform stream
// Async transform used inside the Transform stream: uppercases `data`
// after a simulated asynchronous delay.
// `delayMs` generalizes the previously hard-coded 5000 ms delay; the
// default preserves the original behavior for existing callers.
async function myAsyncFunction(data, delayMs = 5000) {
  console.log("Checkpoint 4");
  // Simulate an asynchronous operation
  await new Promise((resolve) => setTimeout(resolve, delayMs));
  // Manipulate the data
  const transformedData = data.toString().toUpperCase();
  console.log("Checkpoint 5");
  return transformedData;
}
// Create a Transform stream using through2
// Transform stream: uppercases each buffered vinyl file via myAsyncFunction.
// Note: through2 processes one chunk at a time; because we `await` before
// calling `callback()`, files are handled strictly in series.
const myTransformStream = through.obj(async function (file, encoding, callback) {
  console.log("Checkpoint 1");
  if (file.isNull()) {
    console.log("Checkpoint 2-A");
    // Pass through if null
    return callback(null, file);
  }
  console.log("Checkpoint 2-B");
  if (file.isStream()) {
    // Streaming contents are not handled by this plugin; fail explicitly
    // instead of silently dropping the file.
    return callback(new PluginError('gulp-my-task', 'Streaming not supported'));
  }
  if (file.isBuffer()) {
    console.log("Checkpoint 3");
    try {
      // Convert the file contents to string and pass it to the async function
      const transformedData = await myAsyncFunction(file.contents.toString());
      console.log("Checkpoint 6");
      console.log("=============");
      // Create a new file with the transformed data
      const transformedFile = file.clone();
      transformedFile.contents = Buffer.from(transformedData);
      // Pass the transformed file to the next stream
      this.push(transformedFile);
    } catch (err) {
      // Report the error through the callback. The original emitted 'error'
      // AND then fell through to callback(), signalling completion twice.
      return callback(new PluginError('gulp-my-task', err));
    }
  }
  callback();
});
// Define your Gulp task that uses the Transform stream
// Register the task; returning the stream tells gulp when the task is done.
gulp.task('my-task', () =>
  gulp
    .src('src/*.txt')
    .pipe(myTransformStream)
    .pipe(gulp.dest('dist'))
);
However, I got the following output:
Checkpoint 1
Checkpoint 2-B
Checkpoint 3
Checkpoint 4
Checkpoint 5
Checkpoint 6
=============
Checkpoint 1
Checkpoint 2-B
Checkpoint 3
Checkpoint 4
Checkpoint 5
Checkpoint 6
=============
It means that Gulp processed the first file up to Checkpoint 6, and only then began dealing with the second one. That is not what I expected; I expected something like:
Checkpoint 1
Checkpoint 2-B
Checkpoint 3
Checkpoint 4
Checkpoint 1 // <- Begin the processing of the next file
Checkpoint 2-B
Checkpoint 3
Checkpoint 4
Checkpoint 5
Checkpoint 6
=============
Checkpoint 5
Checkpoint 6
=============
Please explain the theory.
The series vs parallel nature of your code has 100% to do with your async function implementation and through2
usage...and nothing to do with gulp.
Here is an example of 2 different (but very similar) pipelines using through2. In one case you can see our stream.Transform
is blocking (via await
) until some async work is finished before signalling to through2
that we are done (by calling callback()
). In the other case, we kick off some async work but DO NOT await
and tell through2
we are ready for more work (by calling callback()
).
As you can see from the code and output of the 2 cases, the behaviour you are seeing has nothing to do with gulp.
For explanation of series/parallel task construction in gulp with example, read on.
import through2 from 'through2';
import stream from 'node:stream';
const randMin = 1;
const randMax = 10;

// Return a random integer in [min, max] inclusive. Omitted or non-numeric
// bounds fall back to the module defaults (the original used the coercing
// global isNaN for this; default parameters + Number.isFinite are clearer).
function rand (min = randMin, max = randMax) {
  const lo = Number.isFinite(min) ? min : randMin;
  const hi = Number.isFinite(max) ? max : randMax;
  return Math.floor(Math.random() * (hi - lo + 1)) + lo;
}
var counter = 0;
// note through2.obj is alias for through2({objectMode: true})
// Blocking variant: awaits the async work BEFORE calling callback(), so
// through2 sees us as busy and chunks are processed strictly in series.
const streamTransformBlocking = through2.obj(async function (data, encoding, callback) {
  const callIdx = ++counter;
  const timeout = rand() * 100;
  console.log(`BLOCKING: Started (timeout=${timeout}) `, callIdx, data);
  // we could ditch the promise/await and just call the
  // callback within the timeout, but I wanted to keep this
  // as similar to your code as possible.
  await new Promise((resolve) => setTimeout(function () {
    console.log('BLOCKING: Finished ', callIdx, data);
    resolve();
  }, timeout));
  // callback won't happen until the await is done
  // which is why this runs in series
  callback();
});
// Non-blocking variant: schedules the async work and calls callback()
// immediately, so through2 hands us the next chunk right away and the
// scheduled timeouts complete "in parallel" (and out of order).
const streamTransformNotBlocking = through2.obj(async function (data, encoding, callback) {
  const callIdx = ++counter;
  const timeout = rand() * 100;
  console.log(`NOBLOCK: Started (timeout=${timeout}) `, callIdx, data);
  // kick off some async work to be completed later
  // do NOT block here (no await)
  setTimeout(function () {
    console.log('NOBLOCK: Finished ', callIdx, data);
  }, timeout);
  // tell through2 we are done
  // even though async work may not be completed yet
  callback();
});
// Push a fixed alphabet of objects through the given transform stream.
// `read() {}` is a deliberate no-op _read implementation: we feed the
// stream manually via push(). Without it, Readable's default _read throws
// ERR_METHOD_NOT_IMPLEMENTED once the consumer drains the buffer.
function run (processor) {
  const readable = new stream.Readable({ objectMode: true, read() {} });
  // setup pipe
  readable.pipe(processor);
  // write some data into the pipe
  'abcdefghijklmnopqrs'.split('').forEach((data) => readable.push({value: data}));
  readable.push(null); // done
}
// Drive both pipelines. run() only wires up the stream synchronously;
// the actual chunk processing happens asynchronously afterwards, so the
// two pipelines' log lines may interleave if both are left enabled.
console.log('Running with streamTransformBlocking');
run(streamTransformBlocking);
console.log('Running with streamTransformNotBlocking');
run(streamTransformNotBlocking);
Note everything happens in series.
$ node through2-serial.js
Running with streamTransformBlocking
BLOCKING: Started (timeout=1000) 1 { value: 'a' }
BLOCKING: Finished 1 { value: 'a' }
BLOCKING: Started (timeout=1000) 2 { value: 'b' }
BLOCKING: Finished 2 { value: 'b' }
BLOCKING: Started (timeout=300) 3 { value: 'c' }
BLOCKING: Finished 3 { value: 'c' }
BLOCKING: Started (timeout=1000) 4 { value: 'd' }
BLOCKING: Finished 4 { value: 'd' }
BLOCKING: Started (timeout=800) 5 { value: 'e' }
BLOCKING: Finished 5 { value: 'e' }
BLOCKING: Started (timeout=600) 6 { value: 'f' }
BLOCKING: Finished 6 { value: 'f' }
BLOCKING: Started (timeout=200) 7 { value: 'g' }
BLOCKING: Finished 7 { value: 'g' }
BLOCKING: Started (timeout=600) 8 { value: 'h' }
BLOCKING: Finished 8 { value: 'h' }
BLOCKING: Started (timeout=600) 9 { value: 'i' }
BLOCKING: Finished 9 { value: 'i' }
BLOCKING: Started (timeout=600) 10 { value: 'j' }
BLOCKING: Finished 10 { value: 'j' }
BLOCKING: Started (timeout=900) 11 { value: 'k' }
BLOCKING: Finished 11 { value: 'k' }
BLOCKING: Started (timeout=600) 12 { value: 'l' }
BLOCKING: Finished 12 { value: 'l' }
BLOCKING: Started (timeout=500) 13 { value: 'm' }
BLOCKING: Finished 13 { value: 'm' }
BLOCKING: Started (timeout=600) 14 { value: 'n' }
BLOCKING: Finished 14 { value: 'n' }
BLOCKING: Started (timeout=100) 15 { value: 'o' }
BLOCKING: Finished 15 { value: 'o' }
BLOCKING: Started (timeout=900) 16 { value: 'p' }
BLOCKING: Finished 16 { value: 'p' }
BLOCKING: Started (timeout=200) 17 { value: 'q' }
BLOCKING: Finished 17 { value: 'q' }
BLOCKING: Started (timeout=600) 18 { value: 'r' }
BLOCKING: Finished 18 { value: 'r' }
BLOCKING: Started (timeout=700) 19 { value: 's' }
BLOCKING: Finished 19 { value: 's' }
Note everything happens in "parallel" we use random timeouts to highlight how things can finish out of order even though they start in order.
$ node through2-serial.js
Running with streamTransformNotBlocking
NOBLOCK: Started (timeout=900) 1 { value: 'a' }
NOBLOCK: Started (timeout=800) 2 { value: 'b' }
NOBLOCK: Started (timeout=300) 3 { value: 'c' }
NOBLOCK: Started (timeout=800) 4 { value: 'd' }
NOBLOCK: Started (timeout=200) 5 { value: 'e' }
NOBLOCK: Started (timeout=1000) 6 { value: 'f' }
NOBLOCK: Started (timeout=1000) 7 { value: 'g' }
NOBLOCK: Started (timeout=900) 8 { value: 'h' }
NOBLOCK: Started (timeout=500) 9 { value: 'i' }
NOBLOCK: Started (timeout=500) 10 { value: 'j' }
NOBLOCK: Started (timeout=500) 11 { value: 'k' }
NOBLOCK: Started (timeout=1000) 12 { value: 'l' }
NOBLOCK: Started (timeout=700) 13 { value: 'm' }
NOBLOCK: Started (timeout=1000) 14 { value: 'n' }
NOBLOCK: Started (timeout=700) 15 { value: 'o' }
NOBLOCK: Started (timeout=400) 16 { value: 'p' }
NOBLOCK: Started (timeout=700) 17 { value: 'q' }
NOBLOCK: Started (timeout=500) 18 { value: 'r' }
NOBLOCK: Started (timeout=1000) 19 { value: 's' }
NOBLOCK: Finished 5 { value: 'e' }
NOBLOCK: Finished 3 { value: 'c' }
NOBLOCK: Finished 16 { value: 'p' }
NOBLOCK: Finished 9 { value: 'i' }
NOBLOCK: Finished 10 { value: 'j' }
NOBLOCK: Finished 11 { value: 'k' }
NOBLOCK: Finished 18 { value: 'r' }
NOBLOCK: Finished 13 { value: 'm' }
NOBLOCK: Finished 15 { value: 'o' }
NOBLOCK: Finished 17 { value: 'q' }
NOBLOCK: Finished 2 { value: 'b' }
NOBLOCK: Finished 4 { value: 'd' }
NOBLOCK: Finished 1 { value: 'a' }
NOBLOCK: Finished 8 { value: 'h' }
NOBLOCK: Finished 6 { value: 'f' }
NOBLOCK: Finished 7 { value: 'g' }
NOBLOCK: Finished 12 { value: 'l' }
NOBLOCK: Finished 14 { value: 'n' }
NOBLOCK: Finished 19 { value: 's' }
gulp has a parallel
and series
API you can use to coordinate running tasks based on dependent order (parallel for not dependent, series for dependent) and these can be composed together from individual tasks to make more complicated flows and job wirings.
In your code you don't use either of these APIs. You define a single gulp task pipeline with src
, pipe
and dest
. The construction of your Transform that is being piped to by gulp.src().pipe(transform)
is what causes each file in the pipeline to get processed one after the other (series). See previous section on through2
if this is still not clear.
src
takes a glob specifying input files and produces a stream of vinyl objects to be passed through the pipeline of transformations.
You may find it useful to take a look at how node streams work, in particular object streams. You may also want to look at backpressure in the context of node streams/pipelines which prevents upstream writers from sending more data if you are busy downstream to keep memory use under control.
By definition streams are serial in nature (one thing after the other). If you look at the source of gulp.src
you see it is using vinyl-fs.src
which starts with glob-stream
converting a glob (pattern) into individual paths and emitting them one by one.
So maybe the confusion here is you are thinking the stream API itself will consume a bunch of files in parallel.
The streaming will happen in serial and will not continue if the pipeline is blocked due to back-pressure.
You are await
ing the timeout and that means through2
doesn't get a callback and so as far as the pipeline knows you are still busy so you don't get any more work.
You can have your code "lie" and say you are done when you are not done so that more work comes in, but that sort of defeats the purpose of backpressure to contain memory usage when processing large amounts of data through pipelines. Or maybe in some scenarios it would make sense to have your stream transformer actually be coordinating multiple parallel workers and handing off work when any pool worker is free creating some parallelism in your pipeline within your code. Could make sense for certain types of workloads on multicore machines.
But back to the point...
Here is a sample gulpfile for you that demos the series/parallel APIs in gulp for task sequencing.
A task is just a function by the way. You don't need to use a string for a task name in gulp4 and it is preferred that you use vanilla functions and export them. Gulp will use the function name as the task name but you can override this for programmatically generated functions using displayName
.
import gulp from 'gulp';
const {parallel, series} = gulp;
const randMin = 1;
const randMax = 10;

// Return a random integer in [min, max] inclusive. Omitted or non-numeric
// bounds fall back to the module defaults (the original used the coercing
// global isNaN for this; default parameters + Number.isFinite are clearer).
function rand (min = randMin, max = randMax) {
  const lo = Number.isFinite(min) ? min : randMin;
  const hi = Number.isFinite(max) ? max : randMax;
  return Math.floor(Math.random() * (hi - lo + 1)) + lo;
}
// Factory producing uniquely-named demo tasks. Each generated task logs,
// waits a random number of seconds, then logs again. `displayName` is set
// so gulp reports a readable name for these programmatically generated
// functions.
const makeTask = (function () {
  let taskIdx = 0;
  return function () {
    const taskName = 'task' + (++taskIdx);
    const task = async function () {
      const timeout = rand();
      console.log(`Started: ${taskName} with random delay ${timeout}s `);
      // Simulate an asynchronous operation
      await new Promise((resolve) => setTimeout(resolve, timeout*1000));
      console.log(`Finishing: ${taskName}`);
    };
    task.displayName = taskName;
    return task;
  };
})();
// the gulp series/parallel usage
// Compose the demo pipeline: task1 runs alone first, then task2/3/4 run
// concurrently, and task5 starts only after all three have finished.
// (makeTask() calls are made in the same order as before, so the generated
// task numbering is unchanged.)
const firstTask = makeTask();   // task1
const middleTasks = parallel(
  makeTask(),                   // task2
  makeTask(),                   // task3
  makeTask()                    // task4
);
const lastTask = makeTask();    // task5
export const myTask = series(firstTask, middleTasks, lastTask);
Note with the above construction we expect task1 to always run to completion before we do task2/3/4/5. 2/3/4 all start at the same time and run in parallel. task5 only starts after 2/3/4 have all finished.
You can save the above as filename.js
and then npm install gulp
in your directory. You can then run gulp as follows...
$ ./node_modules/.bin/gulp --gulpfile filename.js --tasks
[18:26:27] └─┬ myTask
[18:26:27] └─┬ <series>
[18:26:27] ├── task1
[18:26:27] ├─┬ <parallel>
[18:26:27] │ ├── task2
[18:26:27] │ ├── task3
[18:26:27] │ └── task4
[18:26:27] └── task5
or to run myTask you can do...
$ ./node_modules/.bin/gulp --gulpfile filename.js myTask
[18:22:54] Starting 'myTask'...
[18:22:54] Starting 'task1'...
Started: task1 with random delay 1s
Finishing: task1
[18:22:55] Finished 'task1' after 1 s
[18:22:55] Starting 'task2'...
[18:22:55] Starting 'task3'...
[18:22:55] Starting 'task4'...
Started: task2 with random delay 3s
Started: task3 with random delay 2s
Started: task4 with random delay 5s
Finishing: task3
[18:22:57] Finished 'task3' after 2.01 s
Finishing: task2
[18:22:58] Finished 'task2' after 3.01 s
Finishing: task4
[18:23:00] Finished 'task4' after 5 s
[18:23:00] Starting 'task5'...
Started: task5 with random delay 8s
Finishing: task5
[18:23:08] Finished 'task5' after 8.01 s
[18:23:08] Finished 'myTask' after 14 s