Guest User

Untitled

a guest
Mar 18th, 2018
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.01 KB | None | 0 0
  /**
   * Runs the ETL task stored under `key`: pulls items from the extraction module
   * in batches, pipes every item through the transformation modules in order,
   * hands each transformed batch to the load module, and finally invokes the
   * configured after-run callbacks.
   *
   * Must be called with `this` bound to an object exposing `tasks`,
   * `extractions`, `transformations` and `loads`. A `log` object with an
   * `info` method must be in scope.
   *
   * Side effects on `this.tasks[key]`: sets `startTime`, `endTime`,
   * `runTimeInSeconds`, `_itemsExtracted` and `_itemsProcessed` (both counters
   * are reset at the start of every run).
   *
   * Items that are falsy before or after any transformation (null, undefined,
   * 0, "", false) skip the remaining transformations and are not loaded.
   *
   * @param {string} key - Property name of the task inside `this.tasks`.
   * @returns {Promise<string>} Resolves with the completion summary message.
   *   Rejects with an Error when setup, extraction, a transformation, the load
   *   or an after-run callback fails.
   */
  function _run(key) {
    const self = this;

    /* Builds a module instance; failures keep the historical nested message shape. */
    function loadModule(failureMessage, create) {
      try {
        return create();
      } catch (error) {
        throw new Error("Unexpected error:\nError: " + failureMessage + ":\n" + error.stack);
      }
    }

    /* Performs the synchronous setup and returns the promise for the whole run. */
    function start() {
      /* Setup Task */
      const task = self.tasks[key];
      const E = task.Extraction; // Only single extraction supported
      const Ts = task.Transformations; // Multiple transformations supported
      const L = task.Load; // Only single load supported
      const batchSize = E.batchSize;

      // A missing, zero or negative batch size would make the batch generator
      // yield empty batches forever, so refuse to start instead.
      if (!(batchSize > 0)) {
        throw new Error("Task : " + key + " : Invalid batch size : " + batchSize);
      }

      task.startTime = (new Date()).getTime();
      task._itemsExtracted = 0;
      // Reset alongside _itemsExtracted so a re-run does not add to old totals.
      task._itemsProcessed = 0;

      /* Extraction */
      const extraction = loadModule("Unable to load extraction module", function() {
        return self.extractions[E.Name].call(E);
      });

      /* Transformations */
      const transformations = loadModule("Unable to load transformation modules", function() {
        return Ts.map(function(t) {
          return self.transformations[t.Name].call(t);
        });
      });

      /* Load */
      const load = loadModule("Unable to load load module", function() {
        return self.loads[L.Name].call(L);
      });

      // Lazily groups extracted items into arrays of at most batchSize items.
      // The last batch may be shorter, or empty when the source ends exactly
      // on a batch boundary.
      const batches = (function*() {
        let finished = false;
        while (!finished) {
          const data = [];
          for (let i = 0; i < batchSize; i++) {
            const datum = extraction.getNext();
            if (datum.done) {
              finished = true;
              break;
            }
            data.push(datum.value);
          }
          log.info("Task : " + key + " : Batch extracted : Batch size : " + data.length);
          yield data;
        }
      })();

      /* Applies every transformation, in order, to a single item. */
      function transformItem(rawItem, index) {
        let item = rawItem;
        return transformations
          .reduce(function(chain, t) {
            return chain.then(function() {
              // A falsy item skips the remaining transformations.
              if (!item) {
                return undefined;
              }
              return Promise.resolve(t.transform.call(t, item)).then(
                function(transformedItem) {
                  item = transformedItem;
                },
                function(error) {
                  throw new Error("Task : " + key + ` : Error doing transformation on ${JSON.stringify(item)}:\n` + error);
                }
              );
            });
          }, Promise.resolve())
          .then(
            function() {
              return item;
            },
            function(error) {
              throw new Error("Task : " + key + ` : Error transforming item ${JSON.stringify(item)} in batch ${index}:\n` + error);
            }
          );
      }

      /* Transforms the items of a batch one after another and collects the survivors. */
      function transformBatch(batch) {
        const payload = [];
        return batch
          .reduce(function(chain, rawItem, index) {
            return chain
              .then(function() {
                return transformItem(rawItem, index);
              })
              .then(function(item) {
                // NOTE(review): the original also tested `item !== {}`, which is
                // always true; empty objects are therefore still loaded.
                if (item) {
                  payload.push(item);
                }
              });
          }, Promise.resolve())
          .then(function() {
            return payload;
          });
      }

      /* Sends one transformed batch to the load module and updates the counters. */
      function loadBatch(payload) {
        log.info("Task : " + key + " : Batch Transformed : Payload size : " + payload.length);
        return Promise.resolve(load.batch(payload)).then(
          function(msg) {
            log.info("Task : " + key + " : Batch loaded : res = " + msg);
            task._itemsProcessed += payload.length;
            log.info("Task : " + key + " : Extracted / Processed : " + task._itemsExtracted + " / " + task._itemsProcessed);
          },
          function(error) {
            throw new Error("Task : " + key + ' Error loading data:\n' + error);
          }
        );
      }

      /* Processes batches one at a time until the extraction is exhausted. */
      function processNextBatch() {
        let next;
        try {
          next = batches.next();
        } catch (error) {
          return Promise.reject(new Error("Task : " + key + " : Error extracting data:\n" + error.stack));
        }
        if (next.done) {
          return Promise.resolve();
        }

        const batch = next.value;
        task._itemsExtracted += batch.length;

        return transformBatch(batch)
          .then(loadBatch)
          .catch(function(error) {
            // Re-wrapped once so the final message keeps its historical shape.
            throw new Error(error);
          })
          .then(processNextBatch);
      }

      /* Runs the after-run callbacks and builds the completion summary. */
      function finishTask() {
        log.info("Task : " + key + " : Loading completed");
        try {
          // Handle after-extraction functions
          (E.afterTaskRunCBs || []).forEach(function(functionName) {
            extraction[functionName]();
          });

          // Handle after-transformation functions
          // NOTE(review): the callback names are read from the transformation
          // instance here, whereas extraction and load read them from the task
          // configuration - confirm this asymmetry is intended.
          transformations.forEach(function(t) {
            (t.afterTaskRunCBs || []).forEach(function(functionName) {
              t[functionName]();
            });
          });

          // Handle after-load functions
          (L.afterTaskRunCBs || []).forEach(function(functionName) {
            load[functionName]();
          });

          // Log the end of the task
          task.endTime = (new Date()).getTime();
          task.runTimeInSeconds = (task.endTime - task.startTime) / 1000;
          return "Task : " + key + " : Task completed in " + task.runTimeInSeconds + " seconds : " + task._itemsExtracted + " Items extracted : " + task._itemsProcessed + " Items processed";
        } catch (error) {
          // Thrown (not just logged) so a failing callback rejects the run
          // instead of resolving it with an undefined summary.
          throw new Error("Task : " + key + " : Error on task end:\n" + error);
        }
      }

      return processNextBatch()
        .then(finishTask)
        .catch(function(error) {
          throw new Error("Error processing batches:\n" + error);
        });
    }

    // Setup errors are thrown synchronously by start(); surface them as a
    // rejection so callers only ever deal with the returned promise.
    try {
      return start();
    } catch (error) {
      return Promise.reject(error);
    }
  }
Add Comment
Please, Sign In to add comment