A unified queue interface supporting RabbitMQ and Redis-SMQ with direct and broadcast modes
npm install ez-queue-pipe> 🎉 v2.2 重磅更新!新增 Kafka/Redpanda 支持,四大后端任你选择!
一个极简的统一队列接口库,支持 RabbitMQ、Redis-SMQ、BullMQ 和 Kafka/Redpanda 四种后端。
- 🎯 极简配置: 只需一个 queue_name 参数,自动判断路由模式
- 不含通配符 → 轮询模式(负载均衡)
- 含通配符 → 广播模式(所有消费者接收)
- 🚀 四大后端: 支持 RabbitMQ、Redis-SMQ、BullMQ 和 Kafka/Redpanda
- RabbitMQ: 企业级首选,功能完善
- Redis-SMQ: 轻量级方案,部署简单
- BullMQ: 高性能,基于 Redis
- Kafka/Redpanda: 超高吞吐量,支持消息回溯 🆕
- ⚡ 超高性能: Kafka/Redpanda 每秒可处理数百万条消息
- 💾 消息持久化: Kafka/Redpanda 支持长期存储和消息回溯
- ⏰ 延迟队列: 支持消息延迟投递
- 🔄 重试机制: 内置消息重试功能(固定延迟/指数退避)
- 🧹 自动清理: 广播队列自动清理,避免僵尸队列
- 🏷️ 智能路由: 支持复杂的 topic 匹配模式(* 和 # 通配符)
- 📦 TypeScript 支持: 完整的类型定义
- 🧪 易于测试: 提供完整的测试用例和示例
``bash`
npm install ez-queue-pipe
多个消费者轮流处理消息,实现负载均衡。
`typescript
import { createQueue } from 'ez-queue-pipe';
// 消费者 1 和 2 共享队列
const consumer1 = await createQueue({
queue_name: 'task', // ✨ 不含通配符 → 自动轮询
host: 'localhost',
username: 'guest',
password: 'guest',
});
await consumer1.setConsumer(async (data) => {
console.log('Consumer 1 processing:', data);
});
const consumer2 = await createQueue({
queue_name: 'task', // 同样的 queue_name
host: 'localhost',
});
await consumer2.setConsumer(async (data) => {
console.log('Consumer 2 processing:', data);
});
// 生产者
const producer = await createQueue({
queue_name: 'task',
host: 'localhost',
});
const p = await producer.getProducer();
await p.publish({ task: 'process_data' });
// 结果:Consumer 1 和 2 轮流处理消息
`
所有消费者都接收到相同的消息。
`typescript
// 消费者 1 和 2 独立队列
const consumer1 = await createQueue({
queue_name: 'notify.*', // ✨ 含通配符 → 自动广播
host: 'localhost',
});
const consumer2 = await createQueue({
queue_name: 'notify.*', // 同样的通配符模式
host: 'localhost',
});
// 生产者
const producer = await createQueue({
queue_name: 'notify.*',
host: 'localhost',
});
const p = await producer.getProducer();
await p.publish({ message: 'System update' });
// 结果:Consumer 1 和 2 都收到消息
`
精确匹配特定的消息类型。
`typescript
// VIP 订单消费者
const vipConsumer = await createQueue({
queue_name: 'order.vip',
host: 'localhost',
});
// 普通订单消费者
const normalConsumer = await createQueue({
queue_name: 'order.normal',
host: 'localhost',
});
// 发送 VIP 订单
const producer = await createQueue({
queue_name: 'order.vip',
host: 'localhost',
});
await (await producer.getProducer()).publish({ orderId: 123 });
// 结果:只有 vipConsumer 收到消息
`
- 极简队列名设计方案 - 推荐阅读
- v1.x 到 v2.0 迁移指南
- BullMQ 使用指南 - BullMQ 用户必读
- Kafka/Redpanda 使用指南 - 🆕 Kafka/Redpanda 用户必读
- Redis-SMQ 使用指南 - Redis-SMQ 用户必读
- Redis-SMQ 广播模式启动顺序 - 生产者/消费者启动顺序最佳实践
- MQ 性能测试指南 - 性能优化必读
- 延迟消息配置指南
- 重试机制指南
- 僵尸队列清理指南
| 格式 | 说明 | 队列类型 | 示例 |
|------|------|---------|------|
| order | 精确名称 | 共享队列(轮询) | 所有消费者轮流处理 |order.vip
| | 精确 topic | 共享队列(轮询) | 特定 topic 的消费者轮流处理 |order.*
| | 单级通配符 | 独占队列(广播) | 每个消费者独立接收 |order.#
| | 多级通配符 | 独占队列(广播) | 每个消费者独立接收 |
- *:匹配一个单词order.*
- 匹配:order.vip、order.normalorder.*
- 不匹配:order、order.vip.china
- #:匹配零个或多个单词order.#
- 匹配:order、order.vip、order.vip.china
`typescript
const producer = await createQueue({
queue_name: 'task',
host: 'localhost',
});
const p = await producer.getProducer();
await p.publish(
{ task: 'delayed_task' },
{
delay: 5000, // 5 秒后投递
}
);
`
`typescript
const consumer = await createQueue({
queue_name: 'task',
host: 'localhost',
default_retry_count: 3, // 默认重试 3 次
retry_delay_strategy: 'exponential', // 指数退避
});
await consumer.setConsumer(async (data) => {
// 处理失败会自动重试
await processTask(data);
});
const producer = await createQueue({
queue_name: 'task',
host: 'localhost',
});
const p = await producer.getProducer();
await p.publish(
{ task: 'important_task' },
{
retry: 5, // 单独配置重试次数
}
);
`
`typescript`
// 广播队列自动清理(60秒无消费者自动删除)
const consumer = await createQueue({
queue_name: 'temp.*', // 含通配符,自动配置清理
host: 'localhost',
queue_ttl: 30000, // 自定义 30 秒
});
`typescript
import { createQueue, QueueBackend } from 'ez-queue-pipe';
// 任务队列(轮询 - Point-to-Point)
const taskQueue = await createQueue({
queue_name: 'task',
host: 'localhost',
port: 6379, // Redis 端口
backend: QueueBackend.REDIS_SMQ,
});
await taskQueue.setConsumer(async (data) => {
console.log('Processing:', data);
});
// 系统广播(Pub/Sub)
const notifyQueue = await createQueue({
queue_name: 'notify.*', // 含通配符 → Pub/Sub
host: 'localhost',
port: 6379,
backend: QueueBackend.REDIS_SMQ,
});
await notifyQueue.setConsumer(async (data) => {
console.log('Notification:', data);
});
// 优先级队列
const priorityProducer = await createQueue({
queue_name: 'priority',
host: 'localhost',
port: 6379,
backend: QueueBackend.REDIS_SMQ,
});
const p = await priorityProducer.getProducer();
await p.publish({ task: 'urgent' }, { priority: 10 }); // 高优先级
await p.publish({ task: 'normal' }, { priority: 5 }); // 中优先级
`
Redis-SMQ 特性:
- ✅ 不含通配符 → Point-to-Point(轮询)
- ✅ 含通配符 → Pub/Sub(广播)
- ✅ 支持延迟消息
- ✅ 支持优先级队列
- ✅ 内置重试机制
- ✅ 轻量级,基于 Redis
`typescript
import { createQueue, QueueBackend } from 'ez-queue-pipe';
// 高吞吐量任务队列
const taskQueue = await createQueue({
queue_name: 'task',
host: 'localhost',
port: 9092, // Kafka/Redpanda 端口
backend: QueueBackend.KAFKA, // 或 QueueBackend.REDPANDA
});
await taskQueue.setConsumer(async (data) => {
console.log('Processing:', data);
});
// 事件流处理
const eventQueue = await createQueue({
queue_name: 'events.*', // 含通配符 → 广播
host: 'localhost',
port: 9092,
backend: QueueBackend.REDPANDA, // Redpanda 性能更好
});
await eventQueue.setConsumer(async (data) => {
console.log('Event:', data);
});
// 发送消息
const producer = await createQueue({
queue_name: 'task',
host: 'localhost',
port: 9092,
backend: QueueBackend.KAFKA,
});
const p = await producer.getProducer();
await p.publish({ taskId: 1, data: 'Task 1' });
`
Kafka/Redpanda 特性:
- ✅ 不含通配符 → Consumer Group(轮询)
- ✅ 含通配符 → 多个 Consumer Group(广播)
- ✅ 超高吞吐量(每秒数百万条消息)
- ✅ 消息持久化和回溯
- ✅ 水平扩展
- ✅ Redpanda 性能更好,无需 JVM
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| queue_name | string | ✅ | 队列名称(支持通配符 * 和 #) |host
| | string | ✅ | 主机地址 |port
| | number | ❌ | 端口号(RabbitMQ: 5672, Redis: 6379) |password
| | string | ❌ | 密码 |username
| | string | ❌ | 用户名(RabbitMQ) |backend
| | QueueBackend | ❌ | 后端类型:rabbitmq 或 redis-smq(默认:rabbitmq) |db
| | number | ❌ | 数据库编号(Redis) |vhost
| | string | ❌ | 虚拟主机(RabbitMQ,默认:/) |namespace
| | string | ❌ | 命名空间(Redis-SMQ) |instance_id
| | string | ❌ | 实例ID(自动生成) |queue_ttl
| | number | ❌ | 队列过期时间(毫秒),含通配符默认 60000ms |auto_delete
| | boolean | ❌ | 是否自动删除无消费者的队列(含通配符默认true) |default_retry_count
| | number | ❌ | 默认重试次数 |retry_delay_strategy
| | 'fixed' \| 'exponential' | ❌ | 重试延迟策略(默认:exponential) |
| 参数 | 类型 | 描述 |
|------|------|------|
| ttl | number | 消息TTL(毫秒) |delay
| | number | 延迟投递(毫秒) |retry
| | number | 重试次数 |priority
| | number | 优先级 |callback
| | function | 回调函数 |headers
| | Record
系统会自动生成唯一的 topic 名称,格式为:
``
${queue_name}.${hostname}.${process.pid}.${instance_id}
- queue_name: 队列名称hostname
- : 主机名process.pid
- : 进程IDinstance_id
- : 实例ID(可选,用于区分同一进程内的多个实例)
广播模式支持自动清理僵尸队列,避免进程重启后遗留的队列堆积:
typescript
const config: QueueConfig = {
queue_name: 'my_queue',
mode: QueueMode.BROADCAST,
auto_delete: true, // 无消费者时自动删除(默认 true)
queue_ttl: 60000, // 60秒无消费者自动过期(默认 60000ms)
}
`- 广播模式:默认启用
autoDelete=true 和 60秒队列过期时间
- Direct 模式:使用持久化队列,确保消息不丢失重试机制
支持灵活的消息重试策略,自动处理失败消息:
$3
`typescript
const config: QueueConfig = {
queue_name: 'my_queue',
default_retry_count: 3, // 默认重试3次
retry_delay_strategy: 'exponential', // 指数退避策略
}// 发布消息时指定重试次数
await producer.publish(data, {
retry: 3, // 重试3次
delay: 1000, // 首次延迟
})
`$3
1. 指数退避(exponential,默认)
- 重试延迟:1s, 2s, 4s, 8s, 16s...
- 适用于大多数场景
2. 固定延迟(fixed)
- 每次重试固定延迟 1 秒
- 适用于需要快速重试的场景
$3
重试消息会自动添加以下头部信息:
-
x-retry-count: 当前重试次数
- x-max-retries: 最大重试次数
- x-original-queue: 原始队列名称
- x-error-message: 错误信息$3
超过最大重试次数后,消息会自动发送到死信队列(DLX),避免无限重试。
后端支持
$3
- 支持 Direct 和 Broadcast 模式
- 支持消息TTL、延迟投递、优先级
- 需要安装 rabbitmq-delayed-message-exchange 插件以支持延迟投递
- 适用场景:企业级应用,复杂路由规则$3
- 支持 Direct 和 Broadcast 模式
- 内置延迟队列支持
- 支持消息重试和死信队列
- 适用场景:轻量级部署,简单场景$3
- 支持 Direct 和 Broadcast 模式
- 基于 Redis,性能优秀
- 内置延迟和重试机制
- 适用场景:高性能需求,快速开发$3
- 支持 Consumer Group 和多 Consumer Group 模式
- 超高吞吐量(每秒数百万条消息)
- 消息持久化和回溯
- Redpanda 性能更好,无需 JVM
- 适用场景:大规模数据流,事件溯源,实时分析多实例支持
`typescript
// 创建多个实例
const instances = [];
for (let i = 1; i <= 3; i++) {
const config = {
...baseConfig,
instance_id: worker-${i}
};
const queue = await createQueue(config);
await queue.setConsumer(async (data) => {
console.log(Worker ${i} received:, data);
});
instances.push(queue);
}// 使用第一个实例作为生产者
const producer = await instances[0].getProducer();
// 发布消息(会被负载均衡到不同的worker)
for (let i = 1; i <= 10; i++) {
await producer.publish({ task: i, data:
Task ${i} });
}
`错误处理
`typescript
try {
const queue = await createQueue(config);
await queue.setConsumer(async (data) => {
// 处理消息
if (someErrorCondition) {
throw new Error('Processing failed');
}
});
} catch (error) {
console.error('Queue error:', error);
}
`测试
`bash
npm test
``MIT License
欢迎提交 Issue 和 Pull Request!
详见 CHANGELOG.md