Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- var fs = require('fs');
- var csv2 = require('csv2');
- var through2 = require('through2');
- var pg = require('pg');
- var copyFrom = require('pg-copy-streams').from;
- /**
- * Number of rows that will be COPY'd in a round
- * (i.e. max rowCount in a stream instance).
- *
- * Once the current COPY stream has received this many data rows, the read
- * loop further down writes a marker row, ends that stream and opens a new one.
- *
- * @type {number}
- */
- var rowsInARound = 2;
- /**
- * Stream instance
- * @param tableName
- * @param columns
- * @private
- */
- var _stream = function(tableName, columns)
- {
- var stream = client.query(copyFrom('COPY ' + tableName + '(' + columns.join(',') + ') FROM STDIN'));
- stream.on('end', function()
- {
- if (streamsClosed === streamsOpened)
- {
- client.end();
- }
- streamsClosed++;
- });
- return stream;
- };
- /**
- * PG client
- * @returns {pools.Client|*|FakeClient}
- * @private
- */
- var _client = function()
- {
- var client = new pg.Client('postgres://xxx:xxx@example.com/dbName');
- client.connect();
- return client
- };
- //let's rock'n'roll
- //aux vars
- var streamsClosed = 0;
- var streamsOpened = 0;
- //PG client
- var client = _client();
- //clean up test table
- client.query('TRUNCATE TABLE test;');
- //1st stream obj
- var stream = new _stream('test', ['text', 'num']);
- //let's process the data
- fs
- .createReadStream('ne_110m_admin_0_countries.csv')
- .pipe(csv2())
- .pipe(through2.obj(function (chunk, enc, callback)
- {
- //stream decrypt, data/type conversion, etc. goes here
- //just pass it on for now
- this.push(chunk);
- callback();
- }))
- .on('data', function (data)
- {
- //add the data to the stream
- stream.write(data.join('\t') + '\n');
- //round is over, i.e. "buffer" is full
- if (stream.rowCount === rowsInARound)
- {
- //mark the end of the round
- stream.write('full round over\t' + streamsOpened + '\n');
- //execute SQL
- stream.end();
- //create new stream
- stream = new _stream('test', ['text', 'num']);
- //make sure we know how many streams we have
- ++streamsOpened;
- }
- })
- .on('end', function ()
- {
- if (stream.rowCount > 0)
- {
- stream.write('partial round over\t' + streamsOpened + '\n');
- }
- //make sure we close the last (most likely not full or emtpy) stream too
- stream.end();
- });
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement