Guest User

Untitled

a guest
Mar 18th, 2018
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.20 KB | None | 0 0
  1. async function asyncForEach(array, callback) {
  2. for (let index = 0; index < array.length; index++) {
  3. await callback(array[index], index, array)
  4. };
  5. };
  6.  
  7. function _run(key) {
  8. return new Promise((resolve, reject) => {
  9. /* Setup Task */
  10. const currentTask = this.tasks[key];
  11. const batchSize = currentTask.Extraction.batchSize;
  12. const E = currentTask.Extraction; // Only single extraction supported
  13. const Ts = currentTask.Transformations; // Multiple transformations supported
  14. const L = currentTask.Load; // Only single load supported
  15. currentTask.startTime = (new Date()).getTime();
  16. currentTask._itemsExtracted = 0;
  17. var extraction;
  18. var batches;
  19. var load;
  20. var transformations;
  21.  
  22. try {
  23. extraction = this.extractions[E.Name].call(E);
  24. batches = (function*() {
  25. var finished = false;
  26. while (!finished) {
  27. let data = [];
  28. for (let i = 0; i < batchSize; i++) {
  29. let datum = extraction.getNext();
  30. if (datum.done) {
  31. finished = true;
  32. break;
  33. };
  34. data.push(datum.value);
  35. };
  36. log.info("Task : " + key + " : Batch extracted : Batch size : " + data.length);
  37. yield data;
  38. };
  39. })();
  40.  
  41. transformations = Ts.map((t, i, ts) => {
  42. return this.transformations[t.Name].call(t);
  43. });
  44.  
  45. load = this.loads[L.Name].call(L);
  46. } catch (error) {
  47. throw error;
  48. };
  49.  
  50. var batch = null;
  51. (function doEtlTask(self) {
  52. /* Get new batch */
  53. return new Promise(function(resolve, reject) {
  54. batch = batches.next();
  55. if (batch.done) {
  56. reject('complete');
  57. return;
  58. };
  59. batch = batch.value;
  60. self.tasks[key]._itemsExtracted += batch.length;
  61. resolve(batch);
  62. })
  63. /* Transform the batch */
  64. .then(function(batch) {
  65. return new Promise(async function(resolve, reject) {
  66. const payload = [];
  67. await asyncForEach(batch, async(item) => {
  68. await asyncForEach(transformations, async(t) => {
  69. item = await t.transform.call(t, item);
  70. });
  71. if (item !== {} && item) payload.push(item);
  72. });
  73. resolve(payload);
  74. });
  75. })
  76. /* Load the batch */
  77. .then(function(payload) {
  78. log.info("Task : " + key + " : Batch Transformed : Payload size : " + payload.length);
  79. return new Promise(function(resolve, reject) {
  80. load.batch(payload)
  81. .then(function(msg) {
  82. log.info("Task : " + key + " : Batch loaded : res = " + msg);
  83. self.tasks[key]._itemsProcessed = (self.tasks[key]._itemsProcessed) ? self.tasks[key]._itemsProcessed += payload.length : payload.length;
  84. log.info("Task : " + key + " : Extracted / Processed : " + self.tasks[key]._itemsExtracted + " / " + self.tasks[key]._itemsProcessed);
  85. resolve();
  86. })
  87. .catch(function(error) {
  88. reject(new Error("Task : " + key + ' Error loading data:\n' + error));
  89. });
  90. });
  91. })
  92. /* Do the next batch */
  93. .then(function() {
  94. doEtlTask(self);
  95. })
  96. .catch(function(res) {
  97. if (res !== 'complete') {
  98. reject(new Error(res))
  99. } else {
  100. return new Promise(function(resolve, reject) {
  101. log.info("Task : " + key + " : Loading completed");
  102. var endMsg;
  103. try {
  104. // Handle after-extraction functions
  105. var afterE = E.afterTaskRunCBs || [];
  106. afterE.forEach(function(functionName) {
  107. extraction[functionName]();
  108. });
  109.  
  110. // Handle after-transformation functions
  111. transformations.forEach(function(t, i, ts) {
  112. var afterT = t.afterTaskRunCBs || [];
  113. afterT.forEach(function(functionName) {
  114. t[functionName]();
  115. });
  116. });
  117.  
  118. // Handle after-load functions
  119. var afterL = L.afterTaskRunCBs || [];
  120. afterL.forEach(function(functionName) {
  121. load[functionName]();
  122. });
  123.  
  124. // Log the end of the task
  125. self.tasks[key].endTime = (new Date()).getTime();
  126. self.tasks[key].runTimeInSeconds = (self.tasks[key].endTime - self.tasks[key].startTime) / 1000;
  127. endMsg = "Task : " + key + " : Task completed in " + self.tasks[key].runTimeInSeconds + " seconds : " + self.tasks[key]._itemsExtracted + " Items extracted : " + self.tasks[key]._itemsProcessed + " Items processed";
  128. resolve(endMsg);
  129. } catch (error) {
  130. throw new Error("Task : " + key + " : Error on task end:\n" + error);
  131. };
  132. }).then(function(msg) {
  133. resolve(msg);
  134. })
  135. .catch(function(error) {
  136. reject(error);
  137. });
  138. };
  139. });
  140. })(this);
  141. }).then(function(res) {
  142. log.info(res);
  143. return res;
  144. })
  145. .catch(function(error) {
  146. throw error;
  147. });
  148. };
Add Comment
Please, Sign In to add comment