Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * 并行异步队列
- * 首次上传时占用最大的并行请求数
- * 后续当每个之前的请求结束后,继续执行下一个请求
- * 应用场景:分片文件上传
- *
- */
- class WraperTask {
- task;
- finished;
- taskIndex;
- constructor(task, finished, taskIndex?) {
- this.task = task;
- this.finished = finished;
- this.taskIndex = taskIndex;
- }
- }
- class Task {
- maxParalleTask;
- taskResolve;
- queueResolve;
- taskQueue;
- currentTaskIndex;
- currentCount;
- /**
- *
- * @param {number} maxParalleTask 最大并行数量
- * @param {func} taskResolve 单个 task 的 resolve
- * @param {func} queueResolve 所有 task 完成的 resolve
- */
- constructor(maxParalleTask, taskResolve, queueResolve) {
- this.maxParalleTask = maxParalleTask;
- this.taskResolve = taskResolve;
- this.queueResolve = queueResolve;
- this.reset();
- }
- push(asyncTask) {
- return this.taskQueue.push(new WraperTask(asyncTask, false));
- }
- getTaskQueue() {
- return this.taskQueue.length;
- }
- getUnfinishedTaskQueue() {
- return this.taskQueue.reduce(function(accumulator, currentValue) {
- accumulator = currentValue.finished ? accumulator : accumulator.concat(currentValue);
- return accumulator;
- }, []);
- }
- paralleTask() {
- this.taskQueue
- .slice(0, Math.min(this.taskQueue.length, this.maxParalleTask) - 1)
- .forEach((singleTask, index) => {
- singleTask.taskIndex = index;
- singleTask.task().then(args => this._resolve(args, singleTask));
- this.currentTaskIndex += 1;
- this.currentCount += 1;
- });
- }
- _resolve(args, singleTask) {
- singleTask.finished = true;
- this.currentCount -= 1;
- if (this.taskResolve) {
- this.taskResolve(args);
- }
- // not finished
- console.log(`this.getUnfinishedTaskQueue`, this.getUnfinishedTaskQueue());
- if (this.getUnfinishedTaskQueue().length !== 0) {
- console.log('next task');
- if (this.currentCount < this.maxParalleTask) {
- this.dispatchNextTask();
- }
- return;
- }
- // finished
- if (this.queueResolve) {
- this.queueResolve();
- }
- this.reset();
- }
- dispatchNextTask() {
- const singleTask = this.taskQueue[this.currentTaskIndex];
- if (singleTask && singleTask.task) {
- singleTask.task().then(args => this._resolve(args, singleTask));
- this.currentTaskIndex += 1;
- this.currentCount += 1;
- }
- }
- reset() {
- this.taskQueue = [];
- this.currentTaskIndex = 0; // 执行到第几个 task
- this.currentCount = 0; // 当前执行的 task 的总数
- }
- }
- function handleAll() {
- console.log('all task resolved');
- }
- function handleResolve(data) {
- console.log('task resolved', data);
- }
- function MockTest() {
- let task = new Task(4, handleResolve, handleAll);
- for (let i = 0; i < 10; i++) {
- task.push(signleTask);
- }
- function signleTask() {
- return new Promise((resolve, reject) => {
- setTimeout(() => {
- resolve(Math.random());
- }, Math.random() * 10000);
- });
- }
- task.paralleTask();
- }
- MockTest();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement