Utilities for JavaScript `Promise` and async functions.

```sh
npm install --save extra-promise
# or
yarn add extra-promise
```

## API
```ts
interface INonBlockingChannel<T> {
  send(value: T): void
  receive(): AsyncIterable<T>
  close: () => void
}

interface IBlockingChannel<T> {
  send(value: T): Promise<void>
  receive(): AsyncIterable<T>
  close: () => void
}

interface IDeferred<T> {
  resolve(value: T): void
  reject(reason: unknown): void
}
```

### Functions
#### isPromise
```ts
function isPromise<T>(val: unknown): val is Promise<T>
function isntPromise<T>(val: T): val is Exclude<T, Promise<unknown>>
```

#### isPromiseLike
```ts
function isPromiseLike<T>(val: unknown): val is PromiseLike<T>
function isntPromiseLike<T>(val: T): val is Exclude<T, PromiseLike<unknown>>
```

#### delay
```ts
function delay(timeout: number, signal?: AbortSignal): Promise<void>
```
A simple wrapper for `setTimeout`.

#### timeout
```ts
function timeout(ms: number, signal?: AbortSignal): Promise<never>
```
It throws a `TimeoutError` after `ms` milliseconds.

```ts
try {
  result = await Promise.race([
    fetchData()
  , timeout(5000)
  ])
} catch (e) {
  if (e instanceof TimeoutError) ...
}
```

#### pad
```ts
function pad<T>(ms: number, fn: () => Awaitable<T>): Promise<T>
```
Run a function, but wait at least `ms` milliseconds before returning.

#### parallel
```ts
function parallel(
  tasks: Iterable<() => Awaitable<unknown>>
, concurrency: number = Infinity
): Promise<void>
```
Perform tasks in parallel.

The value range of `concurrency` is [1, Infinity].
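
To illustrate what the `concurrency` limit means, here is a minimal sketch of one way such a function could work: a fixed pool of workers pulls tasks from a shared list. `runParallel` and the local `Awaitable` alias are hypothetical names for this illustration, not the library's implementation.

```ts
type Awaitable<T> = T | PromiseLike<T>

// Hypothetical stand-in for `parallel`, for illustration only.
async function runParallel(
  tasks: Iterable<() => Awaitable<unknown>>
, concurrency: number = Infinity
): Promise<void> {
  // Reject values outside [1, Infinity], including NaN.
  if (Number.isNaN(concurrency) || concurrency < 1) {
    throw new Error('concurrency must be in the range [1, Infinity]')
  }

  const queue = Array.from(tasks)
  let next = 0

  // Each worker keeps taking the next unstarted task until none remain.
  async function worker(): Promise<void> {
    while (next < queue.length) {
      const task = queue[next++]
      await task()
    }
  }

  // Never start more workers than there are tasks.
  const workerCount = Math.min(concurrency, queue.length)
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
}
```

With `concurrency` set to 2 and five tasks, at most two tasks are in progress at any moment; the remaining tasks start as earlier ones finish.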
Invalid values will throw `Error`.

#### parallelAsync
```ts
function parallelAsync(
  tasks: AsyncIterable<() => Awaitable<unknown>>
, concurrency: number // concurrency must be a finite number
): Promise<void>
```
Same as `parallel`, but `tasks` is an `AsyncIterable`.

#### series
```ts
function series(
  tasks: Iterable<() => Awaitable<unknown>>
       | AsyncIterable<() => Awaitable<unknown>>
): Promise<void>
```
Perform tasks in order.
Equivalent to `parallel(tasks, 1)`.

#### waterfall
```ts
function waterfall<T>(
  tasks: Iterable<(result: unknown) => Awaitable<unknown>>
       | AsyncIterable<(result: unknown) => Awaitable<unknown>>
): Promise<T | undefined>
```
Perform tasks in order. The return value of the previous task becomes the parameter of the next task. If `tasks` is empty, return `Promise<undefined>`.

#### each
```ts
function each<T>(
  iterable: Iterable<T>
, fn: (element: T, i: number) => Awaitable<unknown>
, concurrency: number = Infinity
): Promise<void>
```
The async `each` operator for `Iterable`.

The value range of `concurrency` is [1, Infinity].
Invalid values will throw `Error`.

#### eachAsync
```ts
function eachAsync<T>(
  iterable: AsyncIterable<T>
, fn: (element: T, i: number) => Awaitable<unknown>
, concurrency: number // concurrency must be a finite number
): Promise<void>
```
Same as `each`, but `iterable` is an `AsyncIterable`.

#### map
```ts
function map<T, U>(
  iterable: Iterable<T>
, fn: (element: T, i: number) => Awaitable<U>
, concurrency: number = Infinity
): Promise<U[]>
```
The async `map` operator for `Iterable`.

The value range of `concurrency` is [1, Infinity].
Invalid values will throw `Error`.

#### mapAsync
```ts
function mapAsync<T, U>(
  iterable: AsyncIterable<T>
, fn: (element: T, i: number) => Awaitable<U>
, concurrency: number // concurrency must be a finite number
): Promise<U[]>
```
Same as `map`, but `iterable` is an `AsyncIterable`.

#### filter
```ts
function filter<T>(
  iterable: Iterable<T>
, fn: (element: T, i: number) => Awaitable<boolean>
, concurrency: number = Infinity
): Promise<T[]>
```
The async `filter` operator for `Iterable`.

The value range of `concurrency` is [1, Infinity].
Invalid values will throw `Error`.

#### filterAsync
```ts
function filterAsync<T>(
  iterable: AsyncIterable<T>
, fn: (element: T, i: number) => Awaitable<boolean>
, concurrency: number // concurrency must be a finite number
): Promise<T[]>
```
Same as `filter`, but `iterable` is an `AsyncIterable`.

#### all
```ts
function all<T extends { [key: string]: PromiseLike<unknown> }>(
  obj: T
): Promise<{ [Key in keyof T]: UnpackedPromiseLike<T[Key]> }>
```
It is similar to `Promise.all`, but the first parameter is an object.

```ts
const { task1, task2 } = await all({
  task1: invokeTask1()
, task2: invokeTask2()
})
```

#### promisify
```ts
type Callback<T> = (err: any, result?: T) => void

function promisify<Result, Args extends any[] = unknown[]>(
  fn: (...args: [...args: Args, callback?: Callback<Result>]) => unknown
): (...args: Args) => Promise<Result>
```
The well-known `promisify` function.

#### callbackify
```ts
type Callback<T> = (err: any, result?: T) => void

function callbackify<Result, Args extends any[] = unknown[]>(
  fn: (...args: Args) => Awaitable<Result>
): (...args: [...args: Args, callback: Callback<Result>]) => void
```
The `callbackify` function, the opposite of `promisify`.

#### asyncify
```ts
function asyncify<Args extends any[], Result, This = unknown>(
  fn: (this: This, ...args: Args) => Awaitable<Result>
): (this: This, ...args: Promisify<Args>) => Promise<Result>
```
Turn sync functions into async functions.

```ts
const a = 1
const b = Promise.resolve(2)

const add = (a: number, b: number) => a + b

// BAD
add(a, await b) // 3

// GOOD
const addAsync = asyncify(add) // (a: number | PromiseLike<number>, b: number | PromiseLike<number>) => Promise<number>
await addAsync(a, b) // 3
```
It can also be used to eliminate the call stack:
```ts
// OLD
function count(n: number, i: number = 0): number {
  if (i < n) return count(n, i + 1)
  return i
}

count(10000) // RangeError: Maximum call stack size exceeded

// NEW
const countAsync = asyncify((n: number, i: number = 0): Awaitable<number> => {
  if (i < n) return countAsync(n, i + 1)
  return i
})

await countAsync(10000) // 10000
```

#### spawn
```ts
function spawn<T>(
  num: number
, create: (id: number) => Awaitable<T>
): Promise<T[]>
```
Syntactic sugar for creating multiple values in parallel.
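
As a rough illustration of the idea, the sketch below starts every `create` call in the same tick and collects the results in id order. `spawnAll` and the local `Awaitable` alias are hypothetical names, and numbering ids from 1 is taken from the description in this section; the library's actual implementation may differ.

```ts
type Awaitable<T> = T | PromiseLike<T>

// Hypothetical stand-in for `spawn`, for illustration only.
function spawnAll<T>(
  num: number
, create: (id: number) => Awaitable<T>
): Promise<T[]> {
  const results: T[] = []
  const pending: Array<Promise<void>> = []

  for (let id = 1; id <= num; id++) {
    pending.push(
      // Deferring the call into a microtask turns a synchronous throw
      // from `create` into a rejection of the returned promise.
      Promise.resolve()
        .then(() => create(id))
        .then(value => {
          results[id - 1] = value
        })
    )
  }

  return Promise.all(pending).then(() => results)
}
```

For example, `spawnAll(3, id => fetchPage(id))` would request three pages at once, where `fetchPage` stands for any function of your own that returns a value or a promise.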
The parameter `id` ranges from 1 to `num`.

#### limitConcurrencyByQueue
```ts
function limitConcurrencyByQueue<T, Args extends any[]>(
  concurrency: number
, fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T>
```
Limit the number of concurrent calls. Calls that exceed `concurrency` are delayed and run in order.
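
The queueing behaviour can be sketched as follows. This is a simplified illustration under the assumption that waiting calls are released first in, first out; `limitByQueue` is a hypothetical name and not the library's code.

```ts
// Hypothetical stand-in for `limitConcurrencyByQueue`, for illustration only.
function limitByQueue<T, Args extends unknown[]>(
  concurrency: number
, fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T> {
  // Resolvers of calls that are waiting for a free slot, oldest first.
  const waiting: Array<() => void> = []
  let active = 0

  return async (...args: Args): Promise<T> => {
    if (active >= concurrency) {
      // Park this call until a finished call hands over its slot.
      await new Promise<void>(resolve => waiting.push(resolve))
    } else {
      active++
    }

    try {
      return await fn(...args)
    } finally {
      const next = waiting.shift()
      if (next) {
        // Transfer the slot directly, so `active` stays unchanged.
        next()
      } else {
        active--
      }
    }
  }
}
```

Handing the slot directly to the next waiter, instead of decrementing and re-incrementing the counter, keeps a newly arriving call from jumping ahead of calls that are already queued.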
#### reusePendingPromises
```ts
type VerboseResult<T> = [value: T, isReuse: boolean]

interface IReusePendingPromisesOptions<Args> {
  createKey?: (args: Args) => unknown
  verbose?: true
}

function reusePendingPromises<T, Args extends any[]>(
  fn: (...args: Args) => PromiseLike<T>
, options: IReusePendingPromisesOptions<Args> & { verbose: true }
): (...args: Args) => Promise<VerboseResult<T>>
function reusePendingPromises<T, Args extends any[]>(
  fn: (...args: Args) => PromiseLike<T>
, options: IReusePendingPromisesOptions<Args> & { verbose: false }
): (...args: Args) => Promise<T>
function reusePendingPromises<T, Args extends any[]>(
  fn: (...args: Args) => PromiseLike<T>
, options: Omit<IReusePendingPromisesOptions<Args>, 'verbose'>
): (...args: Args) => Promise<T>
function reusePendingPromises<T, Args extends any[]>(
  fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T>
```
Returns a function that will return the same `Promise` for calls with the same parameters if the `Promise` is pending.

It generates cache keys with the `options.createKey` function.
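
The reuse of pending promises can be pictured with the sketch below. It is an illustration only: `reuseInFlight` is a hypothetical name, the `verbose` option is left out, and plain `JSON.stringify` is used as the default key function for brevity (it does not sort object keys).

```ts
// Hypothetical stand-in for `reusePendingPromises`, for illustration only.
function reuseInFlight<T, Args extends unknown[]>(
  fn: (...args: Args) => PromiseLike<T>
, createKey: (args: Args) => unknown = args => JSON.stringify(args)
): (...args: Args) => Promise<T> {
  // Promises that have been started but have not settled yet, by key.
  const pending = new Map<unknown, Promise<T>>()

  return (...args: Args): Promise<T> => {
    const key = createKey(args)
    const existing = pending.get(key)
    if (existing) return existing

    const promise = (async (): Promise<T> => await fn(...args))()
    pending.set(key, promise)

    // Forget the promise once it settles, so later calls start fresh.
    const forget = (): void => {
      pending.delete(key)
    }
    promise.then(forget, forget)

    return promise
  }
}
```

Two overlapping calls with equal arguments share one underlying call to `fn`; a call made after that promise has settled invokes `fn` again.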
The default value of `options.createKey` is a stable `JSON.stringify` implementation.

### Classes
#### StatefulPromise
```ts
enum StatefulPromiseState {
  Pending = 'pending'
, Fulfilled = 'fulfilled'
, Rejected = 'rejected'
}

class StatefulPromise<T> extends Promise<T> {
  static from<T>(promise: PromiseLike<T>): StatefulPromise<T>

  get state(): StatefulPromiseState

  constructor(
    executor: (
      resolve: (value: T) => void
    , reject: (reason: any) => void
    ) => void
  )

  isPending(): boolean
  isFulfilled(): boolean
  isRejected(): boolean
}
```
A subclass of `Promise` intended for testing; it lets you inspect the state of a `Promise`.

#### Channel
```ts
class Channel<T> implements IBlockingChannel<T>
```
Implement MPMC (multi-producer, multi-consumer) FIFO queue communication with `Promise` and `AsyncIterable`.

- `send`
  Send value to the channel, block until data is taken out by the consumer.
- `receive`
  Receive value from the channel.
- `close`
  Close the channel.

If the channel is closed, `send` and `receive` will throw `ChannelClosedError`.
An `AsyncIterator` that has already been created does not throw `ChannelClosedError`, but returns `{ done: true }`.

```ts
const chan = new Channel<string>()

// The callback must be async because it awaits each send.
queueMicrotask(async () => {
  await chan.send('hello')
  await chan.send('world')
})

for await (const value of chan.receive()) {
  console.log(value)
}
```

#### BufferedChannel
```ts
class BufferedChannel<T> implements IBlockingChannel<T> {
  constructor(bufferSize: number)
}
```
Implement MPMC (multi-producer, multi-consumer) FIFO queue communication with `Promise` and `AsyncIterable`.
When the amount of data sent exceeds `bufferSize`, `send` will block until data in the buffer is taken out by the consumer.

- `send`
  Send value to the channel.
  If the buffer is full, block.
- `receive`
  Receive value from the channel.
- `close`
  Close the channel.

If the channel is closed, `send` and `receive` will throw `ChannelClosedError`.
An `AsyncIterator` that has already been created does not throw `ChannelClosedError`, but returns `{ done: true }`.

```ts
const chan = new BufferedChannel<string>(1)

// The callback must be async because it awaits each send.
queueMicrotask(async () => {
  await chan.send('hello')
  await chan.send('world')
})

for await (const value of chan.receive()) {
  console.log(value)
}
```

#### UnlimitedChannel
```ts
class UnlimitedChannel<T> implements INonBlockingChannel<T>
```
Implement MPMC (multi-producer, multi-consumer) FIFO queue communication with `Promise` and `AsyncIterable`.

`UnlimitedChannel` exposes three channel methods:
- `send`
  Send value to the channel.
  There is no size limit on the buffer, so every send returns immediately.
- `receive`
  Receive value from the channel.
- `close`
  Close the channel.

If the channel is closed, `send` and `receive` will throw `ChannelClosedError`.
An `AsyncIterator` that has already been created does not throw `ChannelClosedError`, but returns `{ done: true }`.

```ts
const chan = new UnlimitedChannel<string>()

queueMicrotask(() => {
  chan.send('hello')
  chan.send('world')
})

for await (const value of chan.receive()) {
  console.log(value)
}
```

#### Deferred
```ts
class Deferred<T> implements PromiseLike<T>, IDeferred<T>
```
`Deferred` is a `Promise` that separates `resolve()` and `reject()` from the constructor.

#### MutableDeferred
```ts
class MutableDeferred<T> implements PromiseLike<T>, IDeferred<T>
```
`MutableDeferred` is similar to `Deferred`, but its `resolve()` and `reject()` can be called multiple times to change the value.

```ts
const deferred = new MutableDeferred()
deferred.resolve(1)
deferred.resolve(2)

await deferred // resolved(2)
```

#### ReusableDeferred
```ts
class ReusableDeferred<T> implements PromiseLike<T>, IDeferred<T>
```
`ReusableDeferred` is similar to `MutableDeferred`, but its internal `Deferred` will be overwritten with a new pending `Deferred` after each call.

```ts
const deferred = new ReusableDeferred()
deferred.resolve(1)
queueMicrotask(() => deferred.resolve(2))

await deferred // pending, resolved(2)
```

#### DeferredGroup
```ts
class DeferredGroup<T> implements IDeferred<T> {
  add(deferred: IDeferred<T>): void
  remove(deferred: IDeferred<T>): void
  clear(): void
}
```

#### LazyPromise
```ts
class LazyPromise<T> implements PromiseLike<T> {
  then: PromiseLike<T>['then']

  constructor(
    executor: (
      resolve: (value: T) => void
    , reject: (reason: any) => void
    ) => void
  )
}
```
The `LazyPromise` constructor takes the same arguments as the `Promise` constructor.

The difference from `Promise` is that `LazyPromise` runs `executor` only after its `then` method is called.

#### Semaphore
```ts
type Release = () => void

class Semaphore {
  constructor(count: number)

  acquire(): Promise<Release>
  acquire<T>(handler: () => Awaitable<T>): Promise<T>
}
```

#### Mutex
```ts
type Release = () => void

class Mutex extends Semaphore {
  acquire(): Promise<Release>
  acquire<T>(handler: () => Awaitable<T>): Promise<T>
}
```

#### DebounceMicrotask
```ts
class DebounceMicrotask {
  queue(fn: () => void): void
  cancel(fn: () => void): boolean
}
```
`queue` creates a microtask; if the microtask has not yet executed, multiple calls will queue it only once.

`cancel` cancels a microtask before it is executed.

#### DebounceMacrotask
```ts
class DebounceMacrotask {
  queue(fn: () => void): void
  cancel(fn: () => void): boolean
}
```
`queue` creates a macrotask; if the macrotask has not yet executed, multiple calls will queue it only once.

`cancel` cancels a macrotask before it is executed.

#### TaskRunner
```ts
class TaskRunnerDestroyedError extends CustomError {}

class TaskRunner {
  constructor(
    concurrency: number = Infinity
  , rateLimit?: {
      duration: number
      limit: number
    }
  )

  /**
   * @throws {TaskRunnerDestroyedError}
   */
  run<T>(task: (signal: AbortSignal) => Awaitable<T>, signal?: AbortSignal): Promise<T>

  destroy(): void
}
```
A task runner that executes tasks in FIFO order.
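
To make the FIFO and `destroy()` behaviour more concrete, here is a reduced sketch of how a runner of this shape might be built. `MiniTaskRunner` and `RunnerDestroyedError` are hypothetical names; the sketch omits `rateLimit` and the per-call `signal`, and it assumes that destroying the runner rejects tasks that have not started yet. It is not the library's implementation.

```ts
type Awaitable<T> = T | PromiseLike<T>

class RunnerDestroyedError extends Error {}

interface IQueuedTask {
  start(): void
  cancel(): void
}

// Hypothetical, simplified runner for illustration only.
class MiniTaskRunner {
  private readonly pending: IQueuedTask[] = []
  private readonly controller = new AbortController()
  private readonly concurrency: number
  private running = 0

  constructor(concurrency: number = Infinity) {
    this.concurrency = concurrency
  }

  run<T>(task: (signal: AbortSignal) => Awaitable<T>): Promise<T> {
    if (this.controller.signal.aborted) {
      return Promise.reject(new RunnerDestroyedError())
    }

    return new Promise<T>((resolve, reject) => {
      this.pending.push({
        start: () => {
          this.running++
          Promise.resolve()
            .then(() => task(this.controller.signal))
            .then(resolve, reject)
            .finally(() => {
              this.running--
              this.drain()
            })
        }
      , cancel: () => reject(new RunnerDestroyedError())
      })
      this.drain()
    })
  }

  destroy(): void {
    // Signal running tasks to stop and reject tasks that never started.
    this.controller.abort()
    for (const queued of this.pending.splice(0)) queued.cancel()
  }

  // Start queued tasks, oldest first, while slots are free.
  private drain(): void {
    while (this.running < this.concurrency && this.pending.length > 0) {
      this.pending.shift()!.start()
    }
  }
}
```

With `new MiniTaskRunner(1)`, tasks run strictly one after another in the order they were submitted, regardless of how long each one takes.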