Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- const cfg = require("my-config-module");
- const mysql = require('mysql');
- const BATCH_SIZE = 5;
- const stream = require("stream");
- const Transform = stream.Transform;
- const Writable = stream.Writable;
// Connection settings are read from the shared config module.
// multipleStatements is enabled because the write side sends several
// ";"-joined UPDATE statements in a single query() call.
const poolOptions = {
  host: cfg.sql.host,
  user: cfg.sql.user,
  password: cfg.sql.password,
  database: cfg.sql.db,
  connectionLimit: 50,
  multipleStatements: true,
};

// Pool shared by the streaming SELECT and the batched UPDATE queries.
const db = mysql.createPool(poolOptions);

// Source query whose rows are streamed through the pipeline.
const sql = 'SELECT id FROM items';
/**
 * Object-mode Transform that turns every incoming row into an UPDATE
 * statement and emits the statements downstream in ";"-joined batches.
 *
 * A batch is pushed as soon as `batchSize` statements have accumulated;
 * whatever is left when the input ends is pushed from `_flush`, so a
 * trailing partial batch is not lost.
 */
class BatchTransform extends Transform {
  /**
   * @param {object} [opts] - Stream options, forwarded to Transform.
   * @param {number} [opts.batchSize=BATCH_SIZE] - Number of statements
   *   collected before a batch is emitted.
   */
  constructor (opts) {
    super(opts);
    const { batchSize = BATCH_SIZE } = opts || {};
    this.batchSize = batchSize;
    this.buffer = [];
  }

  /**
   * Builds the UPDATE statement for one row and buffers it.
   *
   * @param {{id: *}} data - Row object; only `id` is read.
   * @param {string} encoding - Unused.
   * @param {Function} done - Signals that the row has been consumed.
   */
  _transform (data, encoding, done) {
    const { id } = data;
    const updated = 'test';
    // NOTE(review): `id` is interpolated unescaped; this presumes ids are
    // numeric values coming from the items table itself - confirm, and use
    // mysql.escape() if they can be strings or otherwise untrusted.
    const statement = `UPDATE items SET field = '${updated}' WHERE id = ${id}`;
    this.buffer.push(statement);
    console.log('Chunk recieved');
    if (this.buffer.length >= this.batchSize) {
      this.emitBatch();
    }
    done();
  }

  /**
   * Called once the input has ended: emits the remaining statements that
   * did not fill a whole batch.
   *
   * @param {Function} done - Signals that flushing is complete.
   */
  _flush (done) {
    if (this.buffer.length > 0) {
      this.emitBatch();
    }
    done();
  }

  /** Pushes the buffered statements as one ";"-joined string and resets the buffer. */
  emitBatch () {
    console.log('Chunk transformed!');
    this.push(this.buffer.join(";"));
    this.buffer = [];
  }
}
/**
 * Writable sink that executes each incoming chunk as a query on the
 * shared `db` pool. A chunk is expected to be a string of one or more
 * ";"-separated SQL statements.
 */
class BatchWrite extends Writable {
  /**
   * Runs one batch and reports the outcome to the stream.
   *
   * @param {string} data - SQL text passed to db.query().
   * @param {string} encoding - Unused.
   * @param {Function} done - Called with the query error, if any, so a
   *   failed batch surfaces as a stream 'error' instead of being reported
   *   as saved.
   */
  _write (data, encoding, done) {
    db.query(data, (err) => {
      if (err) {
        done(err);
        return;
      }
      console.log('Chunk saved!');
      done();
    });
  }
}
// Both stages run in object mode; with highWaterMark: 1 each stage buffers
// at most one object before signalling backpressure upstream.
const transformStream = new BatchTransform({objectMode: true, highWaterMark: 1});
const writeStream = new BatchWrite({objectMode: true, highWaterMark: 1});

// Named rowStream because `stream` is already bound to the core "stream"
// module at the top of the file; declaring it again is a SyntaxError.
const rowStream = db.query(sql).stream({highWaterMark: 1});
rowStream.on('error', (err) => console.log('err', err));
rowStream.on('end', () => console.log("END!"));

// pipe() does not forward errors between stages, so every stage needs its
// own listener; without one an emitted 'error' is thrown as uncaught.
transformStream.on('error', (err) => console.log('err', err));
writeStream.on('error', (err) => console.log('err', err));

rowStream.pipe(transformStream).pipe(writeStream);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement