Advertisement
Guest User

Untitled

a guest
Mar 25th, 2019
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.35 KB | None | 0 0
  1. /**
  2. * 并行异步队列
  3. * 首次上传时占用最大的并行请求数
  4. * 后续当每个之前的请求结束后,继续执行下一个请求
  5. * 应用场景:分片文件上传
  6. *
  7. */
  8. class WraperTask {
  9. task;
  10. finished;
  11. taskIndex;
  12. constructor(task, finished, taskIndex?) {
  13. this.task = task;
  14. this.finished = finished;
  15. this.taskIndex = taskIndex;
  16. }
  17. }
  18. class Task {
  19. maxParalleTask;
  20. taskResolve;
  21. queueResolve;
  22. taskQueue;
  23. currentTaskIndex;
  24. currentCount;
  25. /**
  26. *
  27. * @param {number} maxParalleTask 最大并行数量
  28. * @param {func} taskResolve 单个 task 的 resolve
  29. * @param {func} queueResolve 所有 task 完成的 resolve
  30. */
  31. constructor(maxParalleTask, taskResolve, queueResolve) {
  32. this.maxParalleTask = maxParalleTask;
  33. this.taskResolve = taskResolve;
  34. this.queueResolve = queueResolve;
  35. this.reset();
  36. }
  37.  
  38. push(asyncTask) {
  39. return this.taskQueue.push(new WraperTask(asyncTask, false));
  40. }
  41.  
  42. getTaskQueue() {
  43. return this.taskQueue.length;
  44. }
  45.  
  46. getUnfinishedTaskQueue() {
  47. return this.taskQueue.reduce(function(accumulator, currentValue) {
  48. accumulator = currentValue.finished ? accumulator : accumulator.concat(currentValue);
  49. return accumulator;
  50. }, []);
  51. }
  52.  
  53. paralleTask() {
  54. this.taskQueue
  55. .slice(0, Math.min(this.taskQueue.length, this.maxParalleTask) - 1)
  56. .forEach((singleTask, index) => {
  57. singleTask.taskIndex = index;
  58. singleTask.task().then(args => this._resolve(args, singleTask));
  59. this.currentTaskIndex += 1;
  60. this.currentCount += 1;
  61. });
  62. }
  63.  
  64. _resolve(args, singleTask) {
  65. singleTask.finished = true;
  66. this.currentCount -= 1;
  67. if (this.taskResolve) {
  68. this.taskResolve(args);
  69. }
  70. // not finished
  71. console.log(`this.getUnfinishedTaskQueue`, this.getUnfinishedTaskQueue());
  72. if (this.getUnfinishedTaskQueue().length !== 0) {
  73. console.log('next task');
  74. if (this.currentCount < this.maxParalleTask) {
  75. this.dispatchNextTask();
  76. }
  77. return;
  78. }
  79. // finished
  80. if (this.queueResolve) {
  81. this.queueResolve();
  82. }
  83. this.reset();
  84. }
  85.  
  86. dispatchNextTask() {
  87. const singleTask = this.taskQueue[this.currentTaskIndex];
  88. if (singleTask && singleTask.task) {
  89. singleTask.task().then(args => this._resolve(args, singleTask));
  90. this.currentTaskIndex += 1;
  91. this.currentCount += 1;
  92. }
  93. }
  94.  
  95. reset() {
  96. this.taskQueue = [];
  97. this.currentTaskIndex = 0; // 执行到第几个 task
  98. this.currentCount = 0; // 当前执行的 task 的总数
  99. }
  100. }
  101.  
  102. function handleAll() {
  103. console.log('all task resolved');
  104. }
  105. function handleResolve(data) {
  106. console.log('task resolved', data);
  107. }
  108.  
  109. function MockTest() {
  110. let task = new Task(4, handleResolve, handleAll);
  111. for (let i = 0; i < 10; i++) {
  112. task.push(signleTask);
  113. }
  114. function signleTask() {
  115. return new Promise((resolve, reject) => {
  116. setTimeout(() => {
  117. resolve(Math.random());
  118. }, Math.random() * 10000);
  119. });
  120. }
  121.  
  122. task.paralleTask();
  123. }
  124.  
  125. MockTest();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement