MQueue adapter for multi-backend publishing and subscribing
npm install @mqueue/multicast

Broadcast a message to multiple different MQueue queue backends simultaneously, with the same interface. Publish to amqplib, azure-service-bus, rhea, and sqs with one call.
```bash
npm install --save @mqueue/queue @mqueue/multicast # + Adapter(s)...
```

or use pnpm/yarn

```ts
const outgoingQueue = new MQueue.Outgoing(
  new MulticastQueue.Outgoing([
    await AmqplibOutgoingQueue.connect("amqp://rabbitmq:5271", "queue-name"),
    await AmqplibOutgoingQueue.connect("amqp://rabbitmq:5272", "queue-name2"),
  ]),
);

outgoingQueue.sendMessage({
  headers: {
    "Account-ID": "123",
  },
  body: "...",
});

// ...

const incomingQueue = new MQueue.Incoming(
  new MulticastQueue.Incoming([
    await AmqplibIncomingQueue.connect("amqp://rabbitmq:5271", "queue-name"),
    await AmqplibIncomingQueue.connect("amqp://rabbitmq:5272", "queue-name2"),
  ]),
);

// Start listening to the queue
await incomingQueue.consume(async (payload) => {
  const topicOrQueueName = payload.transport.name;
  const headers = payload.message.headers;
  const data = await payload.message.json();
  await payload.accept(); // or await payload.reject();
  // ...
});
```

```ts
import MQueue from "@mqueue/queue"; // or require("@mqueue/queue");
import { MulticastQueue } from "@mqueue/multicast"; // or require("@mqueue/multicast");

// Select one random adapter (for example)
const filter = (adapters) => [adapters[randomInt(adapters.length)]];

// Or filter by message detail
const filter = (adapters, message) => {
  return message.headers.example === "something" ? [adapters[0]] : adapters;
};

const outgoingQueue = new MQueue.Outgoing(
  new MulticastQueue.Outgoing([
    await AmqplibOutgoingQueue.connect("amqp://rabbitmq:5271", "queue-name"),
    await AmqplibOutgoingQueue.connect("amqp://rabbitmq:5272", "queue-name2"),
  ]),
  { filter },
);

// ...

const incomingQueue = new MQueue.Incoming(
  new MulticastQueue.Incoming([
    await AmqplibIncomingQueue.connect("amqp://rabbitmq:5271", "queue-name"),
    await AmqplibIncomingQueue.connect("amqp://rabbitmq:5272", "queue-name2"),
  ]),
  { filter },
);

// Start listening to the queue
await incomingQueue.consume(async (payload) => {
  const topicOrQueueName = payload.transport.name;
  const headers = payload.message.headers;
  const data = await payload.message.json();
  await payload.accept(); // or await payload.reject();
  // ...
});
```