I've downloaded the GeoNames database dump and I'm trying to put everything into a PostgreSQL table, but I keep running into multiple errors no matter what I try.
With the last modification I made, I got the following:
Error: Connection terminated by user
at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:402:36)
at Pool._remove (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:135:12)
at Timeout.setTimeout (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:38:12)
at ontimeout (timers.js:498:11)
at tryOnTimeout (timers.js:323:5)
at Timer.listOnTimeout (timers.js:290:5)
Line added 6052 0.05135667935111022%
(node:31819) UnhandledPromiseRejectionWarning: Error: This socket is closed
at Socket._writeGeneric (net.js:729:18)
at Socket._write (net.js:783:8)
at doWrite (_stream_writable.js:397:12)
at writeOrBuffer (_stream_writable.js:383:5)
at Socket.Writable.write (_stream_writable.js:290:11)
at Socket.write (net.js:707:40)
at Connection.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/connection.js:318:22)
at global.Promise (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:410:23)
at new Promise (<anonymous>)
at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:409:12)
(node:31819) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:31819) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
(node:31819) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added. Use emitter.setMaxListeners() to increase limit
My code is:
var pg = require("pg");
var fs = require('fs');

const pool = new pg.Pool({
    user: 'smurf',
    host: 'localhost',
    database: 'mydb',
    password: 'smurf',
    port: 5432,
})

var filename = 'allCountries.txt';

var fs = require('fs'),
    es = require('event-stream');

var lineNr = 0;
var max = 11784251; // Number of lines, dirty, to get % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
    if (err) throw err

    // Stream file line by line
    var s = fs.createReadStream(filename)
        .pipe(es.split())
        .pipe(es.mapSync(function(e) {
            // pause the readstream
            s.pause();
            lineNr += 1;

            // Each line needs to be properly formatted
            e = e.split("\t"); // TAB split

            // The following fields need formatting
            e[0] = parseInt(e[0]);
            e[4] = parseFloat(e[4]);
            e[5] = parseFloat(e[5]);
            e[14] = parseInt(e[14]);
            e[15] = e[15] == '' ? 0 : e[15];
            e[16] = parseInt(e[16]);

            // Insert into db
            pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
                if (err) {
                    console.log(err);
                }
                done(); // Release this connection to the pool
                console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
                s.resume(); // Go to next line
            });
        })
        .on('error', function(err) {
            console.log('Error while reading file.', err);
        })
        .on('end', function() {
            console.log('Read entire file.')
        })
    );
}) // END pool.connect
I've tried readFile, readFileSync, and the readline module, as well as moving or omitting the done() call, or just moving it around.
I usually use PHP to insert massive files, so I have no idea what I'm doing wrong here.
The MaxListenersExceededWarning makes no sense to me because it seems like I close everything that I open. What am I doing wrong here?
Thanks!
As mentioned in the comments, when you work with asynchronous code you need to use the map operation instead of mapSync and call the callback after the item has been inserted.
If you do this, the calls to pause and resume are not needed anymore (this is handled by event-stream); you just need to resume the last stream you create. Then there's the question of when done should be called - that is, after all operations have completed.
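To make the difference concrete, here's a minimal sketch; insertRow is a hypothetical stand-in for the pool.query call, not an actual function from pg or event-stream:

// mapSync expects a synchronous function, so the stream moves on as soon
// as the function returns - nothing waits for the insert to finish.
es.mapSync(function(line) {
    insertRow(line); // fire-and-forget
    return line;
});

// map hands you a callback: the item isn't considered processed until you
// call it, so the insert has completed (or failed) before the stream is done
// with that line - which is why manual pause/resume is no longer needed.
es.map(function(line, cb) {
    insertRow(line, function(err, result) {
        cb(err, result); // tell event-stream this item is done
    });
});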
Your code should look like this:
var pg = require("pg");
var fs = require('fs');

const pool = new pg.Pool({
    user: 'smurf',
    host: 'localhost',
    database: 'mydb',
    password: 'smurf',
    port: 5432,
})

var filename = 'allCountries.txt';

var fs = require('fs'),
    es = require('event-stream');

var lineNr = 0;
var max = 11784251; // Number of lines, dirty, to get % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
    if (err) throw err

    // Stream file line by line
    var s = fs.createReadStream(filename)
        .pipe(es.split())
        .pipe(es.map(function(e, cb) {
            lineNr += 1;

            // Each line needs to be properly formatted
            e = e.split("\t"); // TAB split

            // The following fields need formatting
            e[0] = parseInt(e[0]);
            e[4] = parseFloat(e[4]);
            e[5] = parseFloat(e[5]);
            e[14] = parseInt(e[14]);
            e[15] = e[15] == '' ? 0 : e[15];
            e[16] = parseInt(e[16]);

            // Insert into db
            pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
                cb(err, result); // call the callback once the insert is done
                console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
            });
        })
        .resume()
        .on('error', function(err) {
            done();
            console.log('Error while reading file.', err);
        })
        .on('end', function() {
            done();
            console.log('Read entire file.')
        })
    );
}) // END pool.connect
The above version of the code will not make use of the pool you're creating - it will operate on one item at a time.
If you're using a fairly recent Node.js version (8.4+) you can use a framework of my own, scramjet, which allows you to write even simpler code using ES6 async functions:
const {Pool} = require("pg");
const {StringStream} = require("scramjet");
const fs = require("fs");

const pool = new Pool(options); // same connection options as above
const filename = 'allCountries.txt';
const max = 11784251;
let lineNr = 0;

const INSERT_ENTRY = 'INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);';

StringStream
    .from(fs.createReadStream(filename))
    .lines()
    .parse(line => {
        // Each line needs to be properly formatted
        const entry = line.split("\t"); // TAB split

        // The following fields need formatting
        entry[0] = parseInt(entry[0]);
        entry[4] = parseFloat(entry[4]);
        entry[5] = parseFloat(entry[5]);
        entry[14] = parseInt(entry[14]);
        entry[15] = entry[15] == '' ? 0 : entry[15];
        entry[16] = parseInt(entry[16]);

        return entry;
    })
    .setOptions({maxParallel: 32})
    .each(async entry => { // async so the query can be awaited
        const client = await pool.connect();
        try {
            await client.query(INSERT_ENTRY, entry)
        } catch(err) {
            console.log('Error while adding line...', err);
            // some more logic could be here?
        } finally {
            client.release();
        }
    })
    .each(() => !(lineNr++ % 1000) && console.log("Line added ", lineNr, (lineNr / max * 100) + "%"))
    .run()
    .then(
        () => console.log('Read entire file.'),
        e => console.log('Error while handling file.', e)
    );
The code above will attempt to run 32 parallel inserts using the pool (requesting a client for every entry - the pg pool will reuse clients and add new ones up to its configured limit). This doesn't necessarily mean it will be 32 times quicker, as there are some external limiting factors, but you should see a drastic increase in speed.
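One detail worth double-checking (my assumption, not something from the answer above): pg's pool keeps at most 10 clients by default, so with maxParallel: 32 some of the parallel inserts will simply queue while waiting for a free client. If you want the pool to actually sustain 32 concurrent connections, raise its max when creating it, for example:

const pool = new Pool({
    user: 'smurf',
    host: 'localhost',
    database: 'mydb',
    password: 'smurf',
    port: 5432,
    max: 32 // pg's default is 10; matching maxParallel avoids queueing on the pool
});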