QueueBear SDK for message queues, scheduled jobs, and durable workflows

QueueBear SDK for building durable workflows and managing message queues.

```bash
npm install queuebear
```

```typescript
import { QueueBear, serve } from "queuebear";

const qb = new QueueBear({
  apiKey: "qb_live_xxx",
  projectId: "proj_xxx",
});
```
The SDK provides access to all QueueBear APIs:

| API            | Description                                     |
| -------------- | ----------------------------------------------- |
| `qb.messages`  | Publish and manage webhook messages             |
| `qb.schedules` | Create and manage cron-based recurring jobs     |
| `qb.dlq`       | Manage failed messages in the dead letter queue |
| `qb.workflows` | Trigger and manage durable workflows            |
---
Publish messages to be delivered to webhook destinations with automatic retries.
```typescript
const { messageId } = await qb.messages.publish(
  "https://api.example.com/webhook",
  { event: "user.created", userId: "123" },
  {
    delay: "30s", // Delay before delivery
    retries: 5, // Number of retry attempts
    method: "POST", // HTTP method
    headers: { "X-API-Key": "secret" }, // Headers to forward
    callbackUrl: "https://...", // Success callback
    failureCallbackUrl: "https://...", // Failure callback
    deduplicationId: "unique-id", // Prevent duplicate messages
  }
);
```
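The `deduplicationId` option above only prevents duplicates if the same logical message always produces the same ID. One possible way to get that, sketched here as an assumption rather than anything the SDK prescribes, is to hash the destination and payload. The helper name `deduplicationIdFor` is hypothetical and not part of QueueBear.

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper (not part of the QueueBear SDK): derives a stable ID
// from the destination URL and the JSON-serialized payload.
function deduplicationIdFor(
  destination: string,
  payload: Record<string, unknown>
): string {
  const digest = createHash("sha256")
    .update(destination)
    .update("\n") // separator so the URL and the body cannot run together
    .update(JSON.stringify(payload))
    .digest("hex");
  // A shortened digest keeps the ID readable; 32 hex chars = 128 bits.
  return digest.slice(0, 32);
}
```

Note that `JSON.stringify` is sensitive to key order, so payloads should be built the same way each time for the IDs to match.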
```typescript
// Fetch a message and inspect its status
const message = await qb.messages.get(messageId);
console.log(message.status); // "pending" | "completed" | "failed"
console.log(message.deliveryLogs); // Delivery attempt history
```

```typescript
// List messages with filtering and pagination
const { messages, pagination } = await qb.messages.list({
  status: "pending",
  limit: 20,
  offset: 0,
});
```

```typescript
// Cancel a message
await qb.messages.cancel(messageId);
```

```typescript
// Publish a message and wait for delivery to finish
const message = await qb.messages.publishAndWait(
  "https://api.example.com/webhook",
  { event: "user.created" },
  { timeoutMs: 30000 }
);
console.log(message.status); // "completed"
```
---
Create cron-based recurring jobs.
```typescript
const schedule = await qb.schedules.create({
  destination: "https://api.example.com/cron-job",
  cron: "0 9 * * *", // Daily at 9 AM
  timezone: "America/New_York",
  method: "POST",
  body: JSON.stringify({ type: "daily-report" }),
  headers: { "Content-Type": "application/json" },
  retries: 3,
  metadata: { jobName: "daily-report" },
});
```
| Expression     | Description             |
| -------------- | ----------------------- |
| `* * * * *`    | Every minute            |
| `0 * * * *`    | Every hour              |
| `0 9 * * *`    | Daily at 9:00 AM        |
| `0 9 * * 1-5`  | Weekdays at 9:00 AM     |
| `0 0 1 * *`    | First day of each month |
| `0 */6 * * *`  | Every 6 hours           |
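To make the five fields (minute, hour, day of month, month, day of week) concrete, here is a minimal sketch of a matcher that checks whether a given moment satisfies an expression. It is only an illustration of how the expressions in the table read, not QueueBear's scheduler: the function names are hypothetical, it only understands `*`, `*/n`, `a-b`, comma lists, and plain numbers, it evaluates in UTC, and it requires every field to match (standard cron treats day-of-month and day-of-week specially when both are restricted).

```typescript
// Does a single cron field (e.g. "*/6" or "1-5") accept this value?
function fieldMatches(field: string, value: number): boolean {
  return field.split(",").some((part) => {
    if (part === "*") return true;
    const step = /^\*\/(\d+)$/.exec(part);
    if (step) return value % Number(step[1]) === 0;
    const range = /^(\d+)-(\d+)$/.exec(part);
    if (range) return value >= Number(range[1]) && value <= Number(range[2]);
    return /^\d+$/.test(part) && Number(part) === value;
  });
}

// Does the 5-field expression fire at this date (evaluated in UTC)?
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  return (
    fieldMatches(minute, date.getUTCMinutes()) &&
    fieldMatches(hour, date.getUTCHours()) &&
    fieldMatches(dayOfMonth, date.getUTCDate()) &&
    fieldMatches(month, date.getUTCMonth() + 1) && // getUTCMonth() is 0-based
    fieldMatches(dayOfWeek, date.getUTCDay()) // 0 = Sunday
  );
}
```

For example, `cronMatches("0 9 * * 1-5", someDate)` is true only when `someDate` falls at 09:00 UTC on a Monday through Friday.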
```typescript
// List schedules
const { schedules } = await qb.schedules.list();
```

```typescript
// Pause and resume a schedule
await qb.schedules.pause(scheduleId);
await qb.schedules.resume(scheduleId);
```

```typescript
// Delete a schedule
await qb.schedules.delete(scheduleId);
```
---
Manage messages that failed all retry attempts.
```typescript
// List dead letter queue entries
const { entries } = await qb.dlq.list();
for (const entry of entries) {
  console.log(`${entry.id}: ${entry.failureReason}`);
}
```

```typescript
// Inspect a single entry
const entry = await qb.dlq.get(dlqId);
console.log(entry.body); // Original message body
console.log(entry.totalAttempts); // Number of failed attempts
```

```typescript
// Retry a single entry
const result = await qb.dlq.retry(dlqId);
console.log(result.newMessageId); // New message created
```

```typescript
// Delete one entry, or purge the whole queue
await qb.dlq.delete(dlqId);
await qb.dlq.purge(); // Delete all entries
```

```typescript
// Retry every entry
const results = await qb.dlq.retryAll();
console.log(`Retried ${results.length} entries`);
```
---
Build durable, fault-tolerant workflows with automatic step caching.
Workflows consist of two parts:

1. Workflow endpoint - Created with `serve()`, handles workflow execution
2. Client - Uses `qb.workflows` to trigger and manage workflow runs
---
The `serve()` function creates an HTTP handler for your workflow. It receives requests from QueueBear, executes your workflow code, and manages step caching automatically.
```typescript
import { serve } from "queuebear";

export const POST = serve(async (context) => {
  // Your workflow logic here
  return result;
}, options);
```
Parameters:

| Parameter | Type                                | Description            |
| --------- | ----------------------------------- | ---------------------- |
| `handler` | `(context: WorkflowContext) => ...` | Your workflow function |
| `options` | `ServeOptions`                      | Optional configuration |

Options:

| Option          | Type     | Description                                   |
| --------------- | -------- | --------------------------------------------- |
| `signingSecret` | `string` | Secret to verify requests come from QueueBear |
Next.js (App Router)
```typescript
// app/api/workflows/my-workflow/route.ts
import { serve } from "queuebear";

export const POST = serve(async (context) => {
  await context.run("step-1", async () => {
    /* ... */
  });
  return { success: true };
});
```
Express
```typescript
import express from "express";
import { serve } from "queuebear";

const app = express();
app.use(express.json());

const handler = serve(async (context) => {
  await context.run("step-1", async () => {
    /* ... */
  });
  return { success: true };
});

app.post("/api/workflows/my-workflow", async (req, res) => {
  // Request needs an absolute URL; req.url alone is only the path
  const url = `${req.protocol}://${req.get("host")}${req.originalUrl}`;
  const response = await handler(
    new Request(url, {
      method: "POST",
      headers: req.headers as HeadersInit,
      body: JSON.stringify(req.body),
    })
  );
  res.status(response.status).json(await response.json());
});
```
Hono
```typescript
import { Hono } from "hono";
import { serve } from "queuebear";

const app = new Hono();

const handler = serve(async (context) => {
  await context.run("step-1", async () => {
    /* ... */
  });
  return { success: true };
});

app.post("/api/workflows/my-workflow", async (c) => {
  const response = await handler(c.req.raw);
  return response;
});
```
---
```typescript
// app/api/workflows/onboarding/route.ts
import { serve } from "queuebear";

interface OnboardingInput {
  userId: string;
  email: string;
}

export const POST = serve<OnboardingInput>(
  async (context) => {
    const { userId, email } = context.input;

    // Step 1: Send welcome email (cached if already done)
    // sendEmail is your own email helper
    await context.run("send-welcome", async () => {
      await sendEmail(email, "welcome");
    });

    // Step 2: Wait 3 days
    await context.sleep("wait-3-days", 60 * 60 * 24 * 3);

    // Step 3: Send tips email
    await context.run("send-tips", async () => {
      await sendEmail(email, "tips");
    });

    return { completed: true };
  },
  {
    signingSecret: process.env.QUEUEBEAR_SIGNING_SECRET,
  }
);
```
```typescript
// Trigger a workflow run
const { runId } = await qb.workflows.trigger(
  "user-onboarding",
  "https://your-app.com/api/workflows/onboarding",
  { userId: "123", email: "user@example.com" },
  {
    idempotencyKey: "onboarding-user-123",
    maxDuration: 60 * 60 * 24 * 7, // 7 day timeout
  }
);
```
```typescript
// Check the status of a run
const status = await qb.workflows.getStatus(runId);
console.log(status.status); // "running" | "sleeping" | "completed"
console.log(status.steps); // Array of step details
```

```typescript
// Wait for a run to complete
const result = await qb.workflows.waitForCompletion(runId, {
  pollIntervalMs: 2000,
  timeoutMs: 60000,
});
```
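The `pollIntervalMs` and `timeoutMs` options suggest a poll-until-done loop. The sketch below shows that general pattern under the assumption that completion is detected by repeatedly fetching the run state; it is not the SDK's actual implementation, and `pollUntil`, `fetchState`, and `isDone` are hypothetical names.

```typescript
interface PollOptions {
  pollIntervalMs: number;
  timeoutMs: number;
}

// Hypothetical helper: calls fetchState repeatedly until isDone(state) is true,
// or rejects once the next poll would land past the timeout.
async function pollUntil<T>(
  fetchState: () => Promise<T>,
  isDone: (state: T) => boolean,
  { pollIntervalMs, timeoutMs }: PollOptions
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const state = await fetchState();
    if (isDone(state)) return state;
    if (Date.now() + pollIntervalMs > deadline) {
      throw new Error(`Timed out after ${timeoutMs} ms`);
    }
    // Sleep for one poll interval before asking again
    await new Promise<void>((resolve) => setTimeout(resolve, pollIntervalMs));
  }
}
```

With the client above, a call might look like `pollUntil(() => qb.workflows.getStatus(runId), (s) => s.status === "completed", { pollIntervalMs: 2000, timeoutMs: 60000 })`, although `waitForCompletion` already covers this case.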
```typescript
// Trigger a workflow and wait for its result in one call
const result = await qb.triggerAndWait(
  "user-onboarding",
  "https://your-app.com/api/workflows/onboarding",
  { userId: "123" },
  { timeoutMs: 120000 }
);
console.log(result.result); // Workflow output
```

```typescript
// Cancel or retry a run
await qb.workflows.cancel(runId);
await qb.workflows.retry(runId); // Resume from last completed step
```

```typescript
// In workflow: await context.waitForEvent("order-approved", "order.approved")
// From external code:
await qb.workflows.sendEvent("order.approved", {
  eventKey: "order-123",
  payload: { status: "approved" },
});
```
---
Available in `serve()` handlers:

Execute a step with automatic caching.

```typescript
const result = await context.run("fetch-user", async () => {
  return await db.users.findById(userId);
});
```
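To illustrate what "automatic caching" can mean for a step, here is a minimal in-memory sketch: a step's result is stored under its name, and a later call with the same name returns the stored value instead of running the function again. This is an assumption about the general technique (memoizing steps by name), not QueueBear's implementation; `createContext` and `StepStore` are hypothetical, and a real service would persist results rather than keep them in a `Map`.

```typescript
// Results of completed steps, keyed by step name.
type StepStore = Map<string, unknown>;

// Hypothetical stand-in for a workflow context with only run().
function createContext(store: StepStore) {
  return {
    async run<T>(name: string, fn: () => Promise<T>): Promise<T> {
      if (store.has(name)) {
        // Step already completed in an earlier execution: reuse the result
        return store.get(name) as T;
      }
      const result = await fn();
      store.set(name, result);
      return result;
    },
  };
}
```

Because results are keyed by name, step names need to be unique and stable within a workflow for this pattern to work.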
Pause the workflow for a specified duration, given in seconds.

```typescript
await context.sleep("wait-1-hour", 3600);
```

Pause until a specific date/time.

```typescript
await context.sleepUntil("wait-until-tomorrow", new Date("2024-01-15"));
```

Make an HTTP call as a cached step.

```typescript
const data = await context.call("fetch-api", {
  url: "https://api.example.com/data",
  method: "POST",
  headers: { Authorization: "Bearer xxx" },
  body: { key: "value" },
});
```

Wait for an external event.

```typescript
const payload = await context.waitForEvent("wait-approval", "order.approved", {
  eventKey: "order-123",
  timeoutSeconds: 86400, // 1 day
});
```

Send a fire-and-forget event.

```typescript
await context.notify("user.onboarded", { userId: "123" });
```

Execute steps in parallel.

```typescript
const [user, orders, preferences] = await context.parallel([
  { name: "fetch-user", fn: () => fetchUser(userId) },
  { name: "fetch-orders", fn: () => fetchOrders(userId) },
  { name: "fetch-preferences", fn: () => fetchPreferences(userId) },
]);
```

Get all completed steps for debugging.

```typescript
const steps = await context.getCompletedSteps();
console.log(`Completed ${steps.length} steps`);
```
---
Verify that workflow requests come from your QueueBear instance:
```typescript
export const POST = serve(handler, {
  signingSecret: process.env.QUEUEBEAR_SIGNING_SECRET,
});
```
The signing secret is available in your QueueBear project settings. When configured, requests without a valid signature will be rejected with a 401 error.
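The README does not say how the signature is computed, and `serve()` performs the check for you. Purely as an illustration of what shared-secret verification commonly looks like, the sketch below assumes an HMAC-SHA256 of the raw request body, hex-encoded. Both the scheme and the function names (`signBody`, `isValidSignature`) are assumptions, not QueueBear's documented behavior.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumed scheme: hex-encoded HMAC-SHA256 over the raw request body.
function signBody(body: string, secret: string): string {
  return createHmac("sha256", secret).update(body).digest("hex");
}

function isValidSignature(
  body: string,
  signature: string,
  secret: string
): boolean {
  const expected = Buffer.from(signBody(body, secret), "hex");
  const received = Buffer.from(signature, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first
  return (
    expected.length === received.length && timingSafeEqual(expected, received)
  );
}
```

A handler following this pattern would respond with 401 whenever `isValidSignature` returns false, which matches the rejection behavior described above.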
---
When developing locally, your webhook endpoints run on localhost, which isn't accessible from QueueBear's servers. Use Tunnelmole to expose your local server; it's free and requires no signup.

Linux, macOS, Windows WSL:

```bash
curl -O https://install.tunnelmole.com/t357g/install && sudo bash install
```

Node.js (all platforms, requires Node 16+):

```bash
npm install -g tunnelmole
```

```bash
tmole 3000
# Output: https://xxxx.tunnelmole.com is forwarding to localhost:3000
```
```typescript
// Use tunnelmole URL instead of localhost
await qb.messages.publish("https://xxxx.tunnelmole.com/api/webhooks", {
  event: "user.created",
  userId: "123",
});

// Works for workflows too
await qb.workflows.trigger(
  "onboarding",
  "https://xxxx.tunnelmole.com/api/workflows/onboarding",
  { userId: "123" }
);
```
- Store your tunnel URL in `.env` for easy switching between local and production
- Both `callbackUrl` and `failureCallbackUrl` need public URLs for local testing
- Tunnel URLs change on restart
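One way to follow the first tip is to build every endpoint URL from a single environment variable, so only `.env` changes when the tunnel restarts. This is a small sketch under assumptions: the variable name `PUBLIC_BASE_URL`, the localhost fallback, and the helper `endpointUrl` are all hypothetical.

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical helper: resolves a path against the configured public base URL.
function endpointUrl(env: Env, path: string): string {
  // PUBLIC_BASE_URL is an assumed variable name, e.g. the current tunnel URL
  const base = env["PUBLIC_BASE_URL"] ?? "http://localhost:3000";
  return new URL(path, base).toString();
}
```

In application code this could be called as `endpointUrl(process.env, "/api/workflows/onboarding")` and the result passed to `qb.workflows.trigger` or `qb.messages.publish`.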
---
MIT