Skip to content

Promise并发调度

题目

实现一个scheduler,能对Promise的并发数量进行控制。补充下面的代码:

js
class Scheduler {
    // ...
}

const timeout = (time) => new Promise((resolve) => setTimeout(resolve, time));

const scheduler = new Scheduler(2);

const addTask = (time, order) => {
    scheduler.add(() => timeout(time).then(()=>{ console.log(order); }) );
}

addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);

scheduler.start();

// 输出:2 3 1 4
class Scheduler {
    // ...
}

const timeout = (time) => new Promise((resolve) => setTimeout(resolve, time));

const scheduler = new Scheduler(2);

const addTask = (time, order) => {
    scheduler.add(() => timeout(time).then(()=>{ console.log(order); }) );
}

addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);

scheduler.start();

// 输出:2 3 1 4

1. 先写构造函数

​ 首先看timeouttimeout最终返回一个Promise对象,然后在addTask中被调用,addTask往调度器scheduler中添加的是一个function,该函数经过time后返回一个status=fulfilledPromise对象。

​ 考虑到Promise自带控制并发的Promise.allPromise.race都不能满足限制个数的并发,因此需要换一个考虑的方式。我的思路是,Scheduler中使用队列存储所有的TaskaddTask中的箭头函数,每次执行队列中的第一个Task,只要有一个Task执行成功,就下一Task出队并且执行。但是需要保证的是,一次性在队列外执行的Task的数量需要少于limit,可以使用running来表示运行中的Task,即limit >= running即可。

js
class Scheduler {
	constructor(limit) {
        this.limit = limit;
        this.running = 0;
        this.tasklist = new Array();
    }
    
    // ...
}
class Scheduler {
	constructor(limit) {
        this.limit = limit;
        this.running = 0;
        this.tasklist = new Array();
    }
    
    // ...
}

2. 然后看add函数

add比较简单直接全部入队等待处理就好,task是一个function,该函数经过time返回一个status=fulfilledPromise对象。

js
class Scheduler {
    // constructor(){}
    add(task) {
        this.tasklist.push(task);
    }
}
class Scheduler {
    // constructor(){}
    add(task) {
        this.tasklist.push(task);
    }
}

3. start函数

start表示入队后做什么,入队后当然是需要把队列中的funciton执行,那么这里当然是选择limittask执行。并且需要考虑如何执行呢?首先,把tasklist当做是等待执行队列,shift出来的task就是执行中的队列了,每次shift后,running++表示运行中task的数量加一,如果running === limit就暂停出队,当然,队列为空也不可能出队。这里把执行的逻辑交给一个新的函数runstart只需要控制初始条件下run的个数即可。

js
class Scheduler {
    // constructor(){}
    // add(){}
    start() {
        let n = this.limit;
        // 这里也可以写成for形式,while形式不可以直接this.limit--,因为limit需要保持不变
        while(n--) {
            this.run();
        }
    }
    run() {
        // 上面已经比较清晰了,不run的条件就是队空或者running达到limit
        if(!this.tasklist.length || this.running >= this.limit) return;
        // task退出等待队列开始运行,因此运行中的个数增加
        const task = this.tasklist.shift();
        this.running++;
        // 注意task是一个函数,返回一个Promise,因此返回新的Promise后,onFulfilled后运行下一个task就好,即在then里面this.run()
        task().then(() => {
            // 当前任务运行完成,下一个任务准备运行,运行中任务数量减少
            this.running--;
            this.run();
        })
    }
}
class Scheduler {
    // constructor(){}
    // add(){}
    start() {
        let n = this.limit;
        // 这里也可以写成for形式,while形式不可以直接this.limit--,因为limit需要保持不变
        while(n--) {
            this.run();
        }
    }
    run() {
        // 上面已经比较清晰了,不run的条件就是队空或者running达到limit
        if(!this.tasklist.length || this.running >= this.limit) return;
        // task退出等待队列开始运行,因此运行中的个数增加
        const task = this.tasklist.shift();
        this.running++;
        // 注意task是一个函数,返回一个Promise,因此返回新的Promise后,onFulfilled后运行下一个task就好,即在then里面this.run()
        task().then(() => {
            // 当前任务运行完成,下一个任务准备运行,运行中任务数量减少
            this.running--;
            this.run();
        })
    }
}

至此,本题完毕。

4. 代码总结

js
class Scheduler {
  constructor(limit) {
    this.limit = limit;
    this.tasklist = new Array();
    this.running = 0;
  }

  add(task) {
    this.tasklist.push(task);
  }

  start() {
    for (let i = 0; i < this.limit; i++) {
      this.run();
    }
  }

  run() {
    if (!this.tasklist.length || this.running >= this.limit) return;
    this.running++;
    const task = this.tasklist.shift();
    task().then(() => {
      this.running--;
      this.run();
    });
  }
}

// 定时任务
const timeout = (time) => new Promise((resolve) => setTimeout(resolve, time));

// 实例化调度器
const scheduler = new Scheduler(2);

// 添加任务
const addTask = (time, order) => {
  scheduler.add(() => timeout(time).then(() => console.log(order)));
};


// 最后结果测试:2 3 1 4
class Scheduler {
  constructor(limit) {
    this.limit = limit;
    this.tasklist = new Array();
    this.running = 0;
  }

  add(task) {
    this.tasklist.push(task);
  }

  start() {
    for (let i = 0; i < this.limit; i++) {
      this.run();
    }
  }

  run() {
    if (!this.tasklist.length || this.running >= this.limit) return;
    this.running++;
    const task = this.tasklist.shift();
    task().then(() => {
      this.running--;
      this.run();
    });
  }
}

// 定时任务
const timeout = (time) => new Promise((resolve) => setTimeout(resolve, time));

// 实例化调度器
const scheduler = new Scheduler(2);

// 添加任务
const addTask = (time, order) => {
  scheduler.add(() => timeout(time).then(() => console.log(order)));
};


// 最后结果测试:2 3 1 4

Released under the MIT License.