Library for interacting with Pulse and Taskcluster-Pulse
npm install taskcluster-lib-pulseLibrary for interacting with Pulse and Taskcluster-Pulse. See the
docs for more
information on Pulse.
This library is designed for use in Taskcluster services, both for producing
and consuming pulse messages.
This library defines a Client along with several classes and functions that
base their functionality on a Client. The Client represents an association
with a Pulse service, automatically reconnecting as necessary.
It also provides higher-level components with simplified APIs for common
applications. The higher-level components are:
* PulseConsumer
* PulsePublisher
If you are using one of the higher-level components, then the details of
interacting with a Client are not important -- just construct one and move on.
Create a credentials function, choosing among:
``javascript
const pulse = require('taskcluster-lib-pulse');
let credentials;
// raw AMQP credentials
credentials = pulse.pulseCredentials({
username: 'sendr',
password: 'sekrit',
hostname: 'pulse.mycompany.com',
vhost: '/',
});
// ..or a connection string
credentials = pulse.connectionStringCredentials(
'amqps://me:sekrit@foo.com/%2Fvhost');
`
Next, create a Client to handle (re)connecting to Pulse:
`javascript`
const client = new pulse.Client({
namespace: 'my-service',
credentials, // from above
monitor: .., // taskcluster-lib-monitor instance
});
The Client is responsible for connecting, and re-connecting, to the pulse
server. Once created, it will do so automatically until stopped.
Other options to the constructor:
* recycleInterval - interval on which connections are automatically recycled, in ms. Default: 1 hour.retirementDelay
* - time that a connection remains in the retiring state. Default: 30 seconds.minReconnectionInterval
* - minimum time between connection attempts. Default: 15s.
AMQP is a very connection-oriented protocol, so as a user of this library, you
will need to set up each new connection. To do so, set up an event listener
in a handler for the connected event from the client:
`javascript`
client.onConnected(conn => {
// ...
});
The conn value of this event is a Connection instance, from this library.conn.amqp
The amqplib connection is available as . The listener should create
any necessary channels, declare queues and exchanges, and - if consuming
messages - bind to those queues.
The onConnected method is a shorthand for on('connected', ..) that alsoclient.removeListener
calls the handler immediately if the Client is already connected. Its return
value can be used with just like any other EventEmitter
listener.
Note that declaring non-durable queues in this method may lead to message loss
or duplication: when this connection fails, the server will delete the queues
and any pending messages. If this is not acceptable for your application, use a
durable queue.
The library cannot detect all problems with an existing connection. If any
method produces an error that might be fixed by reconnecting, call the
connection's failed method. This will mark the connection as failed andconnected
begin cretaing a new connection (culminating in another event).
The activeConnection property contains the current Connection, or undefinedonConnected
if no connection exists at the moment. In most cases, you will want to use or withConnection to get access to a Client's connection,
rather than this property.
If you have a one-off task that requires a channel, such as declaring an
exchange, use client.withChannel, which will wait for a connection if
necessary, then run your asynchronous function with an amqplib channel or
confirmChannel. If the function fails, it is not automatically retried, but the
channel is closed.
`javascript`
await client.withChannel(async channel => { .. }, {confirmChannel: true});
await client.withChannel(async channel => { .. }, {confirmChannel: false});
There is also a more general withConnection which returns the Connection
instance without creating a channel.
`javascript`
await client.withConnection(async conn => { .. });
The most common use case for these functions is to declare or delete objects on
the AMQP server. For example:
`javascript`
await client.withChannel(async chan => {
const exchangeName = client.fullObjectName('exchange', 'notable-things');
await chan.assertExchange(exchangeName, 'topic');
});
Note that the example above uses the fullObjectName method. This method willexchanges/
generate an exchange name compatible with the pulse access control model, in
this case .
This method is useful for translating unqualified names like queueName to the
fully qualified names required when working directly with amqplib.
The Client instance will automatically reconnect periodically. This helps
to distribute load across a cluster of servers, and also exerciess the
reconnection logic in the application, avoiding nasty surprises when a network
or server failure occurs.
The Client also has a recycle method that will trigger a retirement and
reconnection.
When a connection is still working, but a new connection is being created, the
old connection spends 30 seconds "retiring". The intent of this delay is to
allow any ongoing message handling to complete before closing the underlying
AMQP connection.
The Connection instance emits a retiring event when retirement begins.retiring
Consumers should respond to this message by cancelling any channel consumers.
The event from the Connection will be followed by aconnected event from the Client for the next connection.
Call the async Client.stop() method to shut the whole thing down. This willConnection
wait until all existing instances are finished their retirement.
To consume messages, listen for connected messages and set up a new
channel on each new connection. Stop consuming from the channel on retirement,
allowing time for any in-flight consumption to complete before the connection
is finished.
The whole thing is wrapped in a try/catch so that any errors in connection
setup are treated as a connection failure.
NOTE: the PulseConsumer class implements a full-featured consumer; this
code is provided merely as an example of Client usage.
`javascript
client.onConnected(async (conn) => {
let channel, consumer;
try {
const amqp = conn.amqp;
channel = await amqp.createChannel();
await channel.assertExchange(exchangeName, 'topic');
await channel.assertQueue(queueName);
await channel.bindQueue(queueName, exchangeName, routingKeyPattern);
consumer = await channel.consume(queueName, (msg) => {
// do something with the message, then ack it..
channel.ack(msg);
});
conn.on('retiring', () => {
// ignore errors in this call: the connection is already retiring..
channel.cancel(consumer.consumerTag).catch(() => {});
});
} catch (err) {
debug('error in connected listener: %s', err);
conn.failed();
}
});
`
The FakeClient class can be used instead of Client in testing situations,PulseConsumer
to avoid the need for an actual AMQP server. The class itself has no
functionality, but serves as a semaphore to activate a "fake" mode when passed
to higher-level components such as : fakeConsumer =
consume({client: new FakeClient(), ..}).
The stop and recycle methods do nothing. The activeConnection propertyonConnected
is always undefined. All of , withConnection, and withChannel
return immediately without invoking their callback.
A fake client has client.isFakeClient set to true.
A PulseConsumer declares a queue and listens for messages on that queue,
invoking a callback for each message.
Construct a PulseConsumer with the async consume function:
`javascriptqueues/
const pulse = require('taskcluster-lib-pulse');
let pc = await pulse.consume({
client, // Client object for connecting to the server
bindings: [{ // exchange/routingKey patterns to bind to
exchange, // Exchange to bind
routingKeyPattern, // Routing key as string
routingKeyReference, // Reference used to parse routing keys (optional)
}, ..],
queueName, // Queue name (without prefix)`
prefetch, // Max number of messages unacknowledged to hold (optional)
maxLength, // Maximum queue size, undefined for none
...queueOptions, // passed to assertQueue
}, async ({payload, exchange, routingKey, redelivered, routes, routing}) => {
// handle message
...
});
This will create a queue using a pulse-compatible queue name based on
queueName (prefixed with queue/).
If routingKeyReference is provided for the exchange from which messages
arrive, then the listener will parse the routing key and make it available as a
dictionary on the message. Note that bindings are easily constructed using the
taskcluster-client library.
The instance starts consuming messages immediately. When the consume function's
promise has resolved, the queue exists and all bindings are in place.
At this time, it is safe to initiate any actions that might generate messages
you wish to receive.
Call await pc.stop() to stop consuming messages. A PulseConsumer cannot bestop
restarted after stopping -- instead, create a new instance. The
method's Promise will not resolve until all message-handling has completed and
the channel is closed.
When a message is received, the message handler (which can be specified as
option handleMessage or as the second positional argument) is called
(asynchronously) with a message of the form:
`javascriptroute.
{
payload, // parsed payload (as JSON)
exchange, // exchange name
routingKey: // primary routing key
redelivered: // true if this message has already been attempted
routes: [..] // additional routes (from CC header, with the `
// prefix stripped)
routing: {} // parsed routes (if routingKeyReference is provided)
}
If the handler fails, the message will be re-queued and re-tried once.
The default mode of operation is to create a "permanent" queue that will
persist across restarts and reconnections, thereby ensuring no messages are
lost. If multiple processes use the same queueName, then a message will only
be delivered to one process, provinding an easy way to distribute
message-processing load.
Some uses of Pulse call instead for a queue for each process, and arranging
that the queue be automatically cleaned up when the process goes away. For
example, some components of an application may notify other components that
some data has changed.
This is referred to as an "ephemeral consumer". The automatic cleanup occurs
on reconnection, meaning that it is possible and common for messages to be lost
when an AMQP connection is recycled. Callers must handle this situation.
To use an ephemeral queue, pass ephemeral: true, do not pass a queueNameonConnected
(as one will be invented for you), and pass an function that willonConnected
be called (asynchronously) every time a new connection is established. Such a
call is a signal that messages may have been skipped. In the example given
above, the callback would assume that a data-change notification
had been missed and perform whatever reconciliation is required.
A binding's routingKeyReference gives reference information for the format ofname
a routing key, and allows the tool to "parse" a message's routing key into
components. It is an array of objects with properties , the name of themultipleWords
component, and if the component can match multiple words.
(joined with a ). Other fields are ignored. Only one component can havemultipleWords. This is compatible with the references produced by
taskcluster services.
`javascript`
routingKeyReference: [
{name: 'routingKeyKind'},
{name: 'someId', multipleWords: true},
]
If this parameter is given, the message will have a routing property
containing the routing key components keyed by their name.
The library assumes that all messages on a given exchange share the same
routing key reference, as it is not practical to determine which
routingKeyPattern matched a particular message.
In some cases, it is necessary to modify the bindings for a queue while it is
still consuming. Use queue.withChannel (above) for this purpose. In thisbindings: []
case, it is simplest to provide an empty to the PulseConsumerwithChannel
constructor and manage bindings entirely via . Note that with this
arrangement, routing key reference is not supported.
If passed a FakeClient, consume will return a fake consumer. That objectfakeMessage
does not interface with an AMQP server, but has an async method
which will call back the message-handling function with the same arguments.
`javascript`
const consumer = consume({
client: new FakeClient(),
...
}, async ({payload, exchange, routingKey, redelivered, routes, routing}) => {
// ...
});
await consumer.fakeMessage({payload: .., exchange: .., ..});
The library provides high-level support for publishing messages, as well. The
support for sending messages is quite simple, but this component also handles
declaring exchanges, message schemas, and so on, and producing a reference
document that can be consumed by client libraries to generate easy-to-use
clients. All of this is similar to what
taskcluster-lib-api does
for HTTP APIs.
Begin by creating an Exchanges instance. This will collect all exchange
definitions for the service.
`javascript
const {Exchanges} = require('taskcluster-lib-pulse');
const exchanges = new Exchanges({
serviceName: 'myservice',
projectName: 'taskcluster-myservice',
version: 'v1',
title: 'Title for Exchanges Docs',
description: [
'Description in markdown.',
'This will available in reference JSON',
].join(''),
});
`
The serviceName should match that passed toprojectName
taskcluster-lib-validate.
The is used to construct exchange and queue names. It should
match the pulse namespace (at least in production deployments) and the name
passed to
taskcluster-lib-monitor,
Having created the exchanges instance, declare exchanges on it:
`javascriptroute.nests.${message.nestId}
exchanges.declare({
exchange: 'egg-hatched',
name: 'eggHatched',
title: 'Egg has Hatched',
description: [
'Description in markdown.',
'This will available in reference JSON',
].join(''),
schema: 'egg-hatched-message.yml',
messageBuilder: ({eggId, nestId, hatchDate}) => ({eggId, nestId, hatchDate}),
routingKeyBuilder: ({eggId}) => ({eggId}),
routingKey: [
{
name: 'routingKeyKind',
summary: 'Routing key kind hardcoded to primary in primary routing-key',
constant: 'primary',
required: true,
}, {
name: 'eggId',
summary: 'The egg that has changed state',
required: true,
maxSize: 22,
multipleWords: false,
}, {
name: 'reserved',
summary: 'Space reserved for future use.',
multipleWords: true,
maxSize: 1,
}
],
CCBuilder: (message) => ([]),`
});
The first few arguments describe the exchange.
* exchange - the name of the exchange. This will be qualified as exchange/.name
* - the camelCased name of the exchange; used for function namestitle
* - short summarydescription
* - longer descriptionschema
* - the schema against which the payload must validate
A message will be sent by calling a method on the publisher named by the namemessageBuilder
argument. The arguments to that method are passed unchanged to, routingKeyBuilder, and CCBuilder.
The messageBuilder returns the message payload. This payload is subsequently
validated against the schema.
The routingKeyBuilder returns the message's routing key either as an objectroutingKey
containing routing key fields. That object is then used along with to construct a routing key. Each element in the routingKey
array generates at least one element in the routing key, based on the following
properties:
* name -- name of the field, drawn from the object returned by routingKeyBuildersummary
* -- short summary of the fieldconstant
* -- if set, the only allowable value for the fieldrequired
* -- if false, the field is not required and will default to _maxSize
* -- maximum number of characters (or bytes, as this is ASCII) in the fieldmultipleWords
* -- if true, the value can contain .; this can occur only once in the routing key.
By convention, the first element of the routing key is always the constant
primary and the last element is a multi-word field named reserved.
Finally, the CCBuilder returns a list of strings giving CC'd routing keys forroute.
this message. Queues bound with routing patterns matching a CC'd routing key
will receive the message even if they do not match the primary routing key.
By convention, these are prefixed with .
The exchanges.reference() method will return a reference document suitable
for publication under .
This is typically passed to taskcluster-lib-docs like this:
`javascript`
Docs.documenter({
references: [
{name: 'events', reference: exchanges.reference()},
], ..
})
To publish messages, create a new pulse publisher:
`javascript`
const publisher = await exchanges.publisher({
rootUrl: cfg.taskcluster.rootUrl,
schemaset, // from taskcluster-lib-validate
client, // taskcluster-lib-pulse Client instance
sendDeadline: 12000,
});
Call the methods declared on the Exchanges instance; for example await
publisher.eggHatched({eggId, nestId, datehatched}). The function's promise
will resolve when the AMQP server has confirmed receipt of the message.
The sendDeadline option gives a time (in ms) after which a send operation
will not be retried. This value is typically chosen so that operations sending
messages (such as REST API handlers) can return an error instead of timing out.
Default is 12 seconds.
For compatibility with the deployment at taskcluster.net, this function alsopublish
accepts parameters and aws, which control publishing the references
to an Amazon S3 bucket.
If given a FakeClient, the resulting publisher will not actually send messages.message
Instead, it is an EventEmitter that will emit a event for every message
it sends, containing the processed exchange name, routing key, and so on:
`javascript`
publisher.on('message', msg => {
assume(msg).to.deeply.equal({
exchange: 'exchange/taskcluster-lib-pulse/v2/egg-hatched',
routingKey: 'badEgg',
payload: {eggId: 'badEgg'},
CCs: [],
});
});
await publisher.eggHatched({eggId: 'badEgg'});
To run the tests, a simple yarn test will do. But it will skip most of the tests!
Better to run against a real RabbitAMQP server. If you have Docker, that's easy:
```
docker run -d -ti --rm -p 5672:5672 rabbitmq:alpine
export PULSE_CONNECTION_STRING=amqp://guest:guest@localhost:5672/