Tags: javascript, node.js, ffmpeg, ffprobe

Using NodeJS ReadStream or Buffer with ffmpeg and ffprobe hangs


I'm using ffmpeg to convert some audio files in NodeJS. This function takes as input either a ReadStream from a media file or a Buffer containing the bytes of the media file, and converts it:

/**
 * Convert media with ffmpeg, feeding the input through the child's stdin.
 *
 * Must be called with a `this` that provides `logger` (isDebug/debug/info/warn)
 * and `_options.child_process` (the options object handed to `cp.spawn`).
 *
 * @param {Buffer|object} streamOrBuffer - the media bytes, or a readable stream
 *   that is fully buffered with `stream2Buffer` before being written to ffmpeg.
 * @param {string} opath - output path given to ffmpeg (overwritten because of `-y`).
 * @param {string|number} [sample_rate] - when truthy, passed to ffmpeg as `-ar`.
 * @param {object} [params] - accepted for interface compatibility; not read here.
 * @returns {Promise<void>} resolves when ffmpeg closes with exit code 0; rejects
 *   when the process cannot be spawned, the input cannot be read, or ffmpeg
 *   closes with any other exit code / a signal.
 */
let convertStream = function (streamOrBuffer, opath, sample_rate, params) {
  const self = this;
  const loglevel = self.logger.isDebug() ? 'debug' : 'error';
  // Plain (non-async) executor: anything thrown below rejects the promise
  // instead of leaving it pending forever.
  return new Promise((resolve, reject) => {
    // `-v` is an alias of `-loglevel`; a later `-v quiet` would override the
    // level computed above, so only one of them is passed.
    const args = ['-y', '-loglevel', loglevel, '-i', 'pipe:'];
    if (sample_rate) { // valid sample rate
      args.push('-ar', sample_rate);
    }
    args.push(opath); // output
    const opts = self._options.child_process;
    self.logger.debug("convertStream options: %s", args.join(" "));

    const maxStderrChars = 4096;
    let stderrTail = '';
    const ffmpeg = cp.spawn('ffmpeg', args, opts)
      .on('message', (msg) => self.logger.info(msg))
      .on('error', reject)
      .on('exit', (code, signal) => {
        self.logger.debug("convertStream [exit] code:%s signal:%s", code, signal);
      })
      .on('close', (code, signal) => {
        self.logger.debug("convertStream [close]");
        if (code === 0) {
          resolve();
          return;
        }
        reject(new Error(
          `ffmpeg exited with code ${code}, signal ${signal}: ${stderrTail.trim()}`
        ));
      });

    ffmpeg.stdin.on("error", (err) => {
      // ffmpeg may close its stdin before everything is written; that EPIPE is
      // harmless because success is decided by the exit code in 'close'.
      if (err.code !== "EPIPE") {
        self.logger.warn("convertStream ffmepg stdin error", err);
      }
    });

    // Keep the output pipes flowing: an unread pipe has limited capacity and a
    // child that fills it blocks until someone reads. The streams are null
    // when the spawn options do not use pipes for them.
    if (ffmpeg.stderr) {
      ffmpeg.stderr.on('data', (chunk) => {
        stderrTail = (stderrTail + chunk.toString()).slice(-maxStderrChars);
      });
    }
    if (ffmpeg.stdout) {
      ffmpeg.stdout.resume();
    }

    const feed = (bytes) => ffmpeg.stdin.end(bytes);
    if (streamOrBuffer instanceof Buffer) { // Buffer
      feed(streamOrBuffer);
    } else { // ReadStream
      stream2Buffer(streamOrBuffer).then(feed).catch((err) => {
        // The input could not be read: stop ffmpeg, which would otherwise wait
        // for stdin forever, and surface the failure to the caller.
        ffmpeg.kill();
        reject(err);
      });
    }
  });
};//convertStream

where stream2Buffer looks like:

/**
 * Read a readable stream to its end and return everything it produced as one Buffer.
 *
 * String chunks (emitted when the stream has an encoding set or yields strings)
 * are converted to Buffers using the stream's `readableEncoding`, defaulting to
 * UTF-8, so they can be concatenated with binary chunks.
 *
 * @param {object} stream - a readable stream emitting 'data', 'end' and 'error'.
 * @returns {Promise<Buffer>} resolves with the concatenated bytes on 'end';
 *   rejects with the stream's error, or if the chunks cannot be concatenated.
 */
let stream2Buffer = function (stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on("data", (chunk) => {
      chunks.push(
        typeof chunk === "string"
          ? Buffer.from(chunk, stream.readableEncoding || "utf8")
          : chunk
      );
    });
    stream.on("end", () => {
      // Buffer.concat throws on unsupported chunk types; turn that into a
      // rejection instead of an uncaught exception inside the event handler.
      try {
        resolve(Buffer.concat(chunks));
      } catch (err) {
        reject(err);
      }
    });
    stream.on("error", reject);
  });
};//stream2Buffer

This works fine both with a ReadStream:

// Usage with a file stream: open the media file and let convertStream buffer and convert it.
const mediaStream = fs.createReadStream(mediaPath);
await convertStream(mediaStream, 'out.mp4', '44100', {});

and with a Buffer object, like:

// Usage with in-memory bytes: download the media first, then convert what was fetched.
const mediaBytes = await request.get(mediaUrl, {});
await convertStream(mediaBytes, 'out.mp4', '44100', {});

so I would expect the same approach to work when using ffprobe:

/**
 * Probe media with ffprobe, feeding the input through the child's stdin, and
 * return ffprobe's JSON report (chapters, format and streams) as text.
 *
 * Must be called with a `this` that provides `logger` (isDebug/debug/info/warn)
 * and `_options.child_process` (the options object handed to `cp.spawn`).
 *
 * @param {Buffer|object} streamOrBuffer - the media bytes, or a readable stream
 *   that is fully buffered with `stream2Buffer` before being written to ffprobe.
 * @param {string|number} [sample_rate] - accepted for interface compatibility
 *   but no longer forwarded: probing reports the media's own sample rate.
 * @param {object} [params] - accepted for interface compatibility; not read here.
 * @returns {Promise<string>} resolves with everything ffprobe wrote to stdout
 *   when it closes with exit code 0; rejects when the process cannot be
 *   spawned, the input cannot be read, or ffprobe closes with any other
 *   exit code / a signal.
 */
let probeStream = function (streamOrBuffer, sample_rate, params) {
  const self = this;
  // Plain (non-async) executor: anything thrown below rejects the promise
  // instead of leaving it pending forever.
  return new Promise((resolve, reject) => {
    const loglevel = self.logger.isDebug() ? 'debug' : 'error';
    // `-v` is an alias of `-loglevel`, so only the computed level is passed.
    const args = [
      '-loglevel', loglevel,
      '-print_format', 'json',
      '-show_chapters',
      '-show_format',
      '-show_streams',
      '-i', 'pipe:'
    ];
    const opts = self._options.child_process;
    self.logger.debug("probeStream probe options: %s", args.join(" "));

    const maxStderrChars = 4096;
    const stdoutChunks = [];
    let stderrTail = '';
    const ffprobe = cp.spawn('ffprobe', args, opts)
      .on('message', (msg) => self.logger.info(msg))
      .on('error', reject)
      .on('exit', (code, signal) => {
        self.logger.debug("probeStream [exit] code:%s signal:%s", code, signal);
      })
      .on('close', (code, signal) => {
        self.logger.debug("probeStream [close] code:%s signal:%s", code, signal);
        if (code === 0) {
          // Decode once at the end so a multi-byte character split across
          // two chunks is not corrupted.
          resolve(Buffer.concat(stdoutChunks).toString());
          return;
        }
        reject(new Error(
          `ffprobe exited with code ${code}, signal ${signal}: ${stderrTail.trim()}`
        ));
      });

    ffprobe.stdin.on("error", (err) => {
      // ffprobe may close its stdin before everything is written; that EPIPE
      // is harmless because success is decided by the exit code in 'close'.
      if (err.code !== "EPIPE") {
        self.logger.warn("probeStream ffmepg stdin error", err);
      }
    });
    ffprobe.stdout.on('data', (chunk) => stdoutChunks.push(chunk));
    // Keep stderr flowing: an unread pipe has limited capacity and a child
    // that fills it (easy at debug log level) blocks until someone reads.
    // The stream is null when the spawn options do not use a pipe for it.
    if (ffprobe.stderr) {
      ffprobe.stderr.on('data', (chunk) => {
        stderrTail = (stderrTail + chunk.toString()).slice(-maxStderrChars);
      });
    }

    const feed = (bytes) => ffprobe.stdin.end(bytes);
    if (streamOrBuffer instanceof Buffer) { // Buffer
      feed(streamOrBuffer);
    } else { // ReadStream
      stream2Buffer(streamOrBuffer).then(feed).catch((err) => {
        // The input could not be read: stop ffprobe, which would otherwise
        // wait for stdin forever, and surface the failure to the caller.
        ffprobe.kill();
        reject(err);
      });
    }
  });
};//probeStream

But ffprobe hangs on running the command:

probeStream probe options: -v quiet -loglevel debug -print_format json -show_chapters -show_format -show_streams -i pipe: -ar 44100

without exiting the process or processing the media.


Solution

  • Okay, 2 things.

    1. You don't need to pass sample_rate, since ffprobe reports it automatically.
    2. -i pipe: is not a valid argument; the right way to pipe data in is just -

    This works nicely on WSL2:

    /**
     * Probe media with ffprobe, feeding the input through the child's stdin
     * (input argument `-`), and return ffprobe's JSON report (chapters, format
     * and streams) as text.
     *
     * Must be called with a `this` that provides `logger`
     * (isDebug/debug/info/warn) and `_options.child_process` (the options
     * object handed to `cp.spawn`).
     *
     * @param {Buffer|object} streamOrBuffer - the media bytes, or a readable
     *   stream that is fully buffered with `stream2Buffer` before being written.
     * @param {object} [params] - accepted for interface compatibility; not read here.
     * @returns {Promise<string>} resolves with everything ffprobe wrote to
     *   stdout when it closes with exit code 0; rejects when the process cannot
     *   be spawned, the input cannot be read, or ffprobe closes with any other
     *   exit code / a signal.
     */
    let probeStream = function (streamOrBuffer, params) {
      const self = this;
      // Plain (non-async) executor: anything thrown below rejects the promise
      // instead of leaving it pending forever.
      return new Promise((resolve, reject) => {
        const loglevel = self.logger.isDebug() ? 'debug' : 'error';
        // `-v` is an alias of `-loglevel`, so only the computed level is passed.
        const args = [
          '-loglevel', loglevel,
          '-print_format', 'json',
          '-show_chapters',
          '-show_format',
          '-show_streams',
          '-'
        ];
        // assuming opts = { stdio: ['pipe', 'pipe', 'pipe'] };
        const opts = self._options.child_process;

        self.logger.debug("probeStream probe options: %s", args.join(" "));
        const maxStderrChars = 4096;
        const stdoutChunks = [];
        let stderrTail = '';
        const ffprobe = cp.spawn('ffprobe', args, opts)
          .on('message', (msg) => self.logger.info(msg))
          .on('error', reject)
          .on('exit', (code, signal) => {
            self.logger.debug("probeStream [exit] code:%s signal:%s", code, signal);
          })
          .on('close', (code, signal) => {
            self.logger.debug("probeStream [close] code:%s signal:%s", code, signal);
            if (code === 0) {
              // Decode once at the end so a multi-byte character split across
              // two chunks is not corrupted.
              resolve(Buffer.concat(stdoutChunks).toString());
              return;
            }
            reject(new Error(
              `ffprobe exited with code ${code}, signal ${signal}: ${stderrTail.trim()}`
            ));
          });
        ffprobe.stdin.on("error", (err) => {
          // ffprobe may close its stdin before everything is written; that
          // EPIPE is harmless because success is decided by the exit code.
          if (err.code !== "EPIPE") {
            self.logger.warn("probeStream ffmepg stdin error", err);
          }
        });
        ffprobe.stdout.on('data', (chunk) => stdoutChunks.push(chunk));
        // Keep stderr flowing: an unread pipe has limited capacity and a child
        // that fills it (easy at debug log level) blocks until someone reads.
        // The stream is null when the spawn options do not use a pipe for it.
        if (ffprobe.stderr) {
          ffprobe.stderr.on('data', (chunk) => {
            stderrTail = (stderrTail + chunk.toString()).slice(-maxStderrChars);
          });
        }

        // stdin must be closed right after the data is written so ffprobe sees
        // end-of-input; closing it only once the process has already closed
        // leaves ffprobe waiting for more bytes.
        const feed = (bytes) => ffprobe.stdin.end(bytes);
        if (streamOrBuffer instanceof Buffer) { // Buffer
          feed(streamOrBuffer);
        } else { // ReadStream
          stream2Buffer(streamOrBuffer).then(feed).catch((err) => {
            // The input could not be read: stop ffprobe and surface the failure.
            ffprobe.kill();
            reject(err);
          });
        }
      });
    };//probeStream