I am building a Fastify server to upload/download files to S3. When I want to download a file I am able to get it using a Node stream and return it, so I prevent loading the whole file in memory. I would like to do something like:
I have tried with pipelines, but I need to check when the scan is done, and after I send the stream from S3 to the AV, the original stream is consumed, hence I cannot send it to the client. Do you have any suggestion or gotcha here?
Code I have right now:
const stream = body.transformToWebStream();
const antivirusPassthrough = app.avClient.passthrough();
const downloadPassthrough = new PassThrough();

antivirusPassthrough.once("error", (err) => {
  throw err;
});

antivirusPassthrough.once("scan-complete", (result) => {
  const { isInfected } = result;
  if (isInfected) {
    throw new Error("File is infected");
  }
});

pipeline(
  stream,
  antivirusPassthrough,
  downloadPassthrough,
  (err) => {
    if (err) {
      app.log.error(err);
    }
  }
);

return reply.send(downloadPassthrough);
To achieve your goal you need to find a trade-off to save resources.
You may use a PromiseTransform class in your Fastify server. This class allows you to wait for a promise (like your virus scan) to resolve before completing the stream.
Below is a demo of how to implement this, using a random promise to simulate the virus scan:
const fs = require('fs');
const { Transform, PassThrough } = require('stream');
const { pipeline } = require('stream/promises');

// The class to wait for the promise to resolve before finishing the stream
class PromiseTransform extends Transform {
  constructor (aPromise, options = {}) {
    super(options);
    this.aPromise = aPromise;
  }

  _transform (chunk, encoding, callback) {
    this.push(chunk);
    callback();
  }

  _flush (callback) {
    // https://nodejs.org/api/stream.html#transform_flushcallback
    this.aPromise //
      .then(callback)
      .catch(callback);
  }
}

const app = require('fastify')({ logger: true });

app.get('/', async (request, reply) => {
  // The S3 object is a stream
  const sourceFile = fs.createReadStream(__filename);

  // This is a random promise to simulate the virus scan
  const slowRandomPromise = new Promise((resolve, reject) => {
    setTimeout(() => {
      if (Math.random() < 0.5) {
        console.log('random promise failed');
        reject(new Error('bad luck'));
        return;
      }
      console.log('random promise success');
      resolve();
    }, 1_500);
  });

  const promiseTransform = new PromiseTransform(slowRandomPromise);
  const downloadPassthrough = new PassThrough();

  pipeline(
    sourceFile, //
    promiseTransform, //
    downloadPassthrough,
  ).catch(() => {
    console.log('pipeline failed, do the cleanup (delete s3 object etc..)');
  });

  reply.header('Content-Disposition', 'attachment; filename="qwe.js"');
  return downloadPassthrough;
});

app.listen({ port: 8080 });
The _flush method in the PromiseTransform class is called when there are no more chunks to be transformed. It provides a way to perform any final processing before the stream is ended. In this case, the _flush method is used to wait for the provided promise (this.aPromise)
to resolve before signaling that the stream has finished.
Note that the stream is sent to the client (the stream is flowing, so we are not wasting server resources), BUT the response is destroyed if the promise rejects (the virus scan fails).
As a result, a Node.js client will still receive the file's data chunks, but gets an error on the response when the scan fails:
const http = require('http');

function makeHttpRequest (url) {
  http.get(url, (response) => {
    const { statusCode } = response;
    console.log('statusCode:', statusCode);
    response.setEncoding('utf8');
    response.on('data', (data) => {
      // This is called whether the virus scan fails or not
      console.log('data:', data);
    });
    response.on('error', (err) => {
      // This is called only if the virus scan fails
      console.log('Response error:', err);
    });
  });
}

makeHttpRequest('http://localhost:8080/');
The browser will discard the downloaded file if the virus scan fails.
This approach relies on the PromiseTransform to not send the final end-of-stream signal, so the file is never finalized on the client when the scan fails. I think this is a good trade-off, because the case when the virus scan fails is rare IMHO.