a tiny durable work queue and timers
npm install nightcoral
NightCoral is a job system in typescript for tiny-to-small servers. It consists of:
- a worker pool (of threads)
- a persistent queue (of jobs)
- persistent timers
It uses sqlite3 for storing the jobs and timers, and optionally reports metrics using crow. Jobs are executed "at least once", meaning any jobs that were in progress during a sudden shutdown (or crash) will be run again when the server restarts. If a job is able to run and complete during a single server session, it may optionally return status information to the primary server thread.
Only one process at a time can run nightcoral; multi-node coordination is a non-goal.
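Because a job that was in progress during a crash runs again after restart, handlers are safest when running them twice is harmless. Here is a minimal sketch of one way to do that, assuming each job carries a caller-chosen `key` field (hypothetical, not part of nightcoral) and that completed keys are recorded somewhere durable. The in-memory `Set` below is only a stand-in for that storage.

```typescript
// Hypothetical job shape: `key` is an assumed, caller-chosen idempotency key.
type Job = { task: string; value: number; key: string };

// Stand-in for durable storage (for example, a table in your own database).
const completed = new Set<string>();
let total = 0;

// Applies the job's effect at most once per key, so a re-run after a crash
// does not double-count. Returns true if the effect was applied this time.
async function handleOnce(job: Job): Promise<boolean> {
  if (completed.has(job.key)) return false; // already applied; skip
  total += job.value;
  completed.add(job.key);
  return true;
}
```

In a real worker, the effect and the "completed" record would ideally be written together (for example, in one database transaction), so a crash between the two cannot leave them out of sync.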
To build and run the tests from a source checkout:
```
npm install
npm test
```
Here's a quick example that starts up a pool of 5 worker threads, hands them 10 jobs, and sets an 11th job to execute after one minute:
```typescript
import { NightCoral } from "nightcoral"; // assumes a named export

type Job = { task: string, value: number };

// put persistent storage in ./data/nightcoral.db. workers will each
// receive their own copy of the workerConfig object.
const nc = new NightCoral<Job>({
  databaseFilename: "./data/nightcoral.db",
  workerConfig: { redisPassword: "welcome" },
});

// start 5 threads, each running "worker.mjs"
await nc.launch("./lib/worker.mjs", 5);

// post 10 jobs, and one timer that will trigger in one minute.
for (let i = 0; i < 10; i++) nc.add({ task: "count", value: i });
nc.addTimer(60_000, { task: "alarm", value: 500 });
```
The NightCoral constructor configures the job system without launching any tasks. The sqlite database is created if necessary.
There are two important fields in the options object:
- databaseFilename: string
Filename for the sqlite database. ":memory:" will use a non-persistent in-memory store that evaporates when the process ends, which may be useful for testing.
- workerConfig: any
Each worker will receive this object (passed via JSON) when it starts up, by calling its exported setup function (if it exists). The resulting per-thread state is attached to each incoming job, so it can be used for things like persistent database connections.
And there are several others you might want to change:
- jobIsJson: boolean (true)
The persistent queue normally converts jobs to their JSON string representation in the database. If your jobs are already strings, set this to false.
- cacheJobs: number (50)
The database will batch-read this many jobs at a time, to avoid hitting the database often when the queue is moving fast.
- logger?: (text: string) => void
- errorLogger?: (error: Error | undefined, text: string) => void
- traceLogger?: (text: string) => void
These functions are called to log events, if defined. The error logger is called on error, and the trace logger is called for detailed operations that usually only matter if you're debugging a problem.
- notifyError?: (error: WorkerPoolError) => void
This function is called if a worker thread dies or is killed. It may be useful if you have an event log or an alerting system.
- describeJob: (job: J) => string (default: JSON.stringify)
Use this to customize the description of a job in trace logging.
- metrics?: Metrics
To track counters and gauges, a crow-metrics object can be used.
- pingFrequency: number (60000 = 1 minute)
- pingTimeout: number (30000 = 30 seconds)
How often (in milliseconds) should we ping workers to make sure they're responsive, and how long should we wait for a response before considering that worker to be dead?
- replyTimeout: number (30000 = 30 seconds)
How long should a promise wait on an addQuery worker before giving up and rejecting with an error?
- workerDeathThreshold: number (10)
- workerDeathDuration: number (15000 = 15 seconds)
If workerDeathThreshold workers die within workerDeathDuration milliseconds, assume something is wrong, and shut down.
- fakeWorker?: (job: J) => Promise
For testing, you can stub out the worker pool completely and use an inline callback.
- localHandler?: (job: J) => A | null
If you'd like to handle certain returned jobs locally, instead of pushing them to new workers, define this function and make it return null if it handled the job locally. Otherwise the job will be handed to workers. Usually this is just for tests.
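As an illustration of how the testing-oriented options fit together, here is a sketch of an options object for a unit test. The `SketchOptions` interface is a local stand-in covering only the fields used here (the real options type ships with nightcoral and may differ), and the `Promise<void>` return type of `fakeWorker` is an assumption.

```typescript
type Job = { task: string; value: number };

// Local stand-in for the option fields used below; the real options type
// ships with nightcoral and may differ in detail.
interface SketchOptions<J> {
  databaseFilename: string;
  workerConfig: unknown;
  logger?: (text: string) => void;
  errorLogger?: (error: Error | undefined, text: string) => void;
  describeJob?: (job: J) => string;
  fakeWorker?: (job: J) => Promise<void>; // return type is an assumption
}

const handled: Job[] = [];
const logLines: string[] = [];

const testOptions: SketchOptions<Job> = {
  // in-memory store: nothing survives the end of the process
  databaseFilename: ":memory:",
  workerConfig: {},
  logger: (text) => { logLines.push(text); },
  errorLogger: (error, text) => {
    logLines.push(`ERROR ${text}: ${error?.message ?? "unknown"}`);
  },
  // short, readable job descriptions for trace logs
  describeJob: (job) => `${job.task}(${job.value})`,
  // no threads: record each job inline so a test can inspect it
  fakeWorker: async (job) => { handled.push(job); },
};
```

An object like this would be passed to the constructor in place of the production options, so a test can check `handled` and `logLines` without starting any threads or touching the disk.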
launch(workerCodePath, count) starts the worker pool, with count workers executing the javascript file at workerCodePath. The returned promise is fulfilled once all the workers are active and ready to receive jobs. The worker interface is described below.
add(job) posts a job to the worker pool. The returned object contains the job's unique id and a delete() method that tries to delete the job before it's executed.
addQuery(job) posts a job to the worker pool, like add, but also expects a reply in the reply field of the returned QueuedQueryJob. This is a promise that is fulfilled when the job is complete. The promise is rejected with an error if the worker throws an error or takes longer than replyTimeout to reply.
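The reply-or-timeout behavior can be pictured as a promise raced against a timer. The helper below is a self-contained sketch of that pattern, not nightcoral's implementation; `withReplyTimeout` is a hypothetical name used only for illustration.

```typescript
// Settles with the result of `work`, or rejects if `work` has not settled
// within `timeoutMs` milliseconds.
function withReplyTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`no reply within ${timeoutMs} msec`)),
      timeoutMs,
    );
    work.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); },
    );
  });
}
```

For callers, the practical consequence is that awaiting the reply promise belongs inside a try/catch: a rejection can mean either that the worker threw or that the reply simply took too long.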
Store job until the absolute time in expires. The returned object has a delete() method to cancel the job before the timer expires.
addTimer(timeout, job) stores job until timeout milliseconds from now. The returned object has a delete() method to cancel the job before the timer expires.
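The relative and absolute forms describe the same moment in two ways. This small sketch shows the conversion in each direction, assuming that an absolute `expires` value is in epoch milliseconds, as returned by `Date.now()` (the text above does not state the unit). Both helper names are hypothetical.

```typescript
// Converts a relative delay into the absolute expiry it corresponds to.
function expiresFromTimeout(timeoutMs: number, now: number = Date.now()): number {
  return now + timeoutMs;
}

// Milliseconds from `now` until `when`, clamped to 0 so a time that is
// already in the past means "fire immediately".
function timeoutUntil(when: Date, now: number = Date.now()): number {
  return Math.max(0, when.getTime() - now);
}
```

A relative timeout suits "retry in 30 seconds"; an absolute time suits "run at midnight", where the delay depends on when the job is posted.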
Resolves the next time all workers are idle (no jobs are running).
Shuts down the pool.
The workerCodePath passed to launch must be a javascript file with these exported functions:
- export async function setup (optional)
If defined, this function is called with the workerConfig object passed into nightcoral's constructor, and must return a new object that will be passed to the handler function with each job. This can be used to build global state (like a database connection) that will be used for each job the worker handles.
- export async function handler
This function is called for each job received. A worker is considered "busy" until this function's promise resolves, and won't be given another job until then. state is whatever was returned from the setup function, if that existed; otherwise, it's the original state from the nightcoral constructor.
If the handler returns a job (J), it will be used to fulfill the promise returned from the top-level add() function. This can be a useful optimization for higher-level APIs where you'd like to return a successful status message if the background task completes within a short time: add your status field to the job and return it. This only works if the job is created and executed within the same server execution; in other words, if the server hasn't crashed or shut down in the meantime. Otherwise, the job will execute when the server restarts, but there will be no one around to hear the reply.
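A worker module following this description might look like the sketch below. The parameter order of `setup` and `handler`, the local `WorkerPortal` interface, and the `status` field are assumptions made for illustration; check the package's type definitions for the real signatures. Since launch expects a javascript file, a TypeScript worker like this one would be compiled first.

```typescript
// Hypothetical job, config, and per-thread state shapes.
type Job = { task: string; value: number; status?: string };
type Config = { redisPassword: string };
type State = { password: string; handledCount: number };

// Local stand-in for the portal described in this README.
interface WorkerPortal<J> {
  workerId: number;
  postJob(job: J): void;
  postJobs(jobs: J[]): void;
  postJobAfter(timeout: number, job: J): void;
  postJobAt(expires: number, job: J): void;
}

// Called once per thread: turn the shared config into per-thread state
// (a real worker might open a database connection here).
export async function setup(config: Config, _portal: WorkerPortal<Job>): Promise<State> {
  return { password: config.redisPassword, handledCount: 0 };
}

// Called once per job. The worker counts as busy until this promise settles.
// Returning the job with a status field lets a waiting caller see the result.
export async function handler(job: Job, state: State, portal: WorkerPortal<Job>): Promise<Job> {
  state.handledCount += 1;
  return { ...job, status: `done by worker ${portal.workerId}` };
}
```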
A WorkerPortal is passed to each function, for communicating with the pool's coordinator. It has these fields:
- workerId: number -- the id of this worker
- postJob(job: J): void -- post a new job to the queue
- postJobs(jobs: J[]): void -- post a batch of new jobs to the queue
- postJobAfter(timeout: number, job: J): void -- post a job to be executed after a delay (on a timer)
- postJobAt(expires: number, job: J): void -- post a job to be executed at (or after) a specific absolute time
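The portal lets a job create more work from inside a worker. The sketch below shows a handler that fans one job out into a batch and schedules a delayed follow-up. The task names, the 5-second delay, and the handler's parameter order are hypothetical; the `WorkerPortal` interface is a local copy of the fields listed above.

```typescript
type Job = { task: string; value: number };

// Local copy of the portal fields listed above.
interface WorkerPortal<J> {
  workerId: number;
  postJob(job: J): void;
  postJobs(jobs: J[]): void;
  postJobAfter(timeout: number, job: J): void;
  postJobAt(expires: number, job: J): void;
}

export async function handler(job: Job, _state: unknown, portal: WorkerPortal<Job>): Promise<void> {
  if (job.task === "fanout") {
    // split one job into `value` smaller jobs, posted as a single batch
    const children = Array.from({ length: job.value }, (_, i) => ({ task: "count", value: i }));
    portal.postJobs(children);
  } else if (job.task === "check-later") {
    // come back to this item after a 5-second delay
    portal.postJobAfter(5_000, { task: "check", value: job.value });
  }
}
```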
If a metrics object is configured, these metrics are reported:
- pool-workers: total number of worker threads running
- pool-workers-idle: number of worker threads available (not currently working)
- pool-jobs-retired: count of jobs executed since startup
- pool-job-time: timing distribution of how long jobs take when executed
- pool-ping-msec: timing distribution of how long workers are taking to respond to periodic pings
- pool-workers-died: count of workers that have died unexpectedly or were killed for not responding to a ping in time
- queue_size: number of jobs waiting in the queue (normally 0)
- queue_processing: number of jobs currently executing
- queue_latency: milliseconds the last job sat in the queue before being executed
- timer_count: number of outstanding (not expired) timers
- timer_idle_workers: number of tasks currently polling timers (normally 1)
- timer_latency: milliseconds between the last expired timer and executing its job
Apache 2 (open-source) license, included in LICENSE.txt.
- https://messydesk.social/@robey - Robey Pointer