A producer-consumer pattern implementation using PQueue for efficient data processing
npm install prod-cons-pqueue基于 PQueue 的高效生产者-消费者模式实现,适用于 Node.js 和现代 JavaScript/TypeScript 环境。
- 🚀 高性能: 基于 PQueue 实现,支持并发控制和任务调度
- 🎯 精确控制: 可配置的槽位数量和并发度
- 🔧 事件驱动: 支持实时状态监控和回调
- 🛡️ 错误处理: 完善的错误处理和恢复机制
- 📊 统计信息: 提供详细的运行状态统计
- 🎨 TypeScript 支持: 完整的类型定义和开发体验
- ⏱️ 等待机制: 支持等待特定事件和缓冲区清空
- 🎮 交互式演示: 提供可视化演示工具
``bash`
npm install prod-cons-pqueue
`javascript
import ProdConsPQueue from 'prod-cons-pqueue';
// 创建生产者-消费者实例
const prodCons = new ProdConsPQueue({
slotAmount: 10, // 缓冲区大小
concurrency: 2 // 消费并发度
});
// 监听状态变化
prodCons.on('free-slot-amount-change', (amount) => {
console.log(可用槽位数: ${amount});
});
prodCons.on('blocked-state-change', (isBlocked) => {
console.log(阻塞状态: ${isBlocked ? '阻塞' : '正常'});
});
// 设置消费者
prodCons.consume(async (data) => {
console.log('消费数据:', data);
await new Promise(resolve => setTimeout(resolve, 100));
// 处理数据...
});
// 生产数据
for (let i = 0; i < 5; i++) {
prodCons.produce(async () => {
console.log('生产数据:', i);
await new Promise(resolve => setTimeout(resolve, 50));
return item-${i};`
});
}
`javascript
import ProdConsPQueue from 'prod-cons-pqueue';
const prodCons = new ProdConsPQueue({
slotAmount: 5,
concurrency: 3
});
// 动态调整槽位数量
prodCons.setSlotAmount(20);
// 获取当前状态
console.log(prodCons.getStats());
// {
// bufferLength: 5,
// freeSlotAmount: 15,
// isBlocked: false,
// pendingJobs: 2,
// runningJobs: 1,
// isPaused: false,
// concurrency: 3,
// slotAmount: 20
// }
// 等待特定事件
await prodCons.waitForEvent('blocked-state-change', (value) => value === true);
// 等待缓冲区清空
await prodCons.waitForEmpty();
// 等待消费完成
await prodCons.waitForConsumption();
// 控制功能
await prodCons.pause(); // 暂停消费
await prodCons.start(); // 继续消费
await prodCons.clear(); // 清空缓冲区
await prodCons.destroy(); // 销毁实例
`
`typescript`
constructor(options: ProdConsOptions = {})
参数:
- options.slotAmount (number): 缓冲区槽位数量,默认为 10options.concurrency
- (number): 消费并发度,默认为 1
#### produce(fn: () => Promise
生产数据并放入缓冲区。
参数:
- fn: 返回 Promise 的异步函数,用于产生数据
#### consume(fn: (data: any) => Promise
设置消费者函数。
参数:
- fn: 处理数据的异步函数
#### setSlotAmount(n: number): void
设置缓冲区槽位数量。
#### getStats(): Stats
获取当前状态统计信息。
返回值:
`typescript`
interface Stats {
bufferLength: number; // 缓冲区长度
freeSlotAmount: number; // 可用槽位数
isBlocked: boolean; // 是否阻塞
pendingJobs: number; // 待处理任务数
runningJobs: number; // 正在运行的任务数
isPaused: boolean; // 是否暂停
concurrency: number; // 并发数
slotAmount: number; // 槽位总数
}
#### waitForEmpty(): Promise
等待缓冲区完全清空。
#### waitForConsumption(): Promise
等待当前所有消费操作完成。
#### waitForEvent(eventName: string, condition?: (value: any) => boolean): Promise
等待特定事件触发,可选择性地设置条件过滤。
#### pause(): Promise
暂停消费操作。
#### start(): Promise
继续消费操作。
#### clear(): Promise
清空缓冲区。
#### destroy(): Promise
销毁实例,释放资源。
#### 事件类型
- 'free-slot-amount-change': 空闲槽位数量变化'blocked-state-change'
- : 阻塞状态变化'destroy'
- : 实例销毁
#### on(event: string, callback: (value: any) => void): void
添加事件监听器。
#### off(event: string, callback: (value: any) => void): void
移除事件监听器。
项目包含一个基于 Vue.js 的交互式演示,可视化展示生产者-消费者队列的工作原理:
`bash`启动演示
npm run preview
演示功能:
- 实时可视化任务执行状态
- 动态调整并发数和缓冲区大小
- 观察队列状态变化
- 批量生产和消费操作
`javascript
const taskQueue = new ProdConsPQueue({
slotAmount: 20,
concurrency: 5
});
// 处理文件上传任务
taskQueue.consume(async (file) => {
await uploadFile(file);
});
// 提交上传任务
for (const file of files) {
taskQueue.produce(async () => file);
}
`
`javascript
const dataProcessor = new ProdConsPQueue({
slotAmount: 50,
concurrency: 10
});
// 批量处理数据
dataProcessor.consume(async (dataBatch) => {
const results = await processDataBatch(dataBatch);
return results;
});
// 模拟数据流生成
async function* dataStream() {
for (let i = 0; i < 1000; i++) {
yield { id: i, data: generateRandomData() };
}
}
// 生产数据
for await (const data of dataStream()) {
await dataProcessor.produce(async () => data);
}
`
`javascript
const rateLimitedQueue = new ProdConsPQueue({
slotAmount: 100,
concurrency: 1
});
// 限流API调用
rateLimitedQueue.consume(async (request) => {
const response = await apiCall(request);
return response;
});
// 高频请求
for (let i = 0; i < 1000; i++) {
rateLimitedQueue.produce(async () => ({
id: i,
data: heavyComputation()
}));
}
`
`javascript
const eventProcessor = new ProdConsPQueue({
slotAmount: 10,
concurrency: 3
});
// 等待特定条件的事件
eventProcessor.on('blocked-state-change', (isBlocked) => {
console.log(队列状态: ${isBlocked ? '阻塞' : '运行中'});
});
// 设置消费者
eventProcessor.consume(async (data) => {
console.log('处理数据:', data);
await process(data);
});
// 生产数据并等待处理完成
await eventProcessor.produce(async () => generateData());
await eventProcessor.waitForEmpty(); // 等待所有数据处理完成
`
`bash安装依赖
npm install
$3
`bash
构建项目
npm run build运行测试
npm run test发布
npm publish
`⚠️ 注意事项
1. 内存管理: 建议在不需要时调用
destroy() 方法来释放资源
2. 错误处理: 消费函数中的错误会被捕获并记录,不会影响队列的正常运行
3. 并发控制: 合理设置 concurrency 参数以平衡性能和资源使用
4. 生产速度: 当生产速度远大于消费速度时,缓冲区会被填满并进入阻塞状态
5. 资源清理: 在应用退出前,确保调用 destroy() 方法
6. 事件等待: 使用 waitForEvent 可以实现更精确的流程控制📄 许可证
MIT License
🤝 贡献
欢迎提交 Issue 和 Pull Request!
📝 更新日志
$3
- 初始发布
- 支持基本的生产者-消费者模式
- 事件驱动架构
- 完整的 TypeScript 支持
- 全面的测试覆盖
- 新增等待机制:waitForEmpty、waitForConsumption、waitForEvent
- 新增控制功能:pause、start、clear`---
生产者-消费者队列,让异步任务处理更简单! 🚀