# @kaapi/kafka-messaging

Kafka-based messaging for Kaapi.

`@kaapi/kafka-messaging` is a lightweight wrapper around kafkajs that integrates with the Kaapi framework to provide a clean and consistent message publishing and consuming interface.

- `publish(topic, message)` and `publishBatch(topic, messages)` APIs
- `subscribe(topic, handler, config)` with custom `groupId`, error handling, and offset tracking
- `ILogger` support
- `KafkaMessaging` class to publish and consume messages with Apache Kafka
## Installation

```bash
npm install @kaapi/kafka-messaging kafkajs
```
---
## Initialization
```ts
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  name: 'my-service',
  address: 'service-1',
  logger: createLogger() // optional, use a Kaapi ILogger implementation
});
```
The constructor accepts a `KafkaMessagingConfig` object, which extends `KafkaConfig` from kafkajs:
| Option     | Type             | Description                                                           |
| ---------- | ---------------- | --------------------------------------------------------------------- |
| `brokers`  | `string[]`       | List of Kafka broker addresses (e.g. `['localhost:9092']`). Required. |
| `clientId` | `string`         | Unique client identifier for Kafka.                                   |
| `logger`   | `ILogger`        | Optional logger implementing Kaapi's `ILogger` interface.             |
| `address`  | `string`         | Optional unique service address for routing and identification.       |
| `name`     | `string`         | Optional human-readable name for service tracking/monitoring.         |
| `producer` | `ProducerConfig` | Optional default KafkaJS producer configuration.                      |
---
## Creating Topics
```ts
await messaging.createTopic({
  topic: 'my-topic',
  numPartitions: 1,
  replicationFactor: 1,
}, {
  waitForLeaders: true
});

// Ensure the topic is ready before publishing
const timeoutMs = 10000;
const checkIntervalMs = 200;
await messaging.waitForTopicReady('my-topic', timeoutMs, checkIntervalMs);
```
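Conceptually, the readiness check is a poll loop that retries until the topic reports partitions or the timeout elapses. The sketch below illustrates that pattern; `pollUntil` is a hypothetical helper, not the library's actual implementation:

```ts
// Illustrative polling loop, similar in spirit to waitForTopicReady (assumed behavior):
async function pollUntil(
  check: () => Promise<boolean>,
  timeoutMs: number,
  intervalMs: number
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await check()) return; // condition met, e.g. topic has partitions
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}
```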
---
## Publishing Messages
#### Single Message
`publish(topic, message)` sends a message to a given Kafka topic.
```ts
await messaging.publish('my-topic', {
  userId: '123',
  action: 'login',
});
```
Messages can be:

- Objects → automatically JSON-serialized
- Strings → sent as-is
- Buffers → sent as-is (for binary data)
- `null` → sent as `null` (tombstone messages)
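These rules can be pictured as a small normalization function. The sketch below is illustrative only; `toKafkaValue` is not part of the library's API:

```ts
// Illustrative sketch of the serialization rules above (not the library's actual code):
function toKafkaValue(value: unknown): Buffer | string | null {
  if (value === null) return null;             // tombstone message
  if (Buffer.isBuffer(value)) return value;    // binary data passes through
  if (typeof value === 'string') return value; // strings sent as-is
  return JSON.stringify(value);                // objects are JSON-serialized
}
```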
#### Batch Publishing
`publishBatch(topic, messages)` sends multiple messages in a single request for better throughput.
```ts
await messaging.publishBatch('user-events', [
  { value: { event: 'user.created', userId: '1' } },
  { value: { event: 'user.created', userId: '2' } },
  { value: { event: 'user.updated', userId: '3' }, key: 'user-3' },
  { value: { event: 'user.deleted', userId: '4' }, headers: { priority: 'high' } },
]);
```
Each message in the batch can include:

- `value` → the message payload (required)
- `key` → optional partition key
- `partition` → optional specific partition
- `headers` → optional custom headers
---
## Subscribing to Messages
`subscribe(topic, handler, config?)` subscribes to a Kafka topic and calls the provided handler on each message.
```ts
await messaging.subscribe('my-topic', async (message, context) => {
  console.log('Received:', message);
  console.log('Offset:', context.offset);
  console.log('Timestamp:', context.timestamp);
}, {
  fromBeginning: true
});
```
#### Subscribe Configuration
| Option          | Type       | Description                                                               |
| --------------- | ---------- | ------------------------------------------------------------------------- |
| `groupId`       | `string`   | Custom consumer group ID. Overrides the auto-generated ID.                |
| `groupIdPrefix` | `string`   | Prefix for the auto-generated group ID (default: service name).           |
| `fromBeginning` | `boolean`  | Start consuming from the beginning of the topic.                          |
| `logOffsets`    | `boolean`  | Log partition offsets on subscribe (adds admin overhead). Default: false. |
| `onReady`       | `function` | Callback invoked when the consumer is ready.                              |
| `onError`       | `function` | Callback invoked when a message handler throws an error.                  |
#### Consumer Group ID Resolution
The consumer group ID is resolved in this order:

1. `groupId`, if provided
2. `{groupIdPrefix}.{topic}`, if a prefix is provided
3. `{name}.{topic}`, using the service name from the config
4. `group.{topic}` as a fallback
```ts
// Using a custom group ID
await messaging.subscribe('user-events', handler, {
  groupId: 'my-custom-consumer-group'
});

// Using a custom prefix → "analytics.user-events"
await messaging.subscribe('user-events', handler, {
  groupIdPrefix: 'analytics'
});
```
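The resolution order amounts to a simple fallback chain, which can be sketched as a pure function. `resolveGroupId` below is illustrative only, not the library's actual code:

```ts
// Illustrative sketch of the group ID fallback chain described above:
function resolveGroupId(
  topic: string,
  opts: { groupId?: string; groupIdPrefix?: string; name?: string }
): string {
  if (opts.groupId) return opts.groupId;                     // 1. explicit group ID
  if (opts.groupIdPrefix) return `${opts.groupIdPrefix}.${topic}`; // 2. prefix + topic
  if (opts.name) return `${opts.name}.${topic}`;             // 3. service name + topic
  return `group.${topic}`;                                   // 4. fallback
}
```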
#### Error Handling
Use the `onError` callback to handle errors from message handlers without crashing the consumer:
```ts
await messaging.subscribe('user-events', async (message) => {
  await processMessage(message); // might throw
}, {
  onError: async (error, message, context) => {
    console.error('Failed to process message:', error);
    console.error('Message:', message);
    console.error('Offset:', context.offset);

    // Log to an external service, send to a DLQ, etc.
    await alertService.notify(error);
  }
});
```
The `onError` callback receives:

- `error` → the error thrown by the handler
- `message` → the parsed message that failed
- `context` → the message context (offset, headers, timestamp, etc.)
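If you want transient failures retried before they ever reach `onError`, one option is to wrap your handler. `withRetries` below is a hypothetical helper, not part of this package:

```ts
// Hypothetical helper: retry a message handler a bounded number of times
// before rethrowing, at which point the subscribe onError callback would fire.
function withRetries<M, C>(
  handler: (message: M, context: C) => Promise<void>,
  attempts = 3
): (message: M, context: C) => Promise<void> {
  return async (message, context) => {
    let lastError: unknown;
    for (let i = 0; i < attempts; i++) {
      try {
        return await handler(message, context); // success: stop retrying
      } catch (err) {
        lastError = err; // remember the failure and try again
      }
    }
    throw lastError; // all attempts exhausted
  };
}
```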
#### Consumer Ready Callback
```ts
await messaging.subscribe('my-topic', handler, {
  onReady: (consumer) => {
    console.log('Consumer is ready!');
    // Access the raw KafkaJS consumer here if needed
  }
});
```
---
## Fetching Topic Offsets
```ts
const offsets = await messaging.fetchTopicOffsets('my-topic');

offsets?.forEach((partition) => {
  console.log(`Partition ${partition.partition}: offset=${partition.offset}, high=${partition.high}, low=${partition.low}`);
});
```
---
## Graceful Shutdown
```ts
const result = await messaging.shutdown();

console.log(`Disconnected ${result.successProducers} producers`);
console.log(`Disconnected ${result.successConsumers} consumers`);
console.log(`Disconnected ${result.successAdmins} admins`);
console.log(`Errors: ${result.errorCount}`);
```
This will disconnect all tracked producers, consumers, and admin clients safely.
```ts
// Example: graceful shutdown on SIGTERM
process.on('SIGTERM', async () => {
  const result = await messaging.shutdown();
  console.log(`Shutdown complete: ${result.errorCount} errors`);
  process.exit(0);
});
```
---
## 🧱 Example Usage
```ts
// messaging.ts
import { Kaapi } from '@kaapi/kaapi';
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  name: 'my-service',
  address: 'service-1'
});

/**
 * Initialize the Kaapi app with messaging
 */
const app = new Kaapi({
  port: 3000,
  host: 'localhost',
  messaging,
});

/**
 * Demonstrates how to subscribe and publish a message
 */
async function runExample(): Promise<void> {
  /**
   * Option 1: Use the Kaapi app (recommended in the app lifecycle)
   */

  // Publish a message
  await app.publish('my-topic', { event: 'user.created', userId: 456 });

  // Subscribe to messages
  await app.subscribe('my-topic', async (message, context) => {
    console.log('Received:', message);
    console.log('Offset:', context.offset);
  });

  /**
   * Option 2: Use messaging directly (standalone)
   */

  // Publish a message
  await messaging.publish('my-topic', { event: 'user.created', userId: 123 });

  // Subscribe with error handling
  await messaging.subscribe('my-topic', async (message, context) => {
    console.log('Received:', message);
    console.log('Offset:', context.offset);
  }, {
    fromBeginning: true,
    onError: (error, message, context) => {
      console.error('Handler failed:', error);
    }
  });

  // Batch publish
  await messaging.publishBatch('my-topic', [
    { value: { event: 'user.created', userId: 1 } },
    { value: { event: 'user.created', userId: 2 } },
  ]);
}

runExample().catch((err) => {
  console.error('❌ Messaging example failed:', err);
});
```
---
## Public API Contract
The KafkaMessaging class provides a safe and resilient interface for interacting with Kafka. Developers should use the following methods to ensure proper lifecycle management, resource tracking, and graceful shutdown.
### Core Methods
| Method                                              | Purpose                                                              |
| --------------------------------------------------- | -------------------------------------------------------------------- |
| `createProducer(config?)`                           | Creates and connects a Kafka producer. Automatically tracked.        |
| `createConsumer(groupId, config?)`                  | Creates and connects a Kafka consumer. Automatically tracked.        |
| `createAdmin(config?)`                              | Creates and connects a Kafka admin client. Tracked for shutdown.     |
| `getProducer()`                                     | Gets or creates the singleton producer (race-condition safe).        |
| `publish(topic, message)`                           | Sends a message to the specified topic.                              |
| `publishBatch(topic, messages)`                     | Sends multiple messages in a single batch.                           |
| `subscribe(topic, handler, config?)`                | Subscribes to a topic and processes messages with the given handler. |
| `fetchTopicOffsets(topic)`                          | Fetches partition offsets for a topic.                               |
| `createTopic(topicConfig, options?)`                | Creates a Kafka topic with optional validation and leader wait.      |
| `waitForTopicReady(topic, timeoutMs?, intervalMs?)` | Waits for a topic to be ready (has partitions).                      |
| `shutdown()`                                        | Gracefully disconnects all tracked clients. Returns a summary.       |
| `safeDisconnect(client, timeoutMs?)`                | Disconnects a Kafka client with timeout protection.                  |
| `disconnectProducer()`                              | Disconnects the singleton producer.                                  |
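The timeout protection in `safeDisconnect` can be pictured as a race between the disconnect promise and a timer. `withTimeout` below is an assumed sketch of that pattern, not the actual implementation:

```ts
// Illustrative timeout guard: resolve with the operation's result, or reject
// if it takes longer than timeoutMs (the timer is always cleaned up).
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Operation timed out after ${timeoutMs}ms`)),
      timeoutMs
    );
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    clearTimeout(timer);
  }
}
```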
### Properties
| Property          | Type          | Description                  |
| ----------------- | ------------- | ---------------------------- |
| `activeProducers` | `ReadonlySet` | Currently tracked producers. |
| `activeConsumers` | `ReadonlySet` | Currently tracked consumers. |
### Internal Methods
| Method             | Status    | Reason                                                 |
| ------------------ | --------- | ------------------------------------------------------ |
| `getKafka()`       | Protected | Used internally to instantiate Kafka clients.          |
| `getSharedAdmin()` | Protected | Lazy-initialized shared admin for internal operations. |
### Best Practices
- Always use `createProducer`, `createConsumer`, or `createAdmin` to ensure proper tracking.
- Use `getProducer()` for the singleton producer pattern (recommended for most use cases).
- Avoid accessing the raw Kafka instance directly.
- Call `shutdown()` during application teardown to release resources.
- Use `createTopic()` and `waitForTopicReady()` in tests or dynamic topic scenarios.
- Use the `onError` callback in `subscribe()` to handle message processing failures gracefully.
---
## 🛠️ Requirements
* Node.js 18+
* A running Kafka instance
* Optional: integrate into a Kaapi service lifecycle
---
## 🔗 Related

* KafkaJS – the underlying Kafka client
* Kaapi – the framework powering this abstraction
* `@kaapi/kaapi`
---
## 🧪 Testing
```bash
# Run mock tests (no Kafka required)
pnpm test

# Run integration tests (requires a Kafka broker)
pnpm test:integration

# Run all tests
pnpm test:all
```
You can run Kafka locally using Docker:
```bash
docker run -d --name kafka \
  -p 9092:9092 \
  apache/kafka:latest
```