Retry failed attempts to consume a message, with increasing delays between attempts. The delays are implemented with a binary delay message architecture.
The underlying problem is that messages are only removed from the head of a queue, so you cannot use a single wait queue for a back-off strategy. If a message with a TTL of 10 minutes is at the head of the queue and a message with a TTL of 1 minute is behind it, the second message also waits for ten minutes.
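The head-of-queue blocking described above can be illustrated with a small simulation. This is only a sketch of the queueing behaviour, not code from this library; `singleQueueReleaseTimes` is a hypothetical name, and all messages are assumed to be published at time 0.

```javascript
// Simulate a single FIFO wait queue in which only the head message can expire.
// TTLs are in minutes; every message is assumed to be published at time 0.
function singleQueueReleaseTimes(ttls) {
  let headFreeAt = 0; // earliest time the queue head can be released
  return ttls.map((ttl) => {
    // A message leaves once its own TTL has elapsed AND every message
    // ahead of it has already left the queue.
    headFreeAt = Math.max(headFreeAt, ttl);
    return headFreeAt;
  });
}

// The 1-minute message sits behind the 10-minute one and waits 10 minutes.
console.log(singleQueueReleaseTimes([10, 1])); // [ 10, 10 ]
```

With the order reversed (`[1, 10]`) each message leaves on time, which is why the outcome depends on arrival order and a single wait queue cannot be relied on.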
By default we create 16 delay layers. Layer n delays the message for 2^n seconds. Custom headers set on each retried message record which layers the message should be delayed in. The last delay layer forwards the message to the 'ready' queue, from which it is republished with its original headers to the original consumer queue.
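To see why a set of power-of-two layers can express any delay, it may help to decompose a delay into its binary digits. The sketch below is not the library's implementation: `layersForDelay` is a hypothetical helper, and it assumes that layers are numbered from 0 and that layer n waits 2^n seconds. The order in which layers are listed is illustrative only.

```javascript
// Hypothetical helper: list the delay layers whose waits add up to
// `delaySeconds`. Assumes layer n waits 2^n seconds, with n starting at 0.
function layersForDelay(delaySeconds, delayLevels = 16) {
  const maxDelay = 2 ** delayLevels - 1; // sum of all layer delays
  if (!Number.isInteger(delaySeconds) || delaySeconds < 0 || delaySeconds > maxDelay) {
    throw new RangeError(`delay must be an integer between 0 and ${maxDelay}`);
  }
  const layers = [];
  for (let n = delayLevels - 1; n >= 0; n--) {
    // Bit n of the delay decides whether the message waits in layer n.
    if (Math.floor(delaySeconds / 2 ** n) % 2 === 1) {
      layers.push(n);
    }
  }
  return layers;
}

console.log(layersForDelay(5));  // [ 2, 0 ]     (4 s + 1 s)
console.log(layersForDelay(70)); // [ 6, 2, 1 ]  (64 s + 4 s + 2 s)
```

Because each queue holds only messages with the same TTL, no message can be stuck behind one with a longer wait.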
npm install amqplib-binary-retry
```javascript
(async () => {
  const amqplib = require('amqplib');
  const { retryer } = require('amqplib-binary-retry');

  const CONSUMER_QUEUE = 'example-service';

  const connection = await amqplib.connect('amqp://localhost:5672');
  const channel = await connection.createChannel();

  // Create the client queue
  await channel.assertQueue(CONSUMER_QUEUE, { durable: false, autoDelete: true });

  // Define a message handler
  const handler = function (msg) {
    // no need to 'ack' or 'nack' messages
    // messages that generate an exception (or a rejected Promise) will be retried
    console.log(msg);
  };

  // Define the optional delayFunction
  // const delayFunction = function (attempt) {
  //   // After three retries fail the message
  //   if (attempt > 3) {
  //     return -1;
  //   }
  //
  //   // Delay for 5 seconds
  //   return 5000;
  // };

  // Use retryer as a consumer
  // For more configuration options check the 'Options' section
  channel.consume(CONSUMER_QUEUE, retryer({
    channel,
    consumerQueue: CONSUMER_QUEUE,
    handler,
    // delayFunction,
  }));
})();
```
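Since a thrown exception or a rejected Promise triggers a retry, a handler can simply let errors propagate. The following is a minimal sketch of such a handler; `processOrder` is a hypothetical stand-in for real business logic, and the JSON payload shape is an assumption made for illustration.

```javascript
// Hypothetical business logic: rejects when the payload is not usable.
async function processOrder(order) {
  if (typeof order.id !== 'string') {
    throw new Error('order is missing an id');
  }
  return order.id;
}

// amqplib delivers the message body as a Buffer in `msg.content`.
async function handler(msg) {
  // JSON.parse throws on malformed input and processOrder may reject.
  // Either way the returned Promise rejects, and no ack/nack is issued here.
  const order = JSON.parse(msg.content.toString('utf8'));
  return processOrder(order);
}
```

A handler like this would be passed as the `handler` option in the example above.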
| Parameter Name | Required | Default | Description |
|-|-|-|-|
| channel | X | | Amqplib channel. |
| consumerQueue | X | | Name of the queue that holds the AMQP messages that need to be processed. |
| handler | X | | Callback to be invoked with each message. |
| failureQueue | | | Name of the queue that holds the AMQP messages that could not be processed in spite of the retries. |
| delayFunction | | `Math.pow(2, <# of attempts>)` | Delay in milliseconds between retries. The function accepts the number of retry attempts. |
| exchangePrefixName | | `delay_exchange_` | Prefix for the names of the exchanges used on the delay levels. |
| queuePrefixName | | `delay_queue_` | Prefix for the names of the queues used on the delay levels. |
| delayLevels | | 16 | Number of delay layers used. The maximum available delay, in seconds, is about 2^delayLevels. |
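As an illustration of the `delayFunction` option, the sketch below builds a capped exponential back-off. It is not part of the library: `cappedBackoff` and its parameters are hypothetical, and it assumes the attempt count starts at 1. Returning -1 to fail the message follows the commented-out example above.

```javascript
// Hypothetical factory for a capped exponential delayFunction.
// Assumes `attempt` starts at 1 for the first retry.
function cappedBackoff({ baseMs = 1000, maxMs = 60000, maxAttempts = 8 } = {}) {
  return function delayFunction(attempt) {
    // Give up once the retry budget is exhausted.
    if (attempt > maxAttempts) {
      return -1;
    }
    // Double the delay on each attempt, but never exceed maxMs.
    return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
  };
}

const delayFunction = cappedBackoff();
console.log([1, 2, 7, 9].map((attempt) => delayFunction(attempt)));
// [ 1000, 2000, 60000, -1 ]
```

The resulting function can be passed as `delayFunction` alongside `channel`, `consumerQueue`, and `handler`.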
Set up the required services.
docker-compose up
Transpile the TypeScript source.
npm run build
Run tests.
npm run test