node.js postgresql geonames

How to properly insert the Geonames 1.5 GiB file into PostgreSQL using Node?


I've downloaded the Geonames database dump and I'm trying to put everything into a PostgreSQL table, but I keep running into errors no matter what I try.

With the last modification I made, I got the following:

Error: Connection terminated by user
    at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:402:36)
    at Pool._remove (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:135:12)
    at Timeout.setTimeout (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:38:12)
    at ontimeout (timers.js:498:11)
    at tryOnTimeout (timers.js:323:5)
    at Timer.listOnTimeout (timers.js:290:5)
Line added  6052 0.05135667935111022%
(node:31819) UnhandledPromiseRejectionWarning: Error: This socket is closed
    at Socket._writeGeneric (net.js:729:18)
    at Socket._write (net.js:783:8)
    at doWrite (_stream_writable.js:397:12)
    at writeOrBuffer (_stream_writable.js:383:5)
    at Socket.Writable.write (_stream_writable.js:290:11)
    at Socket.write (net.js:707:40)
    at Connection.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/connection.js:318:22)
    at global.Promise (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:410:23)
    at new Promise (<anonymous>)
    at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:409:12)
(node:31819) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:31819) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
(node:31819) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added. Use emitter.setMaxListeners() to increase limit

My code is:

var pg = require("pg");
var fs = require('fs');

const pool = new pg.Pool({
  user: 'smurf',
  host: 'localhost',
  database: 'mydb',
  password: 'smurf',
  port: 5432,
})

var filename = 'allCountries.txt';

var fs = require('fs'),
  es = require('event-stream');

var lineNr = 0;
var max = 11784251; // Number of line, dirty, to get % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
  if (err) throw err

  // Stream file line by line
  var s = fs.createReadStream(filename)
    .pipe(es.split())
    .pipe(es.mapSync(function(e) {

        // pause the readstream
        s.pause();

        lineNr += 1;

        // Each line need to be properly formated
        e = e.split("\t"); //TAB split

        // The following fields need formating
        e[0] = parseInt(e[0]);
        e[4] = parseFloat(e[4]);
        e[5] = parseFloat(e[5]);
        e[14] = parseInt(e[14]);

        e[15] = e[15] == '' ? 0 : e[15];

        e[16] = parseInt(e[16]);

        // Insert into db
        pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
          if (err) {
            console.log(err);

          }
          done(); // Release this connection to the pool
          console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
          s.resume(); // Go to next line

        });

      })
      .on('error', function(err) {
        console.log('Error while reading file.', err);
      })
      .on('end', function() {
        console.log('Read entire file.')
      })
    );
}) // END pool.connect

I tried with readFile, readFileSync, and the readline extension, as well as omitting the done() call or moving it around.

I usually use PHP to insert massive files, so I have no idea what I'm doing wrong here.

The MaxListenersExceededWarning error makes no sense to me because it seems like I close everything that I open. What am I doing wrong here?

Thanks!


Solution

  • As mentioned in the comments - when you work with asynchronous code, you need to use the map operation instead of mapSync and call the callback after the item has been inserted.

    If you use this, the calls to pause and resume are no longer needed (event-stream handles that for you); you just need to resume the last stream you create. Then there is the question of when done should be called - namely, after all operations have completed.
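    To make the shape of that change clearer, here is a minimal, self-contained sketch of the asynchronous mapping step (insertRow is a hypothetical stand-in for the pool.query call; only the es.map callback pattern matters here):

    var fs = require('fs');
    var es = require('event-stream');

    // Hypothetical async work per line - in the real code this is the INSERT query.
    function insertRow(line, cb) {
      setImmediate(cb); // pretend the insert has finished
    }

    fs.createReadStream('allCountries.txt')
      .pipe(es.split())
      .pipe(es.map(function(line, cb) {
        insertRow(line, function(err) {
          cb(err, line); // tell event-stream this line is done (or failed)
        });
      }))
      .resume() // resume the last stream in the chain to start the flow
      .on('error', function(err) { console.log('Error while reading file.', err); })
      .on('end', function() { console.log('Read entire file.'); });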

    Your code should look like this:

    var pg = require("pg");
    var fs = require('fs');
    
    const pool = new pg.Pool({
      user: 'smurf',
      host: 'localhost',
      database: 'mydb',
      password: 'smurf',
      port: 5432,
    })
    
    var filename = 'allCountries.txt';
    
    var es = require('event-stream');

    var lineNr = 0;
    var max = 11784251; // Total number of lines (hard-coded) to show % of lines inserted
    
    // Connect to Postgresql
    pool.connect((err, client, done) => {
      if (err) throw err
    
      // Stream file line by line
      var s = fs.createReadStream(filename)
        .pipe(es.split())
        .pipe(es.map(function(e, cb) {
    
            lineNr += 1;
    
            // Each line needs to be properly formatted
            e = e.split("\t"); // TAB split

            // The following fields need formatting
            e[0] = parseInt(e[0]);
            e[4] = parseFloat(e[4]);
            e[5] = parseFloat(e[5]);
            e[14] = parseInt(e[14]);
    
            e[15] = e[15] == '' ? 0 : e[15];
    
            e[16] = parseInt(e[16]);
    
            // Insert into db
            pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
                cb(err, result); // call the callback
                console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
            });
    
        })
        .resume()
        .on('error', function(err) {
            done();
            console.log('Error while reading file.', err);
        })
        .on('end', function() {
            done();
            console.log('Read entire file.')
        })
        );
    }) // END pool.connect
    

    The above version of the code will not make use of the pool you're creating - it will operate on one item at a time.
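    As a side note (a variation, not part of the answer's code): since only one insert is in flight at a time here, the query could just as well be issued on the client that pool.connect already checked out, instead of going back to the pool:

    // Inside es.map(function(e, cb) { ... }) - same INSERT statement as above,
    // but run on the already checked-out client instead of via pool.query:
    client.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
        cb(err, result); // call the callback
    });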

    If you're using a fairly recent Node (8.4+), you can use a framework of my own, scramjet, which allows you to write even simpler code using ES6 async functions:

    const {Pool} = require("pg");
    const {StringStream} = require("scramjet");
    const fs = require("fs");
    
    const pool = new Pool(options); // the same connection options as above
    const filename = 'allCountries.txt';
    const max = 11784251;
    let lineNr = 0;
    const INSERT_ENTRY = 'INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);';
    
    StringStream
        .from(fs.createReadStream(filename))
        .lines()
        .parse(line => {
            // Each line need to be properly formated
            const entry = line.split("\t"); //TAB split
    
            // The following fields need formating
            entry[0] = parseInt(entry[0]);
            entry[4] = parseFloat(entry[4]);
            entry[5] = parseFloat(entry[5]);
            entry[14] = parseInt(entry[14]);
            entry[15] = entry[15] == '' ? 0 : entry[15];
            entry[16] = parseInt(entry[16]);
    
            return entry;
        })
        .setOptions({maxParallel: 32})
        .each(async entry => {
            const client = await pool.connect();
            try {
                await client.query(INSERT_ENTRY, entry)
            } catch(err) {
                console.log('Error while adding line...', err);
                // some more logic could be here?
            } finally {
                client.release();
            }
        })
        .each(() => !(lineNr++ % 1000) && console.log("Line added ", lineNr, (lineNr / max * 100) + "%"))
        .run()
        .then(
            () => console.log('Read entire file.'), 
            e => console.log('Error while handling file.', e)
        );
    

    The code above will attempt to run 32 parallel inserts using the pool (requesting a client for every entry; the pg pool will reuse its clients and add new ones up to its configured limit). This doesn't necessarily mean it will be 32 times quicker, since there are some external limiting factors, but you should see a drastic increase in speed.
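    One caveat worth adding (an assumption on my part, not something the snippet above configures): node-postgres allows 10 clients per pool by default, so to actually get 32 inserts in flight you would want to raise the pool's max to match maxParallel:

    const pool = new Pool({
      user: 'smurf',
      host: 'localhost',
      database: 'mydb',
      password: 'smurf',
      port: 5432,
      max: 32 // default is 10; align with maxParallel so connect() calls aren't queued
    });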