PT-BR | [EN](#english)
Biblioteca TypeScript para consumir eventos via RabbitMQ (AMQP) usando amqplib.
Ela fornece:
- LhispEventbus: utilitário leve para conectar, criar channels, assertExchange e publishMessage.
- LhispEventbusConsumer: classe abstrata que configura exchange/queue/bind/consume e delega o processamento para o seu método consume().
```bash
npm i lhisp-eventbus-consumer
```

```ts
import {
  LhispEventbus,
  LhispEventbusConsumer,
  type LhispEventbusConsumerConstructorParams,
  type LhispBaseEvent,
  type EventBusMessage,
  type Logger,
} from "lhisp-eventbus-consumer";
```
- Exchange de entrada: onde sua aplicação consome eventos.
- Queue: fila que será vinculada (bind) à exchange.
- Routing key / pattern: padrão usado no `bindQueue`.
  - Por padrão, `queuePattern = ""`.
- Ack/Nack:
  - Por padrão, `noAck = true` (RabbitMQ considera a mensagem entregue sem necessidade de ack).
  - Se `noAck = false`, a classe dá ack quando o processamento termina sem erro; em erro/timeout, dá nack.
- Prefetch (paralelismo):
  - Se `maxParallelProcesses > 0`, é aplicado `channel.prefetch(maxParallelProcesses)`.
A classe LhispEventbusConsumer recebe um objeto com:
- `eventBusUrl` (obrigatório): URL do AMQP, por exemplo `amqp://user:pass@host:5672`.
- `exchangeName` (obrigatório): exchange de entrada.
- `queueName` (obrigatório): nome da fila.
- `exchangeType` (opcional, default `fanout`).
- `exchangeOptions` (opcional, default `{ durable: true }`).
- `queuePattern` (opcional, default `""`).
- `queueOptions` (opcional, default `{ durable: true, exclusive: false, autoDelete: false }`).
- `noAck` (opcional, default `true`).
- `maxParallelProcesses` (opcional, default `0`).
- `consumeTimeout` (opcional, default `60000` ms): timeout do processamento por mensagem.
- Callback/Status exchange (opcionais):
  - `callbackModulo` (default `""`): se definido, publica um evento de callback quando ocorrer erro no consumo.
  - `callbackExchangeName` (default `DaemonStatus`).
  - `callbackExchangeType` (default `fanout`).
- Sync exchange (opcionais):
  - `syncExchangeName` (default `SyncEvents`).
  - `syncExchangeType` (default `topic`).
- `logger` (opcional): precisa implementar a interface `Logger`.
Crie uma classe que estende LhispEventbusConsumer e implemente:
- `consume(acao, evento, msg, logger)`: processamento do evento.
- `handleChannelError(error)`: handler de erro do channel.
```ts
import { LhispEventbusConsumer, type EventBusMessage, type LhispBaseEvent, type Logger } from "lhisp-eventbus-consumer";

interface UserCreatedEvent extends LhispBaseEvent {
  payload: {
    id: string;
    email: string;
  };
}

class UserCreatedConsumer extends LhispEventbusConsumer<UserCreatedEvent> {
  protected handleChannelError(error: any): void {
    this.logger.error({ message: "Channel Error", error });
  }

  protected async consume(
    acao: string,
    evento: UserCreatedEvent,
    _msg: EventBusMessage,
    logger: Logger,
  ): Promise<void> {
    logger.info({ message: "Consuming event", acao, evento });
    // Seu processamento aqui
    // - use acao (routing key)
    // - use evento.dbname, evento.EmpresaId, evento.payload
    // Se você precisar responder eventos síncronos:
    // if (evento.uuid) await this.replySyncEvent(evento.uuid, { ok: true });
  }
}

async function main() {
  const consumer = new UserCreatedConsumer({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
    exchangeName: "UserEvents",
    queueName: "user-created-consumer",
    queuePattern: "user.created",
    noAck: false,
    maxParallelProcesses: 10,
    consumeTimeout: 60_000,
    // Opcional: publicar callback em caso de erro
    // callbackModulo: "my-service",
    // callbackExchangeName: "DaemonStatus",
  });
  consumer.banner();
  await consumer.start();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```
A classe LhispEventbus pode ser usada para publicar mensagens (fire-and-forget) em uma exchange.
```ts
import { LhispEventbus } from "lhisp-eventbus-consumer";

async function publishExample() {
  const bus = new LhispEventbus({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
  });
  await bus.publishMessage("UserEvents", "user.created", {
    dbname: "tenant_db",
    EmpresaId: "1",
    payload: { id: "123", email: "a@b.com" },
  });
}
```
- Rodar testes: `npm test`
- Rodar testes em watch: `npm run test:watch`
- Build: `npm run build`
---
TypeScript library to consume RabbitMQ (AMQP) events using amqplib.
It provides:
- `LhispEventbus`: a small helper to connect, create channels, `assertExchange`, and `publishMessage`.
- `LhispEventbusConsumer`: an abstract class that sets up exchange/queue/bind/consume and delegates processing to your `consume()` method.
```bash
npm i lhisp-eventbus-consumer
```

```ts
import {
  LhispEventbus,
  LhispEventbusConsumer,
  type LhispEventbusConsumerConstructorParams,
  type LhispBaseEvent,
  type EventBusMessage,
  type Logger,
} from "lhisp-eventbus-consumer";
```
- Input exchange: the exchange your application consumes events from.
- Queue: the queue that will be bound to the exchange.
- Routing key / pattern: the pattern used in `bindQueue`.
  - Default is `queuePattern = ""`.
- Ack/Nack:
  - Default is `noAck = true` (RabbitMQ treats the message as delivered without waiting for an ack).
  - If `noAck = false`, the consumer acks when processing succeeds; on error or timeout it nacks.
- Prefetch (parallelism):
  - If `maxParallelProcesses > 0`, `channel.prefetch(maxParallelProcesses)` is applied.
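The ack/nack rule above can be illustrated with a small standalone sketch. This is not the library's internal code: `processWithAck` and `AckChannelSketch` are hypothetical names, the channel interface only mirrors the `ack`/`nack` methods that amqplib channels expose, and the choice not to requeue on failure is an assumption made for the example.

```ts
// Minimal channel shape for this sketch (hypothetical); amqplib channels
// provide ack/nack methods with compatible signatures.
interface AckChannelSketch<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

type Outcome = "ack" | "nack";

// Runs `handler` against a deadline, then acks on success
// or nacks on error/timeout.
async function processWithAck<M>(
  channel: AckChannelSketch<M>,
  message: M,
  handler: (message: M) => Promise<void>,
  timeoutMs: number,
): Promise<Outcome> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`timed out after ${timeoutMs} ms`)),
      timeoutMs,
    );
  });
  try {
    await Promise.race([handler(message), deadline]);
    channel.ack(message);
    return "ack";
  } catch {
    // Assumption: failed messages are not requeued in this sketch.
    channel.nack(message, false, false);
    return "nack";
  } finally {
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

Prefetch only limits unacknowledged deliveries, so in RabbitMQ it has an effect only when `noAck` is `false`; with automatic acknowledgement the broker does not wait before sending more messages.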
LhispEventbusConsumer constructor receives:
- `eventBusUrl` (required): AMQP URL, e.g. `amqp://user:pass@host:5672`.
- `exchangeName` (required): input exchange name.
- `queueName` (required): queue name.
- `exchangeType` (optional, default `fanout`).
- `exchangeOptions` (optional, default `{ durable: true }`).
- `queuePattern` (optional, default `""`).
- `queueOptions` (optional, default `{ durable: true, exclusive: false, autoDelete: false }`).
- `noAck` (optional, default `true`).
- `maxParallelProcesses` (optional, default `0`).
- `consumeTimeout` (optional, default `60000` ms): per-message processing timeout.
- Callback/Status exchange (optional):
  - `callbackModulo` (default `""`): when provided, publishes a callback event when consumption fails.
  - `callbackExchangeName` (default `DaemonStatus`).
  - `callbackExchangeType` (default `fanout`).
- Sync exchange (optional):
  - `syncExchangeName` (default `SyncEvents`).
  - `syncExchangeType` (default `topic`).
- `logger` (optional): must implement the `Logger` interface.
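To see the effective configuration at a glance, the documented defaults can be written down as a plain object and merged with the values you pass. The sketch below is only an illustration of the list above: `ConsumerParamsSketch`, `documentedDefaults`, and `resolveParams` are hypothetical names, and the real exported type (`LhispEventbusConsumerConstructorParams`) and the way the class applies its defaults may differ.

```ts
// Hypothetical local mirror of the documented options (logger omitted).
interface ConsumerParamsSketch {
  eventBusUrl: string;
  exchangeName: string;
  queueName: string;
  exchangeType?: string;
  exchangeOptions?: { durable?: boolean };
  queuePattern?: string;
  queueOptions?: { durable?: boolean; exclusive?: boolean; autoDelete?: boolean };
  noAck?: boolean;
  maxParallelProcesses?: number;
  consumeTimeout?: number;
  callbackModulo?: string;
  callbackExchangeName?: string;
  callbackExchangeType?: string;
  syncExchangeName?: string;
  syncExchangeType?: string;
}

// Default values exactly as listed in this README.
const documentedDefaults = {
  exchangeType: "fanout",
  exchangeOptions: { durable: true },
  queuePattern: "",
  queueOptions: { durable: true, exclusive: false, autoDelete: false },
  noAck: true,
  maxParallelProcesses: 0,
  consumeTimeout: 60_000,
  callbackModulo: "",
  callbackExchangeName: "DaemonStatus",
  callbackExchangeType: "fanout",
  syncExchangeName: "SyncEvents",
  syncExchangeType: "topic",
};

// Caller-supplied values win over the defaults (shallow merge).
function resolveParams(params: ConsumerParamsSketch) {
  return { ...documentedDefaults, ...params };
}

const resolved = resolveParams({
  eventBusUrl: "amqp://localhost:5672",
  exchangeName: "UserEvents",
  queueName: "user-created-consumer",
  noAck: false,
});
console.log(resolved.noAck, resolved.exchangeType, resolved.consumeTimeout);
```

Note that the defaults leave `noAck` at `true` and `maxParallelProcesses` at `0`, so a consumer that needs ack/nack handling and bounded parallelism has to set both explicitly.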
Create a class extending LhispEventbusConsumer and implement:
- `consume(action, event, msg, logger)`: your message handler.
- `handleChannelError(error)`: channel error handler.
```ts
import { LhispEventbusConsumer, type EventBusMessage, type LhispBaseEvent, type Logger } from "lhisp-eventbus-consumer";

interface UserCreatedEvent extends LhispBaseEvent {
  payload: {
    id: string;
    email: string;
  };
}

class UserCreatedConsumer extends LhispEventbusConsumer<UserCreatedEvent> {
  protected handleChannelError(error: any): void {
    this.logger.error({ message: "Channel Error", error });
  }

  protected async consume(
    action: string,
    event: UserCreatedEvent,
    _msg: EventBusMessage,
    logger: Logger,
  ): Promise<void> {
    logger.info({ message: "Consuming event", action, event });
    // Your business logic here.
    // If you need to reply to a sync request:
    // if (event.uuid) await this.replySyncEvent(event.uuid, { ok: true });
  }
}

async function main() {
  const consumer = new UserCreatedConsumer({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
    exchangeName: "UserEvents",
    queueName: "user-created-consumer",
    queuePattern: "user.created",
    noAck: false,
    maxParallelProcesses: 10,
    consumeTimeout: 60_000,
    // Optional: publish callback events on error
    // callbackModulo: "my-service",
    // callbackExchangeName: "DaemonStatus",
  });
  consumer.banner();
  await consumer.start();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```
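The example above calls `logger.info(...)` and `this.logger.error(...)` with a single object argument. The exact shape of the exported `Logger` interface is not shown in this README, so the sketch below assumes only those two methods; `LoggerSketch`, `LogEntry`, and `createMemoryLogger` are hypothetical names. An in-memory logger like this can be handy in unit tests, provided it satisfies the real `Logger` type.

```ts
// Assumed logger shape: only the two methods used in the examples above.
interface LoggerSketch {
  info(data: Record<string, unknown>): void;
  error(data: Record<string, unknown>): void;
}

type LogEntry = { level: "info" | "error"; data: Record<string, unknown> };

// Collects log calls in an array instead of writing them out,
// so a test can assert on what the consumer logged.
function createMemoryLogger(): { logger: LoggerSketch; entries: LogEntry[] } {
  const entries: LogEntry[] = [];
  const logger: LoggerSketch = {
    info: (data) => {
      entries.push({ level: "info", data });
    },
    error: (data) => {
      entries.push({ level: "error", data });
    },
  };
  return { logger, entries };
}

const { logger, entries } = createMemoryLogger();
logger.info({ message: "Consuming event", action: "user.created" });
logger.error({ message: "Channel Error" });
console.log(entries.map((entry) => entry.level));
```

Before passing such an object as the `logger` option, check it against the package's `Logger` type, which may require additional methods.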
Use LhispEventbus for fire-and-forget publishing.
```ts
import { LhispEventbus } from "lhisp-eventbus-consumer";

async function publishExample() {
  const bus = new LhispEventbus({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
  });
  await bus.publishMessage("UserEvents", "user.created", {
    dbname: "tenant_db",
    EmpresaId: "1",
    payload: { id: "123", email: "a@b.com" },
  });
}
```
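The second argument in the call above (`"user.created"`) is the routing key. How it interacts with a consumer's `queuePattern` depends on the exchange type: a `fanout` exchange ignores the routing key, while a `topic` exchange matches it against each binding pattern, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The function below is a local sketch of that AMQP matching rule for reasoning about patterns; `topicMatches` is a hypothetical helper, not part of this library, and the broker performs the real matching.

```ts
// Returns true when `routingKey` would match `pattern` on a topic exchange.
function topicMatches(pattern: string, routingKey: string): boolean {
  const patternWords = pattern.split(".");
  const keyWords = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: it matches only if the key is exhausted too.
    if (i === patternWords.length) return j === keyWords.length;
    if (patternWords[i] === "#") {
      // "#" may absorb zero or more words; try every possible split.
      for (let next = j; next <= keyWords.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }
    if (j === keyWords.length) return false;
    const wordMatches = patternWords[i] === "*" || patternWords[i] === keyWords[j];
    return wordMatches && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(topicMatches("user.*", "user.created"));
console.log(topicMatches("user.*", "user.created.v2"));
console.log(topicMatches("user.#", "user.created.v2"));
```

With the default `exchangeType` of `fanout`, every bound queue receives the message regardless of `queuePattern`, so patterns like these only matter once the exchange is declared as `topic`.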
- Run tests: `npm test`
- Run tests (watch): `npm run test:watch`
- Build: `npm run build`