Reading big files in Node.js is a little tricky. Node.js is designed to handle I/O efficiently, not CPU-intensive computation. It is still doable, though I'd prefer doing such tasks in languages like Python or R. Reading, parsing, transforming and then saving large data sets (I'm talking millions of records here) can be done in a lot of ways, but only a few of them are efficient. The following snippet parses **millions** of records without wasting much CPU (15-30% max) or memory (40-60 MB max). It is based on `Streams`.

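Before the full program, here is the core shape of that pipeline as a minimal sketch (the file names and the per-line work are placeholders, not part of the real job): each line flows from the read stream, through a transform, into the write stream, so only a small window of the file is held in memory at any time.

```typescript
import { createReadStream, createWriteStream } from 'fs';
import split from 'split2';
import through2 from 'through2';
import pump from 'pump';

pump(
  createReadStream('big-data.unpr.csv'),  // raw bytes, read in small chunks
  split(),                                // re-chunk into one line per chunk
  through2.obj((line: string, _enc: string, cb: (err?: Error, data?: any) => void) => {
    // do the per-line work here; this placeholder just echoes the line back
    cb(null, `${line.toString()}\n`);
  }),
  createWriteStream('big-data.out.txt'),  // results are written as they are produced
  (err?: Error) => {
    if (err) console.error(err);
  }
);
```
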
The program expects the input to be a CSV file, e.g. `big-data.unpr.csv`. It saves the result as NDJSON rather than JSON, because huge data sets are easier to work with in the NDJSON format.

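To illustrate the difference, NDJSON is simply one JSON object per line, so a file can be appended to and read back record by record instead of being parsed as one giant array. A tiny sketch (the records and file name are made up):

```typescript
import { createWriteStream } from 'fs';
import ndjson from 'ndjson';

// ndjson.serialize() is a transform stream: write objects in, newline-delimited JSON comes out.
const out = ndjson.serialize();
out.pipe(createWriteStream('sample.pr.ndjson'));

out.write({ firstName: 'Ada', age: '36' });   // => {"firstName":"Ada","age":"36"}
out.write({ firstName: 'Alan', age: '41' });  // => {"firstName":"Alan","age":"41"}
out.end();
```
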
```typescript
import { createReadStream, createWriteStream, readdirSync } from 'fs';
import split from 'split2';
import through2 from 'through2';
import parse from 'csv-parse';
import pump from 'pump';
import ndjson from 'ndjson';
import uniqid from 'uniqid';

import { Logger } from '../util';
import { CronJob } from 'cron';

// Small stream factories so the pump() call below reads top to bottom.
const serialize = () => ndjson.serialize();
const source = (filename: string) => createReadStream(filename);
const output = (filename: string) => createWriteStream(filename);

// Object-mode transform: takes one CSV line per chunk, parses it and
// pushes a plain record object downstream.
const transformObjectStream = (context: any) => {
  return through2.obj(async function(chunk: string, enc: string, callback: (err?: Error) => any) {
    let stringChunk: string | null;
    try {
      stringChunk = chunk.toString();
    } catch (e) {
      stringChunk = null;
      Logger.error(`STEP - Transform Object To String Error at line ${context.line} ${e}`);
      return callback();
    }
    parse(
      stringChunk,
      {
        bom: true,
        skip_empty_lines: true,
        skip_lines_with_empty_values: true,
        skip_lines_with_error: true,
        trim: true,
      },
      async (err: Error, parsedString: string[][]) => {
        if (err) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${err}`);
          return callback();
        }
        // Pause every 5000 lines to give the CPU a rest;
        // brings CPU usage from 100% down to 20-25% :-)
        if (context.line % 5000 === 0) {
          await delay();
        }
        try {
          const join = parsedString[0];
          if (!join || !Array.isArray(join)) {
            throw new Error(`Bad CSV Line at ${context.line}`);
          }
          // Map the positional CSV columns onto named fields.
          const data = {
            address: join[3],
            age: join[1],
            city: join[2],
            firstName: join[0],
            gender: join[7],
            lastName: join[6],
            pin: join[8],
            searchId: uniqid(),
            state: join[4],
          };
          this.push(data);
          context.line++;
          return callback();
        } catch (e) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${e}`);
          return callback();
        }
      }
    );
  });
};

// Final pump() callback: runs once, on the first error or when the whole
// pipeline has flushed; logs stats and releases the job lock either way.
const onErrorOrFinish = (context: any) => (e: Error) => {
  if (e) {
    Logger.error(`FINAL - Error After Parsing ${context.line} lines - ${e}`);
  } else {
    Logger.debug(`Time Taken ${(Date.now() - context.time) / 1000} seconds`);
    Logger.debug(`Lines Parsed ${context.line}`);
  }
  globalContext.jobInProgress = false;
};

// One-second pause used to throttle the transform stream.
const delay = () =>
  new Promise(resolve => {
    setTimeout(resolve, 1000);
  });

const globalContext = { jobInProgress: false };

// Picks the next unprocessed `*.unpr.csv` file and runs it through the pipeline.
function run() {
  Logger.debug(`Job is ${globalContext.jobInProgress ? 'ALREADY' : 'NOT'} running.`);
  if (globalContext.jobInProgress) {
    return;
  }
  // A file still needs processing if no matching `*.pr.ndjson` output exists yet.
  const files: string[] = readdirSync('./');
  let fileName: string | null = null;
  for (const name of files) {
    if (
      name.includes('.unpr.csv') &&
      !files.includes(`${name.replace('.unpr.csv', '')}.pr.ndjson`)
    ) {
      fileName = name;
      break;
    }
  }
  if (!fileName) {
    Logger.debug(`No more files to process. Exiting.`);
    return;
  }
  const jobContext = {
    existFileName: fileName,
    existFilePath: `./${fileName}`,
    fileName: `./${fileName.replace('.unpr.csv', '')}.pr.ndjson`,
    line: 1,
    time: Date.now(),
  };
  globalContext.jobInProgress = true;

  // Wire the pipeline: read file -> split into lines -> parse & transform
  // -> serialize to NDJSON -> write file, with a single error/finish handler.
  pump(
    source(jobContext.existFilePath),
    split(),
    transformObjectStream(jobContext),
    serialize(),
    output(jobContext.fileName),
    onErrorOrFinish(jobContext)
  );
}

// Look for new files every minute; run() bails out if a job is already in progress.
const job = new CronJob({
  context: globalContext,
  cronTime: '*/1 * * * *',
  onTick: run,
  start: false,
  timeZone: 'America/Los_Angeles',
});

job.start();
```
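
Because the output is NDJSON, anything downstream can stream the result back one record at a time too. A rough sketch of reading a produced file (the file name is just an example):

```typescript
import { createReadStream } from 'fs';
import ndjson from 'ndjson';

createReadStream('./big-data.pr.ndjson')
  .pipe(ndjson.parse())                    // emits one parsed object per line
  .on('data', (record: any) => {
    // each record is a plain object, e.g. { firstName, lastName, searchId, ... }
    console.log(record.searchId, record.firstName);
  })
  .on('end', () => console.log('done'))
  .on('error', (e: Error) => console.error(e));
```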