
I've downloaded the GeoNames database dump and I'm trying to load everything into a PostgreSQL table, but I keep running into multiple errors no matter what I try.

After my last modification I got the following:

Error: Connection terminated by user
    at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:402:36)
    at Pool._remove (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:135:12)
    at Timeout.setTimeout (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:38:12)
    at ontimeout (timers.js:498:11)
    at tryOnTimeout (timers.js:323:5)
    at Timer.listOnTimeout (timers.js:290:5)
Line added  6052 0.05135667935111022%
(node:31819) UnhandledPromiseRejectionWarning: Error: This socket is closed
    at Socket._writeGeneric (net.js:729:18)
    at Socket._write (net.js:783:8)
    at doWrite (_stream_writable.js:397:12)
    at writeOrBuffer (_stream_writable.js:383:5)
    at Socket.Writable.write (_stream_writable.js:290:11)
    at Socket.write (net.js:707:40)
    at Connection.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/connection.js:318:22)
    at global.Promise (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:410:23)
    at new Promise (<anonymous>)
    at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:409:12)
(node:31819) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:31819) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
(node:31819) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added. Use emitter.setMaxListeners() to increase limit

My code is:

var pg = require("pg");
var fs = require('fs');

const pool = new pg.Pool({
  user: 'smurf',
  host: 'localhost',
  database: 'mydb',
  password: 'smurf',
  port: 5432,
})

var filename = 'allCountries.txt';

var fs = require('fs'),
  es = require('event-stream');

var lineNr = 0;
var max = 11784251; // Number of lines (rough), used to show % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
  if (err) throw err

  // Stream file line by line
  var s = fs.createReadStream(filename)
    .pipe(es.split())
    .pipe(es.mapSync(function(e) {

        // pause the readstream
        s.pause();

        lineNr += 1;

        // Each line needs to be properly formatted
        e = e.split("\t"); //TAB split

        // The following fields need formatting
        e[0] = parseInt(e[0]);
        e[4] = parseFloat(e[4]);
        e[5] = parseFloat(e[5]);
        e[14] = parseInt(e[14]);

        e[15] = e[15] == '' ? 0 : e[15];

        e[16] = parseInt(e[16]);

        // Insert into db
        pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
          if (err) {
            console.log(err);

          }
          done(); // Release this connection to the pool
          console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
          s.resume(); // Go to next line

        });

      })
      .on('error', function(err) {
        console.log('Error while reading file.', err);
      })
      .on('end', function() {
        console.log('Read entire file.')
      })
    );
}) // END pool.connect

I've tried readFile, readFileSync, and the readline module, as well as moving or omitting the done() call.

I usually use PHP to insert massive files, so I have no idea what I'm doing wrong here.

The MaxListenersExceededWarning makes no sense to me, because it seems like I close everything that I open. What am I doing wrong here?

Thanks!

HypeWolf
  • Have you tried changing the default max listeners? I think there is a limit but I have never had an issue with this. [Ref](https://nodejs.org/dist/latest-v8.x/docs/api/events.html#events_eventemitter_defaultmaxlisteners) – wdfc Aug 12 '18 at 21:10
  • I don't understand why the max listeners would be reached as I close everything. I'm confused. Changing max listener with `require('events').EventEmitter.defaultMaxListeners = 40;` (from this: https://stackoverflow.com/questions/8313628/node-js-request-how-to-emitter-setmaxlisteners) doesn't get me farther in the process (~6500 lines) – HypeWolf Aug 12 '18 at 21:36
  • This is because you're flooding your pg database with entries by using `mapSync`: it creates the inserts but doesn't wait until they are completed. Try using `es.map` instead and call the callback after the data is inserted. You can also use my [`scramjet`](https://www.npmjs.com/package/scramjet) and use pg with ES6 async/await code - it should be easier. [This sample](https://github.com/signicode/scramjet/blob/master/samples/so-examples/csv-to-stored-procedure.js) shows a similar case. – Michał Karpacki Sep 22 '18 at 19:06
  • @MichałKapracki I forgot to let you know that you were right, obviously :) Thank you very much for your help. – HypeWolf Oct 15 '18 at 13:58
  • Hmm... I can make this to an answer... – Michał Karpacki Oct 16 '18 at 20:13
  • Sure go ahead @MichałKapracki :) – HypeWolf Oct 20 '18 at 16:32

1 Answer


As mentioned in the comments - when you work with asynchronous code you need to use the map operation instead of mapSync, and call the callback after the item has been inserted.

If you do that, the calls to pause and resume are no longer needed (event-stream takes care of this); you just need to resume the last stream you create. Then there's the question of when done should be called - namely, after all operations have completed.
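
For reference, this is roughly the contract es.map expects: the mapper receives each chunk plus a callback, and you call cb(err, result) once the asynchronous work for that chunk has finished. A tiny self-contained sketch (toUpperCase just stands in for whatever asynchronous work you do per line):

var es = require('event-stream');

// Sketch of the es.map contract: cb(err, result) must be called once the
// asynchronous work for this chunk has finished.
var mapper = es.map(function (line, cb) {
  setImmediate(function () {           // stand-in for an asynchronous insert
    cb(null, line.toUpperCase());
  });
});

es.readArray(['foo', 'bar'])
  .pipe(mapper)
  .pipe(es.writeArray(function (err, lines) {
    console.log(lines);                // [ 'FOO', 'BAR' ]
  }));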

Your code should look like this:

var pg = require("pg");
var fs = require('fs');

const pool = new pg.Pool({
  user: 'smurf',
  host: 'localhost',
  database: 'mydb',
  password: 'smurf',
  port: 5432,
})

var filename = 'allCountries.txt';

var es = require('event-stream');

var lineNr = 0;
var max = 11784251; // Number of lines (rough), used to show % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
  if (err) throw err

  // Stream file line by line
  var s = fs.createReadStream(filename)
    .pipe(es.split())
    .pipe(es.map(function(e, cb) {

        lineNr += 1;

        // Each line needs to be properly formatted
        e = e.split("\t"); //TAB split

        // The following fields need formatting
        e[0] = parseInt(e[0]);
        e[4] = parseFloat(e[4]);
        e[5] = parseFloat(e[5]);
        e[14] = parseInt(e[14]);

        e[15] = e[15] == '' ? 0 : e[15];

        e[16] = parseInt(e[16]);

        // Insert into db
        pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
            cb(err, result); // call the callback
            console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
        });

    })
    .resume()
    .on('error', function(err) {
        done();
        console.log('Error while reading file.', err);
    })
    .on('end', function() {
        done();
        console.log('Read entire file.')
    })
    );
}) // END pool.connect

The above version of the code will not make use of the pool you're creating - it will operate on one item at a time.

If you're using a fairly recent Node.js version (8.4+) you can use scramjet, a framework of my own, which lets you write even simpler code using ES6 async functions:

const {Pool} = require("pg");
const {StringStream} = require("scramjet");
const fs = require("fs");

const pool = new Pool(options); // `options` = the same connection settings as in the question
const filename = 'allCountries.txt';
const max = 11784251;
let lineNr = 0;
const INSERT_ENTRY = 'INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);';

StringStream
    .from(fs.createReadStream(filename))
    .lines()
    .parse(line => {
        // Each line needs to be properly formatted
        const entry = line.split("\t"); //TAB split

        // The following fields need formatting
        entry[0] = parseInt(entry[0]);
        entry[4] = parseFloat(entry[4]);
        entry[5] = parseFloat(entry[5]);
        entry[14] = parseInt(entry[14]);
        entry[15] = entry[15] == '' ? 0 : entry[15];
        entry[16] = parseInt(entry[16]);

        return entry;
    })
    .setOptions({maxParallel: 32})
    .each(async entry => {
        const client = await pool.connect();
        try {
            await client.query(INSERT_ENTRY, entry)
        } catch(err) {
            console.log('Error while adding line...', err);
            // some more logic could be here?
        } finally {
            client.release();
        }
    })
    .each(() => !(lineNr++ % 1000) && console.log("Line added ", lineNr, (lineNr / max * 100) + "%"))
    .run()
    .then(
        () => console.log('Read entire file.'), 
        e => console.log('Error while handling file.', e)
    );

The code above will attempt to run 32 parallel inserts using the pool (requesting a client for every entry - the pg pool will reuse clients and add new ones up to its configured limit). This doesn't necessarily mean it'll be 32 times quicker, since there are external limiting factors, but you should see a drastic increase in speed.
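
Note that the pg pool caps the number of clients it will hand out at its max option (10 by default), so if you want all 32 parallel inserts to actually run concurrently it is probably worth sizing the pool to match. A rough sketch, reusing the connection settings from the question:

const {Pool} = require("pg");

// Rough sketch: raise the pool's client limit so it matches maxParallel: 32
// and pool.connect() calls don't have to queue behind each other.
const pool = new Pool({
  user: 'smurf',
  host: 'localhost',
  database: 'mydb',
  password: 'smurf',
  port: 5432,
  max: 32, // matches setOptions({maxParallel: 32}) above
});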

Michał Karpacki