Async Pool: Promise.all for Bulk Operations

Max Greenwald | 16 Jan 2023

Background

Promises and async/await in JavaScript are awesome. They give developers straightforward primitives for building complex concurrent code without threading. Because most concurrent code is I/O-bound, JavaScript's single-threaded execution model is usually not the bottleneck when running complex operations; CPU-bound calculations are the exception.

To handle multiple async operations, JavaScript gives us these static Promise methods:

  • Promise.all
  • Promise.any
  • Promise.race
  • Promise.allSettled

These methods are each useful in different circumstances, but for now we will focus on Promise.all. MDN says:

The Promise.all() static method takes an iterable of promises as input and returns a single Promise. This returned promise fulfills when all of the input's promises fulfill (including when an empty iterable is passed), with an array of the fulfillment values. It rejects when any of the input's promises rejects, with this first rejection reason.
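
The fail-fast part matters in practice: a single rejection settles the combined promise immediately, even while the other inputs are still pending. A quick illustration (delay is a hypothetical helper here, and top-level await is assumed):

const delay = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

try {
  await Promise.all([
    delay(100).then(() => Promise.reject(new Error("boom"))),
    delay(1000).then(() => "never observed"),
  ]);
} catch (err) {
  console.error(err); // Error: boom, thrown after ~100ms, not 1s
}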

This method is extremely useful when we want to fetch or update multiple things in parallel. An example of a basic usage is:

const [user, plan, photos] = await Promise.all([
  getUser(userId),
  getPlan(planId),
  getUserPhotos(userId)
]);
return { user, plan, photos };

Promise.all is also used frequently in conjunction with Array.map. For example, if we wanted to get all users and then send them an email:

const users = await getUsers();
await Promise.all(users.map(async (user) => {
  await sendEmail(user.email, {
    template: "Happy New Year 2023",
    subject: "Happy New Year!"
  });
}));

The Problem

While Promise.all works fine when we have fewer than 100 users, it starts to fall apart at scale. Some problems:

  • Rate Limits: Our sendEmail function is likely backed by a third-party service like SendGrid or Mandrill, which has rate limits. If we have 100K users and try to send 100K emails all at once, we will start getting 429s.
  • Network: Starting 100K requests at once means potentially opening 100K TCP / QUIC streams at once! Hopefully our HTTP client reuses connections, but if it does not, we may start to see network congestion from all that traffic. In any case, sending 100K HTTP requests simultaneously will take some time, though it might work on a cloud provider with far more network throughput available.
  • Memory: Starting all those network requests may also cause our program's memory usage to spike, which could cause a slowdown or crash depending on the total memory of the system. Even on a beefy dev machine, this could cause problems!

One potential solution to this is "array chunking", where we break our array up into chunks of a predetermined size and then run the update chunk-by-chunk. Example:

const users = await getUsers();
const chunks = chunkArray(users, 20);
for (const chunk of chunks) {
  await Promise.all(chunk.map(async (user) => {
    await sendEmail(user.email, {
      template: "Happy New Year 2023",
      subject: "Happy New Year!"
    });
  }));
}
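
The chunkArray helper isn't defined in this post; a minimal sketch of what it might look like:

function chunkArray<T>(array: T[], chunkSize: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < array.length; i += chunkSize) {
    chunks.push(array.slice(i, i + chunkSize));
  }
  return chunks;
}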

While this effectively solves the three major problems listed above, it introduces a new problem: each chunk must completely finish before the next chunk can start. If the p50 of sendEmail is 300ms but the p95 is 1s (not that outlandish, in my experience), then each chunk will take close to 1s, because with 20 calls per chunk at least one is likely to land in that slow tail, even though most of the operations are done after 300ms.

Introducing...Async Pool!

asyncPool is a utility with the same functionality as Promise.all + Array.map that keeps the number of concurrent executions at or below a set number. Example:

const users = await getUsers();
await asyncPool(users, 20, async (user) => {
  // This function will execute a maximum of 20 instances concurrently
  await sendEmail(user.email, {
    template: "Happy New Year 2023",
    subject: "Happy New Year!"
  });
});

asyncPool is a simple way to speed up the execution of bulk operations while giving you control over concurrent executions and rate limiting. Unlike the array chunking method above, asyncPool initially starts up the number of executions that you define. Then, when one execution finishes, another one is immediately started. Once all executions are completed, the results are returned in an array, just like Promise.all.

This technique additionally lets us address the rate limiting problem! If the sendEmail API we are using has a rate limit of 60 calls per second, we can tune asyncPool to approach that limit without crossing it by adding a setTimeout: with 20 concurrent slots and a minimum of 334ms per execution, throughput tops out at 20 / 0.334s ≈ 60 calls per second. By adjusting the pool limit together with the minimum time per execution, we strike a balance between smoothing out spiky traffic and staying under the rate limit.

const users = await getUsers();
// Attempt to hit a 60 calls / second rate limit:
// 20 at a time, each runs for a minimum of 334ms
await asyncPool(users, 20, async (user) => {
  await sendEmail(user.email, {
    template: "Happy New Year 2023",
    subject: "Happy New Year!"
  });
  // Minimum time per execution: 334 ms
  await new Promise((resolve) => setTimeout(resolve, 334));
});

I have found asyncPool useful in a variety of settings, primarily in scripts but also in application code. If there is ever a chance that the number of executions exceeds ~20 at once, I will instinctively reach for asyncPool. Because it behaves identically to Promise.all when the number of executions is less than the pool size, it's a no-brainer.

Implementation

And finally, here it is, coming in at a couple dozen sparse lines! I take no credit for inventing this idea or even writing the code. While I modified the code slightly and added TypeScript support, the original implementation was done by Rafael Xavier de Souza for his async-pool library. I'm not thrilled by the 2.0 version of the library, which uses the for await...of syntax. For arguably the same readability, that syntax is significantly less composable and functional. However, the motivation for the library remains the same, and I'm glad I discovered it!

Because asyncPool is such a small utility, I recommend copying this code into your own project directly as opposed to installing it via npm. That way, you can tweak it to your own needs and adjust its API as needed. Maybe you want to make it more like Promise.allSettled instead of Promise.all! The pool is your oyster.

export async function asyncPool<T, V>(
  array: T[],
  poolLimit: number,
  fn: (item: T) => Promise<V>
): Promise<V[]> {
  // Every result promise, in input order
  const results: Promise<V>[] = [];
  // The promises currently in flight
  const executing = new Set<Promise<V>>();
  for (const item of array) {
    // Wrap fn so a synchronous throw becomes a rejection
    // instead of blowing up the loop
    const prom = Promise.resolve().then(() => fn(item));
    results.push(prom);
    // Pooling only matters if the array can exceed the limit
    if (poolLimit <= array.length) {
      executing.add(prom);
      prom.then(() => executing.delete(prom));
      // At capacity: wait for one in-flight promise to settle
      // before starting the next
      if (executing.size >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  // Resolves in input order; rejects on the first failure
  return Promise.all(results);
}
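
If you do want Promise.allSettled semantics, the change is small. A sketch of one way to do it (asyncPoolSettled is a hypothetical name, not part of the original library):

export async function asyncPoolSettled<T, V>(
  array: T[],
  poolLimit: number,
  fn: (item: T) => Promise<V>
): Promise<PromiseSettledResult<Awaited<V>>[]> {
  const results: Promise<V>[] = [];
  const executing = new Set<Promise<unknown>>();
  for (const item of array) {
    const prom = Promise.resolve().then(() => fn(item));
    results.push(prom);
    // Track a never-rejecting wrapper so a failed execution frees
    // its pool slot instead of rejecting the race below
    const settled = prom.catch(() => undefined);
    executing.add(settled);
    settled.then(() => executing.delete(settled));
    if (executing.size >= poolLimit) {
      await Promise.race(executing);
    }
  }
  // Never rejects; each entry reports its fulfilled/rejected status
  return Promise.allSettled(results);
}

The catch wrapper is the key difference: without it, a single rejection would make the awaited Promise.race throw mid-loop and abort the remaining items.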