As all others in this answer thread have pointed out, `Promise.all()` won't do the right thing if you need to limit concurrency. But ideally, you shouldn't even want to wait until all of the Promises are done before processing them.
Instead, you want to process each result as soon as it becomes available, so you don't have to wait for the very last promise to finish before you start iterating over the results.
So, here's a code sample that does just that, based partly on the answer by Endless and also on this answer by T.J. Crowder.
EDIT: I've turned this little snippet into a library, concurrency-limit-runner.
```javascript
// example tasks that sleep and return a number
// in real life, you'd probably fetch URLs or something
const tasks = [];
for (let i = 0; i < 20; i++) {
  tasks.push(async () => {
    console.log(`start ${i}`);
    await sleep(Math.random() * 1000);
    console.log(`end ${i}`);
    return i;
  });
}

function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }

(async () => {
  for await (let value of runTasks(3, tasks.values())) {
    console.log(`output ${value}`);
  }
})();

async function* runTasks(maxConcurrency, taskIterator) {
  async function* createWorkerIterator() {
    // Each AsyncGenerator that this function* creates is a worker,
    // polling for tasks from the shared taskIterator. Sharing the
    // taskIterator ensures that each worker gets unique tasks.
    for (const task of taskIterator) yield await task();
  }

  const asyncIterators = new Array(maxConcurrency);
  for (let i = 0; i < maxConcurrency; i++) {
    asyncIterators[i] = createWorkerIterator();
  }
  yield* raceAsyncIterators(asyncIterators);
}

async function* raceAsyncIterators(asyncIterators) {
  async function nextResultWithItsIterator(iterator) {
    return { result: await iterator.next(), iterator: iterator };
  }
  /** @type Map<AsyncIterator<T>,
        Promise<{result: IteratorResult<T>, iterator: AsyncIterator<T>}>> */
  const promises = new Map();
  for (const iterator of asyncIterators) {
    promises.set(iterator, nextResultWithItsIterator(iterator));
  }
  while (promises.size) {
    const { result, iterator } = await Promise.race(promises.values());
    if (result.done) {
      promises.delete(iterator);
    } else {
      promises.set(iterator, nextResultWithItsIterator(iterator));
      yield result.value;
    }
  }
}
```
There's a lot of magic in here; let me explain.
This solution is built around async generator functions, which many JS developers may not be familiar with.
A generator function (aka `function*` function) returns a "generator," an iterator of results. Generator functions are allowed to use the `yield` keyword where you might normally have used a `return` keyword. The first time a caller calls `next()` on the generator (or uses a `for...of` loop), the `function*` function runs until it `yield`s a value; that value becomes the `value` of the object returned by `next()`. Each subsequent time `next()` is called, the generator function resumes from the `yield` statement, right where it left off, even if it's in the middle of a loop. (You can also use `yield*` to yield all of the results of another generator.)
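If generators are new to you, here is a minimal synchronous sketch of the behavior just described: each `next()` resumes the function where it paused, `for...of` drives `next()` for you, and `yield*` delegates to another generator. The names `countTo` and `countTwice` are made up for illustration.

```javascript
// A generator function: each yield pauses execution and hands a value to the caller.
function* countTo(n) {
  for (let i = 1; i <= n; i++) {
    // Execution pauses here; the next call to next() resumes inside the loop.
    yield i;
  }
}

// yield* delegates to another generator, yielding all of its values in turn.
function* countTwice(n) {
  yield* countTo(n);
  yield* countTo(n);
}

const gen = countTo(2);
console.log(gen.next()); // { value: 1, done: false }
console.log(gen.next()); // { value: 2, done: false }
console.log(gen.next()); // { value: undefined, done: true }

// for...of calls next() for you until done is true.
const all = [];
for (const value of countTwice(2)) all.push(value);
console.log(all); // [ 1, 2, 1, 2 ]
```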
An "async generator function" (`async function*`) is a generator function that returns an "async iterator," an iterator whose `next()` method returns a Promise for each result. You can consume an async iterator with a `for await...of` loop. Async generator functions can also use the `await` keyword, as you might in any `async function`.
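A minimal async counterpart of the previous idea, as a sketch: the generator `await`s between `yield`s, `next()` hands back a Promise, and `for await...of` unwraps each result. `yield*` works here too, which is what `runTasks()` relies on at the end. The names `slowCount`, `slowCountTwice` and `main` are hypothetical, and `sleep` mirrors the helper in the snippet above.

```javascript
// Same idea as sleep() in the snippet above: resolves after ms milliseconds.
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// An async generator can both await and yield.
async function* slowCount(n, ms) {
  for (let i = 1; i <= n; i++) {
    await sleep(ms); // pause without blocking the event loop
    yield i;
  }
}

// yield* also works in async generators, delegating to another async iterator.
async function* slowCountTwice(n, ms) {
  yield* slowCount(n, ms);
  yield* slowCount(n, ms);
}

async function main() {
  const it = slowCount(1, 10);
  // next() on an async iterator returns a Promise of { value, done }.
  const first = it.next();
  console.log(first instanceof Promise); // true
  console.log(await first); // { value: 1, done: false }

  const seen = [];
  // for await...of awaits each next() result for you.
  for await (const value of slowCountTwice(2, 10)) {
    seen.push(value);
  }
  console.log(seen); // [ 1, 2, 1, 2 ]
  return seen;
}

const resultPromise = main();
```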
In the example, we call `runTasks()` with an iterator of task functions; calling `.values()` on the `tasks` array gives us that iterator.
`runTasks()` is an async generator function, so we can consume it with a `for await...of` loop. Each time the loop runs, we process the result of the most recently completed task.
`runTasks()` creates N async iterators, the "workers." Each worker polls for tasks from the shared `taskIterator`, ensuring that each worker gets a unique task.
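Why pass `tasks.values()` rather than the array itself? A minimal sketch, using a made-up `worker` generator and string tasks: two `for...of` loops over the *same* iterator split the items between them, whereas two loops over the array each get a fresh iterator and would both start at the first task.

```javascript
const tasks = ["a", "b", "c", "d"];
const shared = tasks.values(); // one array iterator, consumed by both workers

// Hypothetical worker: labels each task it pulls with its own name.
function* worker(name, iterable) {
  // for...of calls next(); an item taken here is gone for other workers
  // only if they share the same iterator object.
  for (const task of iterable) yield `${name}:${task}`;
}

const w1 = worker("w1", shared);
const w2 = worker("w2", shared);
// Alternate between workers, roughly as concurrent workers would.
const log = [w1.next().value, w2.next().value, w1.next().value, w2.next().value];
console.log(log); // [ 'w1:a', 'w2:b', 'w1:c', 'w2:d' ]

// Passing the array instead: each for...of creates its own iterator,
// so both workers would run task "a".
const w3 = worker("w3", tasks);
const w4 = worker("w4", tasks);
const dup = [w3.next().value, w4.next().value];
console.log(dup); // [ 'w3:a', 'w4:a' ]
```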
The example calls `runTasks` with 3 concurrent workers, so no more than 3 tasks are running at the same time. When any task completes, we immediately queue up the next task. (This is superior to "batching", where you start 3 tasks at once, await all three of them, and don't start the next batch of three until the entire previous batch has finished.)
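For contrast, here is a minimal sketch of the batching approach using `Promise.all`. The function name `runInBatches` and the task durations are invented for illustration: one slow task holds up the whole batch, so task 3 cannot start until task 0 ends, even though tasks 1 and 2 finished long before. With the worker approach above, task 3 would start as soon as task 1 finished.

```javascript
// Hypothetical helper: resolves after ms milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const events = [];
// Made-up durations: task 0 is slow, the rest are fast.
const durations = [100, 10, 10, 10, 10, 10];
const tasks = durations.map((ms, i) => async () => {
  events.push(`start ${i}`);
  await sleep(ms);
  events.push(`end ${i}`);
  return i;
});

// Batching: start `size` tasks, wait for ALL of them, then start the next batch.
async function runInBatches(size, tasks) {
  const results = [];
  for (let i = 0; i < tasks.length; i += size) {
    const batch = tasks.slice(i, i + size).map((task) => task());
    results.push(...(await Promise.all(batch)));
  }
  return results;
}

const finished = runInBatches(3, tasks).then((results) => {
  // Two slots sat idle while slow task 0 was still running.
  console.log(events.indexOf("end 0") < events.indexOf("start 3")); // true
  return results;
});
```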
`runTasks()` concludes by "racing" its async iterators with `yield* raceAsyncIterators()`. `raceAsyncIterators()` is like `Promise.race()`, but it races N iterators of Promises instead of just N Promises; it returns an async iterator that yields the results of resolved Promises.
`raceAsyncIterators()` starts by defining a `promises` `Map` from each of the iterators to a promise. Each promise is a promise for an iteration result along with the iterator that generated it.
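Why bundle the iterator into each promise? `Promise.race()` only hands back the winning value, not which input produced it. Here's a minimal sketch of the same tagging trick, with hypothetical string keys standing in for the iterators and a made-up `raceWithKeys` helper.

```javascript
// Hypothetical helper: resolves to `value` after ms milliseconds.
const sleep = (ms, value) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// String keys stand in for "the iterator" in raceAsyncIterators().
const pending = new Map([
  ["slow", sleep(50, "slow result")],
  ["fast", sleep(10, "fast result")],
]);

async function raceWithKeys(map) {
  // Wrap each promise as { key, value } so the winner reports where it came from.
  const tagged = [...map].map(([key, promise]) =>
    promise.then((value) => ({ key, value }))
  );
  return Promise.race(tagged);
}

const winner = raceWithKeys(pending).then(({ key, value }) => {
  pending.delete(key); // we know exactly which entry to remove or replace
  console.log(key, value, [...pending.keys()]); // fast fast result [ 'slow' ]
  return key;
});
```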
With the `promises` map, we can `Promise.race()` the values of the map, giving us the winning iteration result and its iterator. If the iterator is exhausted (`result.done`), we remove it from the map; otherwise we replace its entry in the `promises` map with a Promise for the iterator's next result and yield `result.value`.
In conclusion, `runTasks()` is an async generator function that yields the results of racing N concurrent async iterators of tasks, so the end user can just write `for await (let value of runTasks(3, tasks.values()))` to process each result as soon as it becomes available.