npm install @synonymdev/blocktank-worker2
Microservice module based on Grenache DHT and AMPQlib RabbitMQ messages. Written in Typescript, supports Javascript.
Run DHT for service discovery
``bash`
npm i -g grenache-grape # Only once
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'
Run RabbitMQ for events
`bash`
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:3.10.2-management
Open the dashboard http://localhost:15672/ and login with guest/guest.
A Worker consists of
* a server that listens on method calls.
* RabbitMQ event publisher (fanout exchange).
`typescript
import { Worker, WorkerImplementation, waitOnSigint, GrenacheClient } from '@synonymdev/blocktank-worker2';
const client = new GrenacheClient()
class HelloWorldWorkerImplementation extends WorkerImplementation {
/**
* Every method defined in here can be called by other workers/clients.
*/
async helloWorld(name1: string, name2: string) {
return Hello ${name1} and ${name2};
}
async callOtherWorkerUsdToBtc(usd: number) {
const exchangeRate = client.encapsulateWorker('exchange_rate') // Get exchangeRate worker
const btcUsd = await exchangeRate.getRate("BTCUSD") // Call method on exchangeRate worker.
console.log('Current BTCUSD price is', btcUsd)
// Current BTCUSD price is $30,000
return usd/btcUsd
}
}
const runner = new Worker(new HelloWorldWorkerImplementation(), {
name: 'HelloWorldService', // Name of the worker.
})
try {
await runner.start();
await waitOnSigint() // Wait on Ctrl+C
} finally {
await runner.stop()
}
`
* Supports, async and sync and callback functions.
* If callback functions are used, initialize the Worker with callbackSupport: true.Error
* Automatically returns s.
constructor(worker, config?)
worker: WorkerImplementation*
config? GrenacheServerConfig*
name? string* Name of the worker. Announced on DHT. Used to name RabbitMQ queues. Default: Random name.
grapeUrl? string* URL to the grape DHT. Default: http://127.0.0.1:30001.port?
integer* Server port. Default: Random port between 10,000 and 40,000.callbackSupport?
boolean* Allows WorkerImplementation functions to be written with callbacks. Disables the method argument count check. Default: falseconnection?
amp.Connection* RabbitMQ connection. Mutually exclusive with amqpUrl.amqpUrl
string* RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.namespace
string* RabbitMQ namespace. All objects like exchanges, queues will start with {namespace}.. Default: blocktank
async start() Starts the worker. Listens on given port.
async stop() Stops the worker. Graceful shutdown.
options? WorkerStopOptions*
cleanupRabbitMq? boolean* Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
GrenacheClient allows to call other workers without exposing your own server.
constructor(grapeUrl?)
grapeUrl string* URL to the DHT. Default: http://127.0.0.1:30001.
`typescript
import { GrenacheClient } from '@synonymdev/blocktank-worker2'
const client = new GrenaceClient()
// Method 1 - Call function
const method = 'helloWorld';
const args = ['Sepp', 'Pirmin'];
const response1 = await client.call('HelloWorldService', method, args)
console.log(response1) // Hello Sepp and Pirmin
`
async call(workerName, method, args?, opts?) call method of another worker. Returns the worker response.
workerName string* Name of the worker you want to call.
method string* Method name you want to call.
args? any[]* List of arguments. Default: [].
opts?: Partial GrenacheClientCallOptions*
* timeoutMs? Request timeout in milliseconds. Default: 60,000.
encapsulateWorker(workerName) Conveninence wrapper. Returns a worker object that can be called with any worker method.
`typescript`
// Example
const helloWorldService = client.encapsulateWorker('HelloWorldService')
const response = await helloWorldService.helloWorld('Sepp', 'Pirmin')
// Hello Sepp and Pirmin
RabbitPublisher and RabbitConsumer manage all events around the worker.
Events work on a "at least once" delivery basis. If an error is thrown, the even is retried with an exponential backoff.
Checkout RabbitMQ docs to get an overview on the exchange/queue structure.
Consume events from RabbitMQ.
`typescript
const myServiceName = 'MyServiceName'
const consumer = new RabbitConsumer(myServiceName)
await consumer.init()
try {
await consumer.onMessage('HelloWorldService', 'invoicePaid', async event => {
console.log('HelloWorldService.invoicePaid event:', event)
})
await waitOnCtrlC()
} finally {
await consumer.stop() // Graceful shutdown
}
`
async init() Initializes the consumer. Creates the RabbitMQ exchanges and queues.
async stop(cleanupRabbitMq?, timeoutMs?) Stops consuming messages. Graceful shutdown.
cleanupRabbitMq? boolean* Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
timeoutMs? number* Timeout in milliseconds to wait on currently consumed messages to finish. Default: 20,000.
async onMessage
sourceWorkerName string* Name of the worker that emits the event.
eventName string* Name of the event.
callback function* Callback function that is called when the event is received.
* Type: (msg: RabbitEventMessage
* May be async or sync.
options? RabbitConsumeOptions* options for this specific event type.
backoffFunction: (attempt: number) => number* Function that returns the backoff time in milliseconds. Default: exponential backoff.
Important properties of onMessage
- At-Least-Once-Delivery: Messages can be delivered multiple times and potentially in a different order.
- Retries: If an error is thrown, the event is retried with an exponential backoff. The backoff function can be customized.
Publish events without a consumer.
constructor(myWorkerName, options?)
myWorkerName string* Name of the worker that emits the event.
options? RabbitConnectionOptions*
connection? amp.Connection* RabbitMQ connection. Mutually exclusive with amqpUrl.amqpUrl
string* RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.namespace
string* RabbitMQ namespace. All objects like exchanges, queues will start with {namespace}.. Default: blocktank
async init() Initializes the producer. Creates the main RabbitMq Exchange.
async stop(cleanupRabbitMq?) Stops the connection.
cleanupRabbitMq? boolean* Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
async publish(eventName: string, data: any) Publishes an event.
eventName string* Name of the event.
data any* Any json serializable data that is sent with the event.
The Worker and RabbitConsumer take a logger option.
logger pino.Logger | boolean Default: false. If set to true, a default logger is used. If set to false, no logging is done. If set to a pino logger, this logger is used.
> Experimental The goal of MongoDatabase is to provide convenient methods for testing. This class is not mature though so it might change in the future.
Run mongo locally:
`bash`
docker run -it --rm -p 27017:27017 --name ma-mongo-db mongo:latest
Checkout the MongoDB Compass if you need a UI.
Define entities in its own folder:
`typescript
import { Entity, PrimaryKey, Property, SerializedPrimaryKey } from "@mikro-orm/core";
import {randomUUID} from 'crypto'
import { ObjectId } from "@mikro-orm/mongodb";
@Entity()
export class SampleAuthor {
@PrimaryKey({ name: "_id" })
id: string = randomUUID();
@Property()
name!: string;
}
`
Create a mikro-orm.config.ts file to configure your database connection.
`typescript
import { MikroORMOptions, ReflectMetadataProvider } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
import entities from './1_database/entities';
import { AppConfig } from './0_config/AppConfig';
const appConfig = AppConfig.get()
const config: Partial
entities: entities,
clientUrl: appConfig.dbUrl,
metadataProvider: ReflectMetadataProvider,
debug: false,
type: 'mongo',
migrations: {
path: 'dist/1_database/migrations',
pathTs: 'src/1_database/migrations',
transactional: false
}
};
export default config;
`
- See this mikro-orm.config.ts for an example config.
- Checkout the mikro-orm docs for more info to set up the ORM.
- You may choose to use another ORM. In that case, make sure you manage test integrations yourself.
- Checkout this example Entity SampleAuthor.ts.
`typescript
import {MongoDatabase} from '@synonymdev/blocktank-worker2';
import config from './mikro-orm.config.ts'
try {
await MongoDatabase.connect(config)
await MongoDatabase.migrateUp()
const em = MongoDatabase.createEntityManager()
const author = new SampleAuthor()
author.name = 'Sepp'
await em.persistAndFlush(author)
} finally {
await MongoDatabase.close()
}
`
MongoDatabase provides a InMemory database for testing. Checkout the example MongoDatabase.test.ts for more details on how to use the inMemory database to run independent tests.
MikroORM comes with a cli. To use the cli, add this config to your package.json:
``
"mikro-orm": {
"useTsNode": true,
"configPaths": [
"./src/mikro-orm.config.ts",
"./dist/mikro-orm.config.js"
]
},
- Use npx mikro-orm migration:create to create a new migration.npx mikro-orm migration:up
- Use to run migrations.
. Checkout vscode jest to selectively run tests.Make tests independent + cleanup RabbitMQ:
`ts
import { Worker } from "@synonymdev/blocktank-worker2";// Use a random RabbitMQ namespace to avoid any conflicts between tests:
const runner = new Worker(worker, {
namespace: Worker.randomNamespace()
});
try {
await runner.start()
// Do your tests here
} finally {
// Cleanup all existing rabbitMQ objects
await runner.stop({cleanupRabbitMq: true})
}
`$3
1. Increase version in
package.json.
2. Add changes to CHANGELOG.md.
3. Commit changes.
4. Tag new version: git tag v0.1.0.
5. Push tag git push origin v0.1.0.
6. Publish to npm: npm publish`.