Advertisement
Guest User

Untitled

a guest
Apr 12th, 2017
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.54 KB | None | 0 0
  1. const cfg = require("my-config-module");
  2. const mysql = require('mysql');
  3. const BATCH_SIZE = 5;
  4. const stream = require("stream");
  5. const Transform = stream.Transform;
  6. const Writable = stream.Writable;
  7.  
  8. let db = mysql.createPool({
  9. connectionLimit: 50,
  10. user: cfg.sql.user,
  11. host: cfg.sql.host,
  12. password: cfg.sql.password,
  13. database: cfg.sql.db,
  14. multipleStatements: true
  15. });
  16.  
  17. let sql = `SELECT id FROM items`;
  18.  
  19. class BatchTransform extends Transform {
  20. constructor (opts) {
  21. super(opts);
  22. this.buffer = [];
  23. }
  24. _transform (data, encoding, done) {
  25. let { id } = data;
  26. let updated = 'test';
  27. let sql = `UPDATE items SET field = '${updated}' WHERE id = ${id}`;
  28. this.buffer.push(sql);
  29. console.log('Chunk recieved');
  30. if(this.buffer.length >= BATCH_SIZE){
  31. console.log('Chunk transformed!');
  32. this.push(this.buffer.join(";"));
  33. this.buffer = [];
  34. }
  35. done();
  36. }
  37. }
  38.  
  39. class BatchWrite extends Writable {
  40. constructor (opts) {
  41. super(opts);
  42. }
  43. _write (data, encoding, done) {
  44. /*setTimeout(() => {
  45. console.log('Chunk saved');
  46. done();
  47. }, 650);*/
  48. db.query(data, (err, res) => {
  49. console.log('Chunk saved!');
  50. done();
  51. });
  52. }
  53. }
  54.  
  55. let transformStream = new BatchTransform({objectMode: true, highWaterMark: 1});
  56. let writeStream = new BatchWrite({objectMode: true, highWaterMark: 1});
  57.  
  58. let stream = db.query(sql).stream({highWaterMark: 1});
  59. stream.on('error', (err) => console.log('err', err));
  60. stream.on('end', () => console.log("END!"));
  61. stream.pipe(transformStream).pipe(writeStream);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement