So, I have a fairly large directory of files that I need to process continually using a long-running process in a Node.js application. The directory is continually being processed and emptied, but it's not uncommon for 1000+ files to be queued for processing at any given time. They're gzipped CSV files, so my naive solution was to grab the directory listing, iterate over the files, open each one, parse it, then continue, like this:
fs = require 'fs'
path = require 'path'
zlib = require 'zlib'

files = fs.readdirSync 'directory'
for filename in files
  file_path = path.resolve path.join 'directory', filename
  fd = fs.openSync file_path, 'r'
  # allocate a buffer the size of the file, then read the whole file into it
  buf = Buffer.alloc fs.statSync(file_path).size
  fs.readSync fd, buf, 0, buf.length, 0
  fs.closeSync fd
  zlib.gunzip buf, (err, buf) =>
    throw err if err
    content = buf.toString().split("\n")
    for line in content
      # parse, process content, archive file
I'm quickly running up against an EMFILE (Too Many Open Files) error. Please excuse the Sync versions of the fs functions, and the CoffeeScript.
Is there a better way of processing a massive number of files in a managed way? Ultimately I'd like to use something like a single parsing Stream - I know how to do that with a single large (or even growing) file, but not with a directory full of separate files.
The files are being generated by a large number of disparate clients to a public-facing web server, which then synchronises them regularly to my input directory over a secure protocol. Not an ideal setup, but necessary given the specific nature of the system, and it explains why I can't simply alter the files to be, say, a single multiplexed stream.
Not exactly a parsing stream but could be a step towards it:
You could use https://npmjs.org/package/generic-pool to limit the number of files being processed concurrently. You just have to define which resource is to be pooled.
In your case I assume the resource to pool should be a file processor so that only one or a few can live at once.
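To make the idea of "only a few processors alive at once" concrete, here is a minimal sketch in plain Node.js. It does not use generic-pool; it swaps in a small hand-rolled limiter instead, so the names `runLimited`, `processFile`, `processDirectory` and the `handleLine` callback are all hypothetical and only stand in for whatever the real parse/process/archive step would be.

```javascript
const fs = require('fs');
const path = require('path');
const zlib = require('zlib');
const { promisify } = require('util');

const gunzip = promisify(zlib.gunzip);

// Run `worker` over `items` with at most `limit` calls in flight at once.
// Each "lane" pulls the next item from the shared queue when it is free.
async function runLimited(items, limit, worker) {
  const queue = items.slice();
  async function lane() {
    while (queue.length > 0) {
      await worker(queue.shift());
    }
  }
  const lanes = Array.from({ length: Math.min(limit, items.length) }, lane);
  await Promise.all(lanes);
}

// Read, decompress and split one file into lines.
// `handleLine` is a hypothetical stand-in for the real processing step.
async function processFile(filePath, handleLine) {
  const compressed = await fs.promises.readFile(filePath);
  const text = (await gunzip(compressed)).toString('utf8');
  for (const line of text.split('\n')) {
    if (line !== '') handleLine(line, filePath);
  }
}

// Process every file in `dir`, never more than `limit` at a time.
async function processDirectory(dir, limit, handleLine) {
  const names = await fs.promises.readdir(dir);
  await runLimited(names, limit, (name) =>
    processFile(path.join(dir, name), handleLine));
}
```

Something like `processDirectory('directory', 4, (line) => console.log(line))` would then keep at most four files in flight, however many are waiting in the directory. The right limit is a tuning choice that depends on file sizes and on how heavy the per-line work is.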
You can also have some kind of iterator method to streamline which file is to be processed next.
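One way to read the iterator suggestion is as an async generator that walks the directory and hands out lines one file at a time, which comes close to the "single parsing stream" the question asks for. The sketch below assumes a Node.js version with async iteration over streams; `linesOfGzip` and `linesInDirectory` are hypothetical names, not part of any library.

```javascript
const fs = require('fs');
const path = require('path');
const zlib = require('zlib');

// Yield the decompressed lines of one gzipped file without loading it whole.
async function* linesOfGzip(filePath) {
  const source = fs.createReadStream(filePath);
  const gunzip = zlib.createGunzip();
  // pipe() does not forward errors, so pass read failures along by hand.
  source.on('error', (err) => gunzip.destroy(err));
  source.pipe(gunzip).setEncoding('utf8');
  let tail = '';
  try {
    for await (const chunk of gunzip) {
      const parts = (tail + chunk).split('\n');
      tail = parts.pop(); // the last piece may be an incomplete line
      yield* parts;
    }
    if (tail !== '') yield tail;
  } finally {
    // Release the descriptor even if the consumer stops early.
    source.destroy();
  }
}

// Walk the directory sequentially, so only one file is open at a time.
async function* linesInDirectory(dir) {
  for (const name of await fs.promises.readdir(dir)) {
    const filePath = path.join(dir, name);
    for await (const line of linesOfGzip(filePath)) {
      yield { filePath, line };
    }
  }
}
```

A consumer would then loop with `for await (const { filePath, line } of linesInDirectory('directory'))` and parse each line as it arrives. Because the generator only opens the next file once the previous one is exhausted, the number of open descriptors stays constant regardless of how many files are queued.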
EDIT: Completing my answer. I had a go at your problem and tried this https://gist.github.com/Floby/5064222