Reading big files in Node.js is a little tricky. Node.js is built to handle I/O efficiently, not CPU-intensive computation; the job is still doable, but I'd prefer doing such tasks in a language like Python or R. Reading, parsing, transforming, and then saving large data sets (I'm talking millions of records here) can be done in a lot of ways, but only a few of those are efficient. The following snippet parses **millions** of records without wasting much CPU (15%-30% max) or memory (40 MB-60 MB max). It is based on `Streams`.

The program expects the input to be a CSV file, e.g. `big-data.unpr.csv`. It saves the result as NDJSON rather than JSON, since working with huge datasets is easier in the NDJSON format.
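NDJSON simply puts one standalone JSON object on each line, so a consumer can stream the file record by record instead of parsing one giant array. The output of the snippet below looks roughly like this (field values are illustrative, not real data):

```
{"firstName":"Jane","age":"34","city":"Pune","address":"12 Main St","state":"MH","lastName":"Doe","gender":"F","pin":"411001","searchId":"4n5pxq24kp"}
{"firstName":"John","age":"41","city":"Delhi","address":"8 Park Rd","state":"DL","lastName":"Smith","gender":"M","pin":"110001","searchId":"4n5pxq24kq"}
```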
```typescript
import { createReadStream, createWriteStream, readdirSync } from 'fs';
import split from 'split2';
import through2 from 'through2';
import parse from 'csv-parse';
import pump from 'pump';
import ndjson from 'ndjson';
import uniqid from 'uniqid';
import { Logger } from '../util';
import { CronJob } from 'cron';

const serialize = () => ndjson.serialize();
const source = (filename: string) => createReadStream(filename);
const output = (filename: string) => createWriteStream(filename);

const transformObjectStream = (context: any) => {
  return through2.obj(async function(chunk: string, enc: string, callback: (err?: Error) => any) {
    let stringChunk;
    try {
      stringChunk = chunk.toString();
    } catch (e) {
      stringChunk = null;
      Logger.error(`STEP - Transform Object To String Error at line ${context.line} ${e}`);
      return callback();
    }
    parse(
      stringChunk,
      {
        bom: true,
        skip_empty_lines: true,
        skip_lines_with_empty_values: true,
        skip_lines_with_error: true,
        trim: true,
      },
      async (err: Error, parsedString: string[][]) => {
        if (err) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${err}`);
          return callback();
        }
        // Yield to the event loop every 5000 lines.
        // This brings CPU usage down from 100% to 20-25% :-)
        if (context.line % 5000 === 0) {
          await delay();
        }
        try {
          const join = parsedString[0];
          if (!join || !Array.isArray(join)) {
            throw new Error(`Bad CSV Line at ${context.line}`);
          }
          const data = {
            address: join[3],
            age: join[1],
            city: join[2],
            firstName: join[0],
            gender: join[7],
            lastName: join[6],
            pin: join[8],
            searchId: uniqid(),
            state: join[4],
          };
          this.push(data);
          context.line++;
          return callback();
        } catch (e) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${e}`);
          return callback();
        }
      }
    );
  });
};

const onErrorOrFinish = (context: any) => (e: Error) => {
  if (e) {
    Logger.error(`FINAL - Error After Parsing ${context.line} lines - ${e}`);
  } else {
    Logger.debug(`Time Taken ${(Date.now() - context.time) / 1000} seconds`);
    Logger.debug(`Lines Parsed ${context.line}`);
  }
  globalContext.jobInProgress = false;
};

const delay = () =>
  new Promise(resolve => {
    setTimeout(resolve, 1000);
  });

const globalContext = { jobInProgress: false };

function run() {
  Logger.debug(`Job is ${globalContext.jobInProgress ? 'ALREADY' : 'NOT'} running.`);
  if (globalContext.jobInProgress) {
    return;
  }
  // Pick the first unprocessed file: a *.unpr.csv with no matching *.pr.ndjson yet.
  const files: string[] = readdirSync('./');
  let fileName: string | null = null;
  for (const name of files) {
    if (
      name.includes('.unpr.csv') &&
      !files.includes(`${name.replace('.unpr.csv', '')}.pr.ndjson`)
    ) {
      fileName = name;
      break;
    }
  }
  if (!fileName) {
    Logger.debug(`No more files to process. Exiting.`);
    return;
  }
  const jobContext = {
    existFileName: fileName,
    existFilePath: `./${fileName}`,
    fileName: `./${fileName.replace('.unpr.csv', '')}.pr.ndjson`,
    line: 1,
    time: Date.now(),
  };
  globalContext.jobInProgress = true;
  // Pipeline: read file -> split into lines -> parse/transform -> NDJSON -> write.
  // pump propagates errors and tears down every stream if any stage fails.
  pump(
    source(jobContext.existFilePath),
    split(),
    transformObjectStream(jobContext),
    serialize(),
    output(jobContext.fileName),
    onErrorOrFinish(jobContext)
  );
}

// Check for new files every minute; a tick is skipped if a job is already running.
const job = new CronJob({
  context: globalContext,
  cronTime: '*/1 * * * *',
  onTick: run,
  start: false,
  timeZone: 'America/Los_Angeles',
});
job.start();
```
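Because every stage is a stream, back-pressure propagates from the write stream all the way back to the file read, which is what keeps memory bounded regardless of file size. Once a `.pr.ndjson` file exists, it can be consumed as a stream too. Here is a minimal sketch (the file name is assumed from the example above; `ndjson.parse()` is the counterpart of the `ndjson.serialize()` used in the pipeline):

```typescript
import { createReadStream } from 'fs';
import ndjson from 'ndjson';

// Stream the processed file back one record at a time; memory stays flat
// because only one line is buffered at once.
createReadStream('./big-data.pr.ndjson')
  .pipe(ndjson.parse())
  .on('data', (record: any) => {
    // Each `data` event is one parsed object from one NDJSON line.
    console.log(record.searchId, record.firstName);
  })
  .on('end', () => console.log('done'));
```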