Guest User

Untitled

a guest
Mar 20th, 2018
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.06 KB | None | 0 0
  /**
   * Run one extract / transform / load task from start to finish.
   *
   * Must be invoked with `this` bound to an object exposing:
   *   - `tasks[key]` with `Extraction` ({ Name, batchSize }), `Transformations`
   *     (array of { Name }) and `Load` ({ Name }) descriptors;
   *   - `extractions`, `transformations` and `loads`: factories looked up by
   *     descriptor `Name` and called with the descriptor as `this`.
   *
   * The extraction is drained in batches of at most `batchSize` items through
   * its synchronous `getNext()` method (which returns `{ done, value }`). Every
   * item is passed through all transformations in order; results that are
   * falsy or empty plain objects are dropped; the remaining payload is handed
   * to `load.batch()`.
   *
   * Side effects: sets `startTime`, `_itemsExtracted` and `_itemsProcessed` on
   * the task (both counters restart from 0 on every run) and writes progress
   * lines through `log.info`.
   *
   * @param {string} key - Name of the task in `this.tasks`.
   * @returns {Promise<string>} Human-readable summary. Errors thrown during the
   *   run are caught and reported in the returned string instead of rejecting.
   */
  async function _run(key) {
    /* True for `{}`-like values: plain objects without own enumerable keys. */
    const isEmptyPlainObject = (value) => {
      if (value === null || typeof value !== "object") return false;
      const proto = Object.getPrototypeOf(value);
      const isPlain = proto === Object.prototype || proto === null;
      return isPlain && Object.keys(value).length === 0;
    };

    try {
      /* Setup task */
      const currentTask = this.tasks[key];
      const E = currentTask.Extraction; // Only single extraction supported
      const Ts = currentTask.Transformations; // Multiple transformations supported
      const L = currentTask.Load; // Only single load supported

      // A missing, zero, negative or non-numeric batch size would make the
      // extraction loop spin forever on empty batches, so reject it up front.
      const batchSize = Number(E.batchSize);
      if (!(batchSize > 0)) {
        throw new Error("Task : " + key + " : Extraction.batchSize must be a positive number");
      }

      currentTask.startTime = Date.now();
      currentTask._itemsExtracted = 0;
      currentTask._itemsProcessed = 0; // reset together with _itemsExtracted so the two stay comparable

      /* Build the extraction, the transformations and the load */
      const extraction = this.extractions[E.Name].call(E);
      const transformations = Ts.map((t) => this.transformations[t.Name].call(t));
      const load = this.loads[L.Name].call(L);

      /* Extract, transform and load batch by batch */
      let exhausted = false;
      while (!exhausted) {
        /* Extract up to batchSize items */
        const values = [];
        while (values.length < batchSize) {
          const datum = extraction.getNext();
          if (datum.done) {
            exhausted = true;
            break;
          }
          values.push(datum.value);
        }
        // An empty batch can only occur once the source is exhausted; there is
        // nothing to transform or load, so stop instead of sending `[]` on.
        if (values.length === 0) break;
        log.info("Task : " + key + " : Batch extracted : Batch size : " + values.length);

        /* Do transformations */
        const payload = [];
        await asyncForEach(values, async (item) => {
          let transformed = item;
          await asyncForEach(transformations, async (t) => {
            transformed = await t.transform(transformed);
          });
          // `x !== {}` is always true (a fresh literal is never identical to
          // anything), so emptiness has to be checked structurally.
          if (transformed && !isEmptyPlainObject(transformed)) payload.push(transformed);
        });
        log.info("Task : " + key + " : Batch transformed : Payload size : " + payload.length);

        /* Do load */
        const loadResult = await load.batch(payload);

        /* Update task metrics */
        currentTask._itemsProcessed += payload.length;
        currentTask._itemsExtracted += values.length;
        log.info(
          "Task : " + key + " " +
          ": Batch loaded : Destination response " + loadResult + " " +
          ": Extracted / Processed " + currentTask._itemsExtracted + " / " + currentTask._itemsProcessed
        );
      }

      /* Return task-run stats */
      return "Task : " + key + " complete! " +
        ": Extracted / Processed " + currentTask._itemsExtracted + " / " + currentTask._itemsProcessed;
    } catch (error) {
      // Thrown values are not guaranteed to be Error objects; fall back to the
      // value itself when there is no stack to show.
      const detail = (error && error.stack) || error;
      return "Task : " + key + " complete with error: " + detail;
    }
  }
Add Comment
Please, Sign In to add comment