Configurable async task queue, with throttling, retries, progress, and error handling.

```sh
npm install @elite-libs/promise-pool
```

> A background task processor focused on performance, reliability, and durability.

TL;DR: An upgraded Promise queue that's essentially a stateful `Promise.all()` wrapper.
## Table of Contents

- Smart Multi-Threaded Execution
- Key Promise Pool Features
- Who needs a Promise Pool?
- Features
- API
- Install
- Usage
  - Minimal Example
  - Singleton Pattern
- Recipes
  - AWS Lambda & Middy
  - Express Middleware
- Changelog
  - v1.3.1 - April 2023
  - v1.3.0 - September 2022
## Smart Multi-Threaded Execution

> Diagram of Promise Pool's 'Minimal Blocking' design

## Key Promise Pool Features

Promise Pool strives to excel at 4 key goals:
1. Durability - won't fail in unpredictable ways - even under extreme load.
2. Extensible by Design - supports Promise-based libraries (examples include: retry handling, debounce/throttle)
3. Reliability - control your pool with a total runtime limit (align to max HTTP/Lambda request limit), idle timeout (catch orphan or zombie situations), plus a concurrent worker limit.
4. Performance - the work pool queue uses a speedy Ring Buffer design!*
\* Since this is JavaScript, our Ring Buffer is more like three JS Arrays in a trenchcoat.
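To illustrate the "Extensible by Design" goal, a retry policy can be composed around any task before it enters the pool. The sketch below uses a hand-rolled `withRetry` helper purely for illustration; in practice you'd likely use a library like p-retry, as the Features list suggests.

```typescript
// Minimal retry wrapper (illustrative stand-in for a library like p-retry).
async function withRetry<T>(task: () => Promise<T>, retries = 3): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (err) {
      lastError = err; // remember the failure, then try again
    }
  }
  throw lastError;
}

// Usage with a pool (hypothetical task `fetchUser`):
// pool.add(() => withRetry(() => fetchUser(42), 3));
```

Because tasks are plain functions returning Promises, any Promise-based wrapper (retry, debounce, throttle) composes this way without the pool needing to know about it.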
## Who needs a Promise Pool?

- Any Node service (Lambda functions, etc.) that does background work, defined as:
  - long-running async function(s),
  - where the return value isn't used (in the current request),
  - and failures are handled by logging.
## Features

- [x] Configurable.
- [x] Concurrency limit.
- [x] Retries. (Use p-retry for this.)
- [x] Zero dependencies.
- [x] Error handling.
- [x] Singleton mode: option to auto-reset when `.done()`. (Added `.drain()` method.)
- [x] Task scheduling & prioritization.
- [x] Runtime hints/stats. (Time in event loop? Event loop wait time? Pending/complete task counts?)
- [x] Smart 'Rate Limit' logic.
  - Recommended solution: conditionally delay (using `await delay(requestedWaitTime)`) before (or after) each HTTP call.
  - You'll typically detect rate limits via HTTP headers (or payload/body data). For example, check for headers like `X-RateLimit-WaitTimeMS`.
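A sketch of that "conditionally delay" recommendation follows. The header name `X-RateLimit-WaitTimeMS` and the `delay` helper are illustrative; check your API's documentation for the actual header it sends.

```typescript
// Illustrative sketch of the 'conditionally delay' approach.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Extract the server's requested wait time (in ms) from response headers, or 0.
function getRequestedWaitMs(headers: Record<string, string>): number {
  const ms = Number(headers['x-ratelimit-waittimems']);
  return Number.isFinite(ms) && ms > 0 ? ms : 0;
}

// A pooled task that honors the server's requested pause after each call:
async function rateLimitedGet(url: string): Promise<unknown> {
  const res = await fetch(url);
  const waitMs = getRequestedWaitMs(Object.fromEntries(res.headers));
  if (waitMs > 0) await delay(waitMs); // back off before finishing this task
  return res.json();
}
```

Delaying inside the task keeps the worker slot occupied during the back-off, which naturally slows the whole pool down to the rate the server asked for.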
## API

PromisePool exposes 3 methods:

- `.add(...tasks)` - Add one (or more) tasks for background processing. (A task is a function that wraps a Promise value, e.g. `() => Promise.resolve(1)`.)
- `.drain()` - Returns a promise that resolves when all tasks have been processed, or when another thread takes over waiting by calling `.drain()` again.
- `.done()` - Drains AND 'finalizes' the pool. _No more tasks can be added after this._ Can be called from multiple threads; only runs once.
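One detail worth underlining: a task is a thunk (a function that returns a Promise), not a Promise itself. If you pass an already-created Promise, the work has already started, and the pool can no longer throttle it. A small self-contained illustration (no pool required):

```typescript
// Each thunk defers its work until someone (e.g. a pool worker) invokes it.
let started = 0;
const makeTask = (id: number) => () => {
  started += 1; // this side effect only happens on invocation
  return Promise.resolve(id);
};

const tasks = [makeTask(1), makeTask(2), makeTask(3)];
// Nothing has run yet: `started` is still 0 at this point.
// A pool is free to invoke these whenever a worker slot opens up:
const results = await Promise.all(tasks.map((task) => task()));
// Now started === 3 and results is [1, 2, 3].
```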
> See either the Usage section below, or check out the /examples folder for more complete examples.

## Install

```sh
# with npm
npm install @elite-libs/promise-pool
# or with yarn
yarn add @elite-libs/promise-pool
```
## Usage

### Minimal Example

```typescript
import PromisePool from '@elite-libs/promise-pool';

// 1/3: Either use the default instance or create a new one.
const pool = new PromisePool();

(async () => {
  // 2/3: Add task(s) to the pool as needed.
  // PromisePool will execute them in parallel as soon as possible.
  pool.add(() => saveToS3(data));
  pool.add(() => expensiveBackgroundWork(data));

  // 3/3: REQUIRED: to ensure your tasks are executed, you must await
  // either pool.drain() or pool.done() at some point in your code.
  // (done prevents additional tasks from being added.)
  await pool.drain();
})();
```

### Singleton Pattern
> Recommended for virtually all projects. (API, CLI, Lambda, Frontend, etc.)

The singleton pattern creates exactly 1 Promise Pool as soon as the script is imported (typically on the first run). This ensures the `maxWorkers` value will act as a global limit on the number of tasks that can run at the same time.

#### File /services/taskPoolSingleton.ts

```typescript
import PromisePool from '@elite-libs/promise-pool';

export const taskPool = new PromisePool({
  maxWorkers: 6, // Optional. Default is 4.
});
```

## Recipes
> See the /examples folder for more complete examples.

### AWS Lambda & Middy
This recipe adds Promise Pool to your handler via middy middleware:

#### File 1/2 ./services/taskPoolMiddleware.ts

Note: The imported `taskPool` is a singleton instance defined in the `taskPoolSingleton` file.

```typescript
import { taskPool } from './services/taskPoolSingleton';

export const taskPoolMiddleware = () => ({
  before: (request) => {
    Object.assign(request.context, { taskPool });
  },
  after: async (request) => {
    await request.context.taskPool.drain();
  },
});
```

Now you can use `taskPool` in your Lambda function via the handler's `context` argument:

#### File 2/2 ./handlers/example.handler.ts

```typescript
import middy from '@middy/core';
import { taskPoolMiddleware } from './services/taskPoolMiddleware';

const handleEvent = (event, context) => {
  const { taskPool } = context;
  const data = getDataFromEvent(event);

  taskPool.add(() => saveToS3(data));
  taskPool.add(() => expensiveBackgroundWork(data));

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Success',
    }),
  };
};

export const handler = middy(handleEvent)
  .use(taskPoolMiddleware());
```

### Express Middleware
#### File 1/3 /services/taskPoolSingleton.mjs

```js
import PromisePool from '@elite-libs/promise-pool'

export const taskPool = new PromisePool({
  maxWorkers: 6 // Optional. Default is 4.
})
```

#### File 2/3 /middleware/taskPool.middleware.mjs

```js
import { taskPool } from "../services/taskPoolSingleton.mjs"

const taskPoolMiddleware = {
  setup: (request, response, next) => {
    request.taskPool = taskPool
    next()
  },
  cleanup: (request, response, next) => {
    if (request.taskPool && 'drain' in request.taskPool) {
      request.taskPool.drain()
    }
    next()
  }
}

export default taskPoolMiddleware
```

To use the `taskPoolMiddleware` in your Express app, include `taskPoolMiddleware.setup` and `taskPoolMiddleware.cleanup`:

#### File 3/3 /app.mjs

```js
import express from "express"
import taskPoolMiddleware from "../middleware/taskPool.middleware.mjs"

export const app = express()

// Step 1/2: Set up the taskPool
app.use(taskPoolMiddleware.setup)
app.use(express.json())

app.post('/users/', function post(request, response, next) {
  const { taskPool } = request
  const data = getDataFromBody(request.body)

  // You can .add() tasks wherever needed,
  // - they'll run in the background.
  taskPool.add(() => logMetrics(data))
  taskPool.add(() => saveToS3(request))
  taskPool.add(() => expensiveBackgroundWork(data))

  // Or, 'schedule' multiple tasks at once.
  taskPool.add(
    () => logMetrics(data),
    () => saveToS3(request),
    () => expensiveBackgroundWork(data)
  )

  next()
})

// Step 2/2: Drain the taskPool
app.use(taskPoolMiddleware.cleanup)
```

## Changelog
### v1.3.1 - April 2023

- Upgraded dev dependencies (dependabot).
- Cleaned up README code & description.

### v1.3.0 - September 2022

- _What?_ Adds smart `.drain()` behavior.
- _Why?_
  1. Speeds up concurrent server environments!
  2. Prevents several types of difficult (ghost) bugs:
     - stack overflows/memory access violations,
     - exceeding rarely-hit OS limits (e.g. max open handle/file/socket limits),
     - exceeding limits of network hardware (e.g. connections/sec, total open socket limit, etc.),
     - uneven observed wait times.
- _How?_ Only awaits the latest `.drain()` caller.
- _Who?_ Server + `Singleton` use cases will see a major benefit to this design.