Advertisement
Guest User

Untitled

a guest
Sep 2nd, 2015
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.12 KB | None | 0 0
  1. var fs = require('fs');
  2. var csv2 = require('csv2');
  3. var through2 = require('through2');
  4. var pg = require('pg');
  5. var copyFrom = require('pg-copy-streams').from;
  6.  
  7. /**
  8. * Number of data rows written to one COPY stream before that stream is
  9. * ended and a fresh one is opened (i.e. the size of one "round").
  10. *
  11. * @type {number}
  12. */
  13. var rowsInARound = 2;
  14.  
  /**
   * Opens a `COPY <table>(<columns>) FROM STDIN` stream on the shared PG client.
   *
   * The returned stream gets an 'end' listener that does the shutdown
   * bookkeeping: when the number of streams already closed equals
   * `streamsOpened`, the shared client is ended; afterwards `streamsClosed`
   * is incremented.
   *
   * NOTE: `tableName` and `columns` are concatenated into the SQL text as-is,
   * so callers must only pass trusted identifiers.
   *
   * @param {string} tableName name of the table to COPY into
   * @param {Array} columns column names; joined with ',' into the column list
   * @returns {*} the value returned by `client.query()` for the COPY query
   * @private
   */
  var _stream = function (tableName, columns)
  {
    var copySql = 'COPY ' + tableName + '(' + columns.join(',') + ') FROM STDIN';
    var copyStream = client.query(copyFrom(copySql));

    function onCopyEnd()
    {
      // The comparison deliberately happens BEFORE the counter is bumped:
      // `streamsOpened` is the zero-based index of the newest stream.
      var isLastStream = (streamsClosed === streamsOpened);
      if (isLastStream)
      {
        client.end();
      }
      streamsClosed += 1;
    }

    copyStream.on('end', onCopyEnd);

    return copyStream;
  };
  35.  
  /**
   * Creates the PG client used by the script and starts connecting it.
   *
   * `connect()` is invoked without a callback, so this function neither waits
   * for nor reports the outcome of the connection attempt.
   *
   * @returns {pg.Client} the newly created client
   * @private
   */
  var _client = function ()
  {
    var connectionString = 'postgres://xxx:xxx@example.com/dbName';
    var connection = new pg.Client(connectionString);

    connection.connect();

    return connection;
  };
  47.  
  // ---------------------------------------------------------------------------
  // Main script: loads a CSV file into the `test` table through a sequence of
  // COPY streams ("rounds"). Each round receives at most `rowsInARound` data
  // rows plus one trailing marker row, then is ended so its COPY executes.
  // ---------------------------------------------------------------------------

  // Bookkeeping shared with _stream(): `streamsOpened` is the zero-based index
  // of the newest stream, `streamsClosed` counts streams that emitted 'end'.
  var streamsClosed = 0;
  var streamsOpened = 0;

  // Data rows written to the CURRENT stream. This is tracked here because the
  // copy stream's own `rowCount` is only populated once the COPY command has
  // completed, so it cannot be used to detect a full round while writing.
  var rowsInCurrentRound = 0;

  // PG client
  var client = _client();

  // clean up test table
  client.query('TRUNCATE TABLE test;');

  // 1st stream obj
  var stream = new _stream('test', ['text', 'num']);

  // let's process the data
  fs
    .createReadStream('ne_110m_admin_0_countries.csv')
    .pipe(csv2())
    .pipe(through2.obj(function (chunk, enc, callback)
    {
      // stream decrypt, data/type conversion, etc. goes here
      // just pass it on for now
      this.push(chunk);
      callback();
    }))
    .on('data', function (data)
    {
      // add the data to the stream (tab-separated, one row per line)
      stream.write(data.join('\t') + '\n');
      ++rowsInCurrentRound;

      // round is over, i.e. "buffer" is full
      // (>= rather than === so a threshold below 1 still cuts rounds)
      if (rowsInCurrentRound >= rowsInARound)
      {
        // mark the end of the round
        stream.write('full round over\t' + streamsOpened + '\n');

        // execute SQL
        stream.end();

        // create new stream and start counting its rows from zero
        stream = new _stream('test', ['text', 'num']);
        rowsInCurrentRound = 0;

        // make sure we know how many streams we have
        ++streamsOpened;
      }
    })
    .on('end', function ()
    {
      // only mark a partial round if the last stream actually received rows
      if (rowsInCurrentRound > 0)
      {
        stream.write('partial round over\t' + streamsOpened + '\n');
      }

      // make sure we close the last (most likely not full, or empty) stream
      // too; its 'end' event is what lets _stream() end the PG client
      stream.end();
    });
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement