Promise并发调度
题目
实现一个scheduler,能对Promise的并发数量进行控制。补充下面的代码:
class Scheduler {
// ...
}
const timeout = (time) => new Promise((resolve) => setTimeout(resolve, time));
const scheduler = new Scheduler(2);
const addTask = (time, order) => {
scheduler.add(() => timeout(time).then(()=>{ console.log(order); }) );
}
addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);
scheduler.start();
// 输出:2 3 1 41. 先写构造函数
首先看timeout,timeout最终返回一个Promise对象,然后在addTask中被调用,addTask往调度器scheduler中添加的是一个function,该函数经过time后返回一个status=fulfilled的Promise对象。
考虑到Promise自带控制并发的Promise.all与Promise.race都不能满足限制个数的并发,因此需要换一个考虑的方式。我的思路是,Scheduler中使用队列存储所有的Task即addTask中的箭头函数,每次执行队列中的第一个Task,只要有一个Task执行成功,就下一Task出队并且执行。但是需要保证的是,一次性在队列外执行的Task的数量需要少于limit,可以使用running来表示运行中的Task,即limit >= running即可。
class Scheduler {
constructor(limit) {
this.limit = limit;
this.running = 0;
this.tasklist = new Array();
}
// ...
}2. 然后看add函数
add比较简单直接全部入队等待处理就好,task是一个function,该函数经过time后返回一个status=fulfilled的Promise对象。
class Scheduler {
// constructor(){}
add(task) {
this.tasklist.push(task);
}
}3. start函数
start表示入队后做什么,入队后当然是需要把队列中的funciton执行,那么这里当然是选择limit个task执行。并且需要考虑如何执行呢?首先,把tasklist当做是等待执行队列,shift出来的task就是执行中的队列了,每次shift后,running++表示运行中task的数量加一,如果running === limit就暂停出队,当然,队列为空也不可能出队。这里把执行的逻辑交给一个新的函数run,start只需要控制初始条件下run的个数即可。
class Scheduler {
// constructor(){}
// add(){}
start() {
let n = this.limit;
// 这里也可以写成for形式,while形式不可以直接this.limit--,因为limit需要保持不变
while(n--) {
this.run();
}
}
run() {
// 上面已经比较清晰了,不run的条件就是队空或者running达到limit
if(!this.tasklist.length || this.running >= this.limit) return;
// task退出等待队列开始运行,因此运行中的个数增加
const task = this.tasklist.shift();
this.running++;
// 注意task是一个函数,返回一个Promise,因此返回新的Promise后,onFulfilled后运行下一个task就好,即在then里面this.run()
task().then(() => {
// 当前任务运行完成,下一个任务准备运行,运行中任务数量减少
this.running--;
this.run();
})
}
}至此,本题完毕。
4. 代码总结
class Scheduler {
constructor(limit) {
this.limit = limit;
this.tasklist = new Array();
this.running = 0;
}
add(task) {
this.tasklist.push(task);
}
start() {
for (let i = 0; i < this.limit; i++) {
this.run();
}
}
run() {
if (!this.tasklist.length || this.running >= this.limit) return;
this.running++;
const task = this.tasklist.shift();
task().then(() => {
this.running--;
this.run();
});
}
}
// 定时任务
const timeout = (time) => new Promise((resolve) => setTimeout(resolve, time));
// 实例化调度器
const scheduler = new Scheduler(2);
// 添加任务
const addTask = (time, order) => {
scheduler.add(() => timeout(time).then(() => console.log(order)));
};
// 最后结果测试:2 3 1 4