Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Error: Connection terminated by user
- at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:402:36)
- at Pool._remove (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:135:12)
- at Timeout.setTimeout (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:38:12)
- at ontimeout (timers.js:498:11)
- at tryOnTimeout (timers.js:323:5)
- at Timer.listOnTimeout (timers.js:290:5)
- Line added 6052 0.05135667935111022%
- (node:31819) UnhandledPromiseRejectionWarning: Error: This socket is closed
- at Socket._writeGeneric (net.js:729:18)
- at Socket._write (net.js:783:8)
- at doWrite (_stream_writable.js:397:12)
- at writeOrBuffer (_stream_writable.js:383:5)
- at Socket.Writable.write (_stream_writable.js:290:11)
- at Socket.write (net.js:707:40)
- at Connection.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/connection.js:318:22)
- at global.Promise (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:410:23)
- at new Promise (<anonymous>)
- at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:409:12)
- (node:31819) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
- (node:31819) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
- (node:31819) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added. Use emitter.setMaxListeners() to increase limit
- var pg = require("pg");
- var fs = require('fs');
- const pool = new pg.Pool({
- user: 'smurf',
- host: 'localhost',
- database: 'mydb',
- password: 'smurf',
- port: 5432,
- })
- var filename = 'allCountries.txt';
- var fs = require('fs'),
- es = require('event-stream');
- var lineNr = 0;
- var max = 11784251; // Number of line, dirty, to get % of lines inserted
- // Connect to Postgresql
- pool.connect((err, client, done) => {
- if (err) throw err
- // Stream file line by line
- var s = fs.createReadStream(filename)
- .pipe(es.split())
- .pipe(es.mapSync(function(e) {
- // pause the readstream
- s.pause();
- lineNr += 1;
- // Each line need to be properly formated
- e = e.split("t"); //TAB split
- // The following fields need formating
- e[0] = parseInt(e[0]);
- e[4] = parseFloat(e[4]);
- e[5] = parseFloat(e[5]);
- e[14] = parseInt(e[14]);
- e[15] = e[15] == '' ? 0 : e[15];
- e[16] = parseInt(e[16]);
- // Insert into db
- pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
- if (err) {
- console.log(err);
- }
- done(); // Release this connection to the pool
- console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
- s.resume(); // Go to next line
- });
- })
- .on('error', function(err) {
- console.log('Error while reading file.', err);
- })
- .on('end', function() {
- console.log('Read entire file.')
- })
- );
- }) // END pool.connect
- var fs = require('fs'),
- es = require('event-stream');
- var lineNr = 0;
- var max = 11784251; // Number of line, dirty, to get % of lines inserted
- // Connect to Postgresql
- pool.connect((err, client, done) => {
- if (err) throw err
- // Stream file line by line
- var s = fs.createReadStream(filename)
- .pipe(es.split())
- .pipe(es.map(function(e, cb) {
- lineNr += 1;
- // Each line need to be properly formated
- e = e.split("t"); //TAB split
- // The following fields need formating
- e[0] = parseInt(e[0]);
- e[4] = parseFloat(e[4]);
- e[5] = parseFloat(e[5]);
- e[14] = parseInt(e[14]);
- e[15] = e[15] == '' ? 0 : e[15];
- e[16] = parseInt(e[16]);
- // Insert into db
- pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
- cb(err, result); // call the callback
- console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
- });
- })
- .resume()
- .on('error', function(err) {
- done();
- console.log('Error while reading file.', err);
- })
- .on('end', function() {
- done();
- console.log('Read entire file.')
- })
- );
- }) // END pool.connect
- const {Pool} = require("pg");
- const {StringStream} = require("scramjet");
- const fs = require("fs");
- const pool = new Pool(options);
- const max = 11784251;
- const INSERT_ENTRY = 'INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);';
- StringStream
- .from(fs.createReadStream(filename))
- .lines()
- .parse(line => {
- // Each line need to be properly formated
- const entry = line.split("t"); //TAB split
- // The following fields need formating
- entry[0] = parseInt(entry[0]);
- entry[4] = parseFloat(entry[4]);
- entry[5] = parseFloat(entry[5]);
- entry[14] = parseInt(entry[14]);
- entry[15] = entry[15] == '' ? 0 : entry[15];
- entry[16] = parseInt(entry[16]);
- return entry;
- })
- .setOptions({maxParallel: 32})
- .each(entry => {
- const client = await pool.connect();
- try {
- await client.query(INSERT_ENTRY, entry)
- } catch(err) {
- console.log('Error while adding line...', err);
- // some more logic could be here?
- } finally {
- client.release();
- }
- })
- .each(() => !(lineNr++ % 1000) && console.log("Line added ", lineNr, (lineNr / max * 100) + "%"))
- .run()
- .then(
- () => console.log('Read entire file.'),
- e => console.log('Error while handling file.', err)
- );
Add Comment
Please, Sign In to add comment