# Rate limits and concurrency using generators of promiseFactories

```zsh
npm install @watchable/nevermore
```

The nevermore scheduler adds rate-limiting, concurrency control, timeout,
retry and backoff to your async functions without changing their signature or
implementation.
Define limits by passing option values to one of the two core nevermore APIs.
The example below uses the executor API to wrap your own vanilla async
function...
```ts
import { createExecutorStrategy } from "@watchable/nevermore";
import { myFn } from "./myFn.ts";

const { createExecutor } = createExecutorStrategy({
  concurrency: 1,
  intervalMs: 100,
  backoffMs: 50,
  timeoutMs: 3000,
  retries: 3,
});

const myLimitedFn = createExecutor(myFn);
const myResult = await myLimitedFn("my", "typed", "arguments");
```
nevermore has two core APIs which accept the same strategy options...
- `createExecutorStrategy` - wraps async functions without changing your code
- `createSettlementSequence` - pulls from generators creating jobs just-in-time
The execution of Jobs is controlled through composable scheduling primitives
known as strategies. Multiple strategies are already implemented as individual
composable blocks which can be freely combined. You can further extend nevermore
by writing your own strategies.
See more detail about the two API signatures in the APIs section later in this
document.
A _concurrency_ Strategy accepts another job only when the number of
pending jobs goes below `concurrency`. When there is a free slot (previously
pending jobs have settled as resolved or rejected), the strategy will accept a
new pending job. To activate this strategy, provide a `concurrency` number in
the options.
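The slot-counting behaviour described above can be sketched in a few lines. This is a minimal illustration only, not nevermore's implementation; `createConcurrencyGate`, `Job` and `run` are hypothetical names invented here:

```typescript
// Minimal sketch of concurrency gating (NOT nevermore's code):
// at most `concurrency` jobs are pending at once; further launches
// wait until a previous job settles and frees a slot.
type Job<T> = () => Promise<T>;

function createConcurrencyGate(concurrency: number) {
  let pending = 0;
  const waiters: Array<() => void> = [];

  return {
    async run<T>(job: Job<T>): Promise<T> {
      if (pending >= concurrency) {
        // no free slot: queue ourselves until a job settles
        await new Promise<void>((resolve) => waiters.push(resolve));
      }
      pending++;
      try {
        return await job();
      } finally {
        pending--;
        // wake exactly one waiter for the slot we just freed
        waiters.shift()?.();
      }
    },
  };
}
```

Exactly one waiter is woken per settled job, so the count of pending jobs never exceeds the configured limit.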
A _rate_ Strategy implements rate limiting by launching the next job only
when there is a free slot within the `intervalMs`. Every execution of a job uses
up one slot in the interval. When an interval's slots are exhausted, the
strategy calculates when the next slot will become free, and sleeps for that
duration before accepting the next job. To activate this strategy, provide an
`intervalMs` number in the options. The default value of `intervalLaunches` is
1 launch per interval.
A _timeout_ Strategy wraps jobs in a timeout job (throwing an error if the
job hasn't settled before `timeoutMs`) then passes the job to downstream
strategies. On receiving a settlement (fulfilment, rejection or timeout) it
unwraps the timeout job, yielding a `JobSettlement` pointing to the original
job, not the substitute. To activate this strategy, provide a `timeoutMs` number
in the options and remember your wrapped function may now throw a nevermore
`TimeoutError`.
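The wrapping idea can be sketched with `Promise.race`. The `withTimeout` helper and `TimeoutError` class below are hypothetical stand-ins for illustration; nevermore wraps jobs internally and exports its own error type:

```typescript
// Minimal sketch of timeout wrapping (NOT nevermore's code): race the job
// against a timer that rejects, then cancel the timer either way.
class TimeoutError extends Error {}

async function withTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new TimeoutError(`no settlement within ${timeoutMs}ms`)),
      timeoutMs
    );
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    if (timer !== undefined) clearTimeout(timer); // avoid a leaked timer
  }
}
```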
A _retry_ Strategy repeatedly calls failing jobs until the number of
failures equals `retries`. It wraps jobs in a retry job before launching them,
storing the count of retries attempted. `JobResolved` settlements are unwrapped,
yielding a `JobResolved` pointing to the original job. By contrast,
`JobRejected` events trigger further retries until reaching the maximum number
of `retries` for that job, and the last failure is passed back as the job's
settlement. To activate this strategy, provide a `retries` number in the
options.
A _backoff_ Strategy repeatedly calls failing jobs with an increasing
backoff delay (based on an exponential function). See the section on 'retry' for
more detail of the approach. To activate this strategy, provide a `backoffMs`
number in the options. To get eventual feedback from continually failing jobs,
you need to set a `retries` option. To get backpressure from
`createSettlementSequence` pulling just-in-time, you need to set a `concurrency`
option to prevent indefinitely-many jobs being queued.
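Taken together, retry and backoff behave roughly like the loop below. This is a hedged sketch (`retryWithBackoff` is a hypothetical name, and it assumes `retries` counts extra attempts after the first and that delays double each failure); in nevermore these are composable pipeline strategies, not an inline loop:

```typescript
// Minimal sketch of retry with exponential backoff (NOT nevermore's code):
// re-run a failing job, doubling the delay after each failure, and surface
// the last error once `retries` extra attempts are exhausted.
async function retryWithBackoff<T>(
  job: () => Promise<T>,
  retries: number,
  backoffMs: number
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    if (attempt > 0) {
      // exponential delays: backoffMs, 2*backoffMs, 4*backoffMs, ...
      const delayMs = backoffMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
    try {
      return await job();
    } catch (error) {
      lastError = error;
    }
  }
  // the last failure is passed back as the job's settlement
  throw lastError;
}
```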
```zsh
npm install @watchable/nevermore
```
An ExecutorStrategy can transform a normal async function into a function that
is regulated by a nevermore pipeline.
Create a strategy and get back a createExecutor() function. The strategy shown
below exercises most of the options - concurrency-limits, rate-limits, backoff,
timeouts and retries...
```ts
import { createExecutorStrategy } from "@watchable/nevermore";

const { createExecutor } = createExecutorStrategy({
  concurrency: 1,
  intervalMs: 100,
  timeoutMs: 3000,
  backoffMs: 1000,
  retries: 3,
});
```

You can then use createExecutor to turn an ordinary function into a regulated
function that respects the constraints of the strategy you configured...
```ts
async function getStarWars(filmId: number) {
  return await fetch(`https://swapi.dev/api/films/${filmId}/`, {
    method: "get",
  });
}

const getStarWarsExecutor = createExecutor(getStarWars);

// the below invocation has intellisense for
// autocompleting args for getStarWars and...
// * will allow only one concurrent retrieval
// * will allow only one retrieval every 100ms
// * will timeout individual attempts after 3000ms
// * will attempt up to 3 times if getStarWars throws
const [episode4, episode5, episode6] = await Promise.allSettled([
  getStarWarsExecutor(1),
  getStarWarsExecutor(2),
  getStarWarsExecutor(3),
]);
```
For batch routines (or potentially infinite sets), `createSettlementSequence`
provides an alternative API based on iterable sequences of async functions that
are created on-the-fly as they are needed.
This backpressure limits the growth of memory in your app by limiting the
creation of new tasks according to downstream rate and concurrency limits.
Exactly the same scheduling options (concurrency, retry etc.) are supported
as in the `createExecutorStrategy` API.
#### Explanation
If you eventually need to satisfy a million requests, you don't want to spawn
them all as pending promises in memory while they are slowly processed at 100
per second. The resources dedicated to pending jobs should be allocated
just-in-time.
The createExecutor approach described above is very convenient for adding
seamless scheduling of hundreds of parallel tasks without having to change your
code. Unfortunately this makes the scheduling opaque and there is therefore no
mechanism to provide backpressure when jobs aren't completing quickly.
By contrast, `createSettlementSequence` allows developers to respect
'backpressure' from a pipeline's limited capacity. Each `Job` is yielded from
your iterator just-in-time as capacity becomes available. Between yields your
iterator is halted, holding only its stack in memory. An iteration procedure for
1 million requests will therefore only progress as fast as the pipeline allows,
and the only promises in memory are those which have been scheduled.
An example of a sequence yielding Job callbacks one-by-one is shown below.
```ts
import { createSettlementSequence } from "@watchable/nevermore";

// define a sequence of zero-arg functions
async function* createJobSequence() {
  for (;;) {
    yield async () => {
      const result = await fetch(
        "https://timeapi.io/api/TimeZone/zone?timeZone=Europe/London"
      );
      if (result.status !== 200) {
        throw new Error("Failure retrieving time");
      }
      return (await result.json()) as {
        timeZone: string;
        currentLocalTime: string;
      };
    };
  }
}

// create a sequence of settlements (limited by specified options)
const settlementSequence = createSettlementSequence(
  {
    concurrency: 1,
    intervalMs: 1000,
    timeoutMs: 3000,
    retries: 3,
  },
  createJobSequence
);

// consume the settlements (like Promise.allSettled())
for await (const settlement of settlementSequence) {
  if (settlement.status === "fulfilled") {
    console.log(`Time in London is ${settlement.value.currentLocalTime}`);
  } else {
    console.error(
      `Gave up retrying. Last error was: ${settlement.reason?.message}`
    );
  }
}
```
#### Extending Settlement

The type of settlements yielded from a settlement sequence aligns with
`Promise.allSettled()`, but with an extra `job` member.
The type of your job `J` is preserved in the `JobSettlement`, meaning you can
get the annotations back at settlement time.
Annotating a job, and creating an inferrable `J`, is trivial. Instead of...

```ts
yield () => getStarWars(filmId);
```

...add properties to the yielded no-arg function with `Object.assign`...

```ts
yield Object.assign(() => getStarWars(filmId), { filmId });
```
Then you can get the extra information back from the type-safe job in the
settlement...
```ts
// consume the settlements (like Promise.allSettled())
for await (const settlement of settlementSequence) {
  const { filmId } = settlement.job;
  if (settlement.status === "fulfilled") {
    console.log(`Success for ${filmId} : response was ${settlement.value}`);
  } else {
    console.error(
      `Failure for ${filmId}: last error was ${settlement.reason?.message}`
    );
  }
}
```
Developers can add their own strategies (e.g. a CircuitBreaker strategy) to
extend the richness of their nevermore pipeline.
For reference, a _passthru_ Strategy is included in source. This is a no-op
strategy that is suitable as a starting point for your own strategies. Its
implementation is shown in full below to illustrate the `Strategy` formalism.

- `launchJob()` asks to schedule a Job, returning a promise that resolves once
  the job has been first invoked.
- `launchesDone()` is a signal called on your strategy when no further launches
  will take place, allowing it to track remaining pending jobs, and finally
  clean up resources.
- `next()` implements an `AsyncIterator`, returning a `Promise` that allows
  your strategy to pass back the eventual settlements of launched jobs (when
  they are eventually fulfilled or rejected).
```ts
export function createPassthruStrategy<T, J extends Job<T>>(
  downstream: Strategy<T, J>
) {
  return {
    launchJob(job) {
      return downstream.launchJob(job);
    },
    launchesDone() {
      downstream.launchesDone();
    },
    next() {
      return downstream.next();
    },
  } satisfies Strategy<T, J>;
}
```
You can pass piped strategies in the pipes option to be placed upstream of
strategies specified in the other options. If there are no other options, it
will simply sequence the pipes you choose. nevermore exports factories for
core pipes to be able to interleave them with your own, e.g.
`createConcurrencyPipe()` and `createTimeoutPipe()`.
This would be needed if you want to sequence your own strategies differently
than the default sequence (found in the core sequence.ts file). For example,
in the default sequence backoff is placed before concurrency. This ensures that
backed off tasks don't consume a slot, meaning concurrency only limits
`executing` jobs. Placing concurrency before backoff means a slot is used by a
scheduled task during its whole lifecycle (including between retries).
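As an illustration of this ordering rule, pipes compose by each wrapping the strategy downstream of it, so earlier pipes in the list end up upstream. The sketch below uses simplified stand-in types; `Strategy`, `Pipe` and `composePipes` here are invented for illustration, not nevermore's real signatures:

```typescript
// Minimal sketch of pipe composition (simplified types, NOT nevermore's):
// listing pipes [a, b] over a base strategy builds a(b(base)), so the
// first pipe in the list is outermost (upstream) at launch time.
type Strategy = { launchJob(job: () => Promise<unknown>): Promise<unknown> };
type Pipe = (downstream: Strategy) => Strategy;

function composePipes(pipes: Pipe[], base: Strategy): Strategy {
  // reduceRight so the first pipe in the list ends up outermost
  return pipes.reduceRight((downstream, pipe) => pipe(downstream), base);
}
```

Swapping the order of two pipes in the list therefore swaps which strategy sees a job first, which is exactly why backoff-before-concurrency and concurrency-before-backoff behave differently.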
Some alternative async scheduling libraries for comparison:

- p-limit
- p-queue
- p-retry
- promise-pool