<!-- markdownlint-disable MD033 -->
<p align="center">
  <a href="https://www.npmjs.com/package/@mgcrea/prisma-queue">
    <img src="https://img.shields.io/npm/v/@mgcrea/prisma-queue.svg?style=for-the-badge" alt="npm version" />
  </a>
</p>
Simple, reliable and efficient concurrent work queue for Prisma + PostgreSQL

- Leverages PostgreSQL's `SKIP LOCKED` feature to reliably dequeue jobs.
- Supports crontab syntax for complex scheduled jobs.
- Written in TypeScript for static type checking, with types exported alongside the library.
- Built by tsup to provide both CommonJS and ESM packages.
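
To illustrate the `SKIP LOCKED` point above, here is a minimal sketch of the kind of query a PostgreSQL-backed queue can use to claim one job without blocking on rows that other workers have already locked. The helper `buildDequeueSql` and the column names `"finishedAt"`, `"runAt"` and `priority` are assumptions made for this example; they are not taken from the library's actual schema or query.

```ts
// Hypothetical helper: builds a "claim one job" query using FOR UPDATE SKIP LOCKED.
// Column names other than `queue` are assumptions made for illustration only.
function buildDequeueSql(tableName: string, queueName: string): { sql: string; params: string[] } {
  // Identifiers cannot be bound as query parameters, so validate before interpolating.
  if (!/^[a-z_][a-z0-9_]*$/i.test(tableName)) {
    throw new Error(`Invalid table name: ${tableName}`);
  }
  const sql = [
    `SELECT id FROM "${tableName}"`,
    `WHERE queue = $1 AND "finishedAt" IS NULL AND "runAt" <= NOW()`,
    `ORDER BY priority ASC, "runAt" ASC`,
    `LIMIT 1`,
    // Rows locked by other transactions are skipped instead of waited on.
    `FOR UPDATE SKIP LOCKED`,
  ].join("\n");
  return { sql, params: [queueName] };
}

const { sql, params } = buildDequeueSql("queue_job", "email");
console.log(sql, params);
```

The row lock only lasts until the surrounding transaction ends, so a query like this is meant to run inside a transaction that also processes or marks the job.
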
```bash
npm install @mgcrea/prisma-queue --save
# or
pnpm add @mgcrea/prisma-queue
```
1. If you use an older version of Prisma, from 2.29.0 to 4.6.1 (inclusive), you must first add `"interactiveTransactions"` to the `previewFeatures` of the client generator in your `schema.prisma`:

```prisma
generator client {
  provider        = "prisma-client-js"
  previewFeatures = ["interactiveTransactions"]
}
```

1. Append the `QueueJob` model to your `schema.prisma` file
1. Create your queue
```ts
import { createQueue } from "@mgcrea/prisma-queue";

type JobPayload = { email: string };
type JobResult = { status: number };

export const emailQueue = createQueue<JobPayload, JobResult>({ name: "email" }, async (job, client) => {
  const { id, payload } = job;
  console.log(`Processing job#${id} with payload=${JSON.stringify(payload)}`);
  // await someAsyncMethod();
  await job.progress(50);
  const status = 200;
  // Randomly fail half of the jobs to demonstrate error handling
  if (Math.random() > 0.5) {
    throw new Error("Failed for some unknown reason");
  }
  console.log(`Finished job#${id} with status=${status}`);
  return { status };
});
```

- Queue a job
```ts
import { emailQueue } from "./emailQueue";

const main = async () => {
  const job = await emailQueue.enqueue({ email: "foo@bar.com" });
};

main();
```

- Schedule a recurring job
```ts
import { emailQueue } from "./emailQueue";

const main = async () => {
  const nextJob = await emailQueue.schedule(
    { key: "email-schedule", cron: "5 5 * * *" },
    { email: "foo@bar.com" },
  );
};

main();
```

- Start queue processing (usually in another process)
```ts
import { emailQueue } from "./emailQueue";

const main = async () => {
  await emailQueue.start();
};

main();
```
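
For the recurring job, the `cron` option takes a crontab expression. As a rough illustration of how a standard five-field expression such as `5 5 * * *` reads, the sketch below splits it into named fields. The helper `splitCron` is hypothetical and written only for this example; it is not part of the library, whose own parser may accept other formats.

```ts
// Hypothetical helper: names the five fields of a standard crontab expression.
type CronFields = {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
};

function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}: "${expression}"`);
  }
  // The length check above guarantees exactly five entries.
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// minute=5, hour=5, every day of month, every month, every day of week:
// the job runs once a day at 05:05.
console.log(splitCron("5 5 * * *"));
```
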
When using this library in edge environments (Cloudflare Workers, Vercel Edge Functions, etc.) where Prisma's DMMF (Datamodel Meta Format) may not be available, you should explicitly provide the tableName option:
```ts
export const emailQueue = createQueue<JobPayload, JobResult>(
  {
    name: "email",
    tableName: "queue_job", // Explicit table name for edge environments
  },
  async (job, client) => {
    // ...
  },
);
```

The library will automatically fall back to a snake_case conversion of the model name (e.g., `QueueJob` → `queue_job`) if DMMF is unavailable, but providing `tableName` explicitly is recommended for edge deployments.
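
As an illustration of that fallback, here is a minimal sketch of a PascalCase to snake_case conversion. The functions `toSnakeCase` and `resolveTableName` are hypothetical names used for this example; the library's internal implementation may differ, for instance in how it treats acronyms.

```ts
// Hypothetical helper: converts a PascalCase model name to snake_case.
function toSnakeCase(modelName: string): string {
  return modelName
    // Insert an underscore between a lowercase letter or digit and an uppercase letter.
    .replace(/([a-z0-9])([A-Z])/g, "$1_$2")
    .toLowerCase();
}

// Hypothetical helper: prefer an explicit table name, otherwise derive one from the model name.
function resolveTableName(explicit: string | undefined, modelName = "QueueJob"): string {
  return explicit ?? toSnakeCase(modelName);
}

console.log(resolveTableName(undefined)); // "queue_job"
console.log(resolveTableName("my_jobs")); // "my_jobs"
```

Passing `tableName` explicitly sidesteps any ambiguity in such a conversion, which is one reason to prefer it on edge deployments.
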
You can easily spin off your workers into separate threads using `worker_threads` (Node.js >= 12.17.0).
This lets you make full use of your CPU cores and isolates your main application queue from potential memory leaks or crashes in job code.

```ts
import { JobPayload, JobResult, PrismaJob } from "@mgcrea/prisma-queue";
import { Worker } from "node:worker_threads";
import { ROOT_DIR } from "src/config/env";
import { log } from "src/config/log";

const WORKER_SCRIPT = `${ROOT_DIR}/dist/worker.js`;

export const processInWorker = async <P extends JobPayload, R extends JobResult>(
  job: PrismaJob<P, R>,
): Promise<R> =>
  new Promise((resolve, reject) => {
    const workerData = getJobWorkerData(job);
    log.debug(`Starting worker thread for job id=${job.id} in queue=${job.record.queue}`);
    try {
      const worker = new Worker(WORKER_SCRIPT, {
        workerData,
      });
      // The worker posts the job result as its only message
      worker.on("message", resolve);
      worker.on("error", reject);
      worker.on("exit", (code) => {
        if (code !== 0) {
          reject(
            new Error(
              `Worker for job id=${job.id} in queue=${job.record.queue} stopped with exit code ${code}`,
            ),
          );
        }
      });
    } catch (error) {
      reject(error as Error);
    }
  });

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type JobWorkerData<P extends JobPayload = any> = {
  id: bigint;
  payload: P;
  queue: string;
};

const getJobWorkerData = <P extends JobPayload, R extends JobResult>(
  job: PrismaJob<P, R>,
): JobWorkerData<P> => {
  // Prepare the job data for structured cloning in worker thread
  return {
    id: job.id,
    payload: job.payload,
    queue: job.record.queue,
  };
};
```

- `worker.ts`

```ts
import { parentPort, workerData } from "node:worker_threads";
import { log } from "src/config/log";
import { workers } from "src/queue";
import { type JobWorkerData } from "src/utils/queue";
import { logMemoryUsage } from "./utils/system";

log.info(`Worker thread started with data=${JSON.stringify(workerData)}`);

const typedWorkerData = workerData as JobWorkerData;
const { queue } = typedWorkerData;
const workerName = queue.replace(/Queue$/, "Worker") as keyof typeof workers;
log.debug(`Importing worker ${workerName} for queue=${queue}`);
const jobWorker = workers[workerName];
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!jobWorker) {
  log.error(`No worker found for queue=${queue}`);
  process.exit(1);
}

log.info(`Running worker for queue=${queue}`);
const result = await jobWorker(typedWorkerData);
log.info(`Worker for queue=${queue} completed with result=${JSON.stringify(result)}`);
parentPort?.postMessage(result);
process.exit(0);
```
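
The worker script above looks up its handler by rewriting the queue name, so it relies on a naming convention between queues and workers. The sketch below shows one way such a registry could be laid out. The `workers` object, the `emailWorker` entry and the `resolveWorker` function are hypothetical and only illustrate the `Queue` to `Worker` suffix mapping; the simplified `JobWorkerData` type stands in for the one exported earlier.

```ts
// Simplified stand-in for the JobWorkerData type defined in the previous example.
type JobWorkerData = { id: bigint; payload: unknown; queue: string };
type JobWorker = (data: JobWorkerData) => Promise<unknown>;

// Hypothetical registry; in the example above it would be exported from src/queue.
const workers: Record<string, JobWorker> = {
  emailWorker: async ({ payload }) => ({ status: 200, payload }),
};

// Mirrors the lookup in worker.ts: "emailQueue" resolves to workers.emailWorker.
function resolveWorker(queue: string): JobWorker | undefined {
  const workerName = queue.replace(/Queue$/, "Worker");
  return workers[workerName];
}

console.log(typeof resolveWorker("emailQueue")); // "function"
console.log(resolveWorker("email")); // undefined, the name has no "Queue" suffix
```

With this convention, a queue whose name does not end in `Queue` (for example `email`) would not resolve to a worker, so queue names and registry keys need to be kept in sync.
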
- Olivier Louvignes
Inspired by
- pg-queue by
```txt
The MIT License

Copyright (c) 2022 Olivier Louvignes

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
```