requestPull[currentRequestNumber] = {
"currentRequestNumber": currentRequestNumber, //текущий номер итерации
"promise": run(), //возвращает промис при выполнение
"data": [] //входные данные
"result": {} //результат выполнение run() для каждых данных
}
class AwaitTaskProcess {
constructor() {
this.tasks = [];
this.results = [];
this.counter = 0;
}
addTask(data, handler = () => { }) {
console.log("поступила очередная порция данных:", data);
this.tasks.push({
currentRequestNumber: this.counter,
promise: handler,
data
});
this.counter++;
}
async process() {
console.log("!!!! запущен процесс обработки данных данных !!!!");
const promise = new Promise((resolve, reject) => {
// При старте обработки первые две секунды происходят некоторые действия.
setTimeout(async () => {
while (this.tasks.length) {
const task = this.tasks.shift();
task.result = await task.promise(task.data);
this.results.push(task);
}
resolve(this.results);
}, 2000);
});
return promise;
}
}
// имитация обработчика производящего долгие действия с данными
async function handler(data) {
const promise = new Promise((resolve, reject) => {
setTimeout(
() => {
console.log("обработана очередная порция данных:", data);
// в качестве результата строка
resolve("processed " + data);
},
// срабатывание от одной до 5 секунд
1000 + Math.floor(4000 * Math.random())
);
});
return promise;
}
// создаем экземпляр ассинхронной очереди
const tasksQueue = new AwaitTaskProcess();
let count = 0;
// функция имитирует нерегулярное поступление нескольких наборов данных для обработки
function start() {
// пусть будет 10 пакетов неравномерно поступающих данных
// если счетчик достиг 10 перестаем генерировать задачи
if (count >= 10) return;
tasksQueue.addTask(count, handler);
count++;
setTimeout(
start,
// срабатывание от одной до 3 секунд
1000 + Math.floor(2000 * Math.random())
);
}
// тестируем все это гавно
// запускаем процесс неравномерного получения данных и отправки их в очередь на обработку
start();
// запускаем процесс обработки данных
tasksQueue
.process()
.then(results => {
console.log("\nвсе данные обработаны:")
console.log(results)
});
поступила очередная порция данных: 0
!!!! запущен процесс обработки данных данных !!!!
поступила очередная порция данных: 1
поступила очередная порция данных: 2
обработана очередная порция данных: 0
поступила очередная порция данных: 3
поступила очередная порция данных: 4
обработана очередная порция данных: 1
поступила очередная порция данных: 5
поступила очередная порция данных: 6
обработана очередная порция данных: 2
поступила очередная порция данных: 7
поступила очередная порция данных: 8
обработана очередная порция данных: 3
поступила очередная порция данных: 9
обработана очередная порция данных: 4
обработана очередная порция данных: 5
обработана очередная порция данных: 6
обработана очередная порция данных: 7
обработана очередная порция данных: 8
обработана очередная порция данных: 9
все данные обработаны:
[
{
currentRequestNumber: 0,
promise: [AsyncFunction: handler],
data: 0,
result: 'processed 0'
},
{
currentRequestNumber: 1,
promise: [AsyncFunction: handler],
data: 1,
result: 'processed 1'
},
{
currentRequestNumber: 2,
promise: [AsyncFunction: handler],
data: 2,
result: 'processed 2'
},
{
currentRequestNumber: 3,
promise: [AsyncFunction: handler],
data: 3,
result: 'processed 3'
},
{
currentRequestNumber: 4,
promise: [AsyncFunction: handler],
data: 4,
result: 'processed 4'
},
{
currentRequestNumber: 5,
promise: [AsyncFunction: handler],
data: 5,
result: 'processed 5'
},
{
currentRequestNumber: 6,
promise: [AsyncFunction: handler],
data: 6,
result: 'processed 6'
},
{
currentRequestNumber: 7,
promise: [AsyncFunction: handler],
data: 7,
result: 'processed 7'
},
{
currentRequestNumber: 8,
promise: [AsyncFunction: handler],
data: 8,
result: 'processed 8'
},
{
currentRequestNumber: 9,
promise: [AsyncFunction: handler],
data: 9,
result: 'processed 9'
}
]