Advanced Node.js/TypeScript toolkit for Kafka with exactly-once semantics (EOS), transactional and idempotent producers, dynamic consumer groups, retry and dead-letter pipelines, producer pool management, multi-cluster support, and graceful shutdown.
## Installation

```bash
npm install kafkakit
```
## Usage
### Logger
```js
import { kafkaConfig } from "kafkakit";
kafkaConfig.setLogger(logger); // Your customized logger
```
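The exact logger interface kafkakit expects is not documented above; as a hedged sketch, assuming it accepts any object exposing the usual level methods, a minimal console-backed logger could look like this:

```js
// Hypothetical logger shape: kafkakit's expected interface isn't shown
// above, so this assumes the common info/warn/error/debug level methods.
const logger = {
  info: (msg, meta) => console.log("[INFO]", msg, meta ?? ""),
  warn: (msg, meta) => console.warn("[WARN]", msg, meta ?? ""),
  error: (msg, meta) => console.error("[ERROR]", msg, meta ?? ""),
  debug: (msg, meta) => console.debug("[DEBUG]", msg, meta ?? ""),
};

// kafkaConfig.setLogger(logger);
```

In practice you would pass your existing structured logger (pino, winston, etc.) instead, provided it exposes the same methods.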
### Kafka Client
```js
import { Kafka } from "kafkajs";

const kafkaClient = new Kafka({
  clientId: "example",
  brokers: [], // your broker addresses, e.g. ["localhost:9092"]
});
```
### Producer
- ### Functions Available
- The connect() function connects your non-transactional producer. Transactional producers are created automatically by the consumers.
- The disconnect() function gracefully shuts down your producers, waiting for each to finish its jobs so that no in-flight messages are dropped and all internal buffers are flushed before the process exits.
- The reset() mechanism is specifically designed for Kafka rebalances. It "drains" the current promise queues for transactional producers, allowing pending sends to complete before clearing the pool. This prevents "Zombie Producers" from hanging around after partition ownership has shifted.
Note: This is an internal safeguard and is not intended for direct use.
- The send() method publishes messages using a shared idempotent producer and is intended for non-transactional flows. It ensures durability with acks: -1 but does not tie message production to consumer offset commits.
- The runInTransaction() method enables exactly-once semantics by wrapping message production and offset commits inside a single Kafka transaction. If the task succeeds, both the produced messages and the offset commit are finalized atomically; if it fails, the transaction is aborted and no offsets are advanced.
Note: This is typically used within consumers and is already configured for you in the consumer examples below, as the package handles the setup automatically.
- ### Example
```js
import { Producer, ProducerConfig } from "kafkakit";

/*
 * transactionalIdPrefix & maxPoolSize => Must remain stable across pod restarts
 * to avoid transactional fencing and PID invalidation.
 */
const config: ProducerConfig = {
  transactionalIdPrefix: env.POD_NAME,
  // Optional fields:
  // createPartitioner?: ICustomPartitioner
  // retry?: RetryOptions
  // metadataMaxAge?: number
  // allowAutoTopicCreation?: boolean
  // transactionTimeout?: number
  // maxInFlightRequests?: number
};
const maxPoolSize = 5;

const producer = new Producer(kafkaClient, config, maxPoolSize);
await producer.connect();

await producer.send("topic", [{
  key: "test",
  value: JSON.stringify({
    event: "OrderCreated",
    data: {
      name: "example",
    },
  }),
}]);

// Either both topics get the messages or none do.
// This is meant for consume -> process -> produce -> commit offset atomically
// (can't be used manually).
await producer.runInTransaction(async (send) => {
  // Any processing
  const result = await db.findById(exampleId);
  // If you want to send a message to another topic
  await send("topic-1", [{
    event: "OrderCreated",
    data: {
      result,
    },
  }]);
  await send("topic-2", [{
    event: "OrderCreated",
    data: {
      result,
    },
  }]);
}, {
  // topic, partition and message come from the consumer context
  groupId: "example-group",
  topic,
  partition,
  offset: (BigInt(message.offset) + 1n).toString(),
});

await producer.reset(); // Not meant for manual usage
await producer.disconnect();
```
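The `offset` passed to the transaction above is the processed message's offset plus one, because Kafka commits the offset of the *next* record to consume, not the one just processed. A standalone sketch of that arithmetic:

```js
// Kafka commits the offset of the NEXT record to consume, so the
// committed value is the processed offset plus one. message.offset is a
// string in kafkajs, hence the BigInt round-trip.
const processedOffset = "41"; // e.g. message.offset
const commitOffset = (BigInt(processedOffset) + 1n).toString();
console.log(commitOffset); // "42"
```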
### Consumer Group
- ### Functions Available
- The connect() function connects and starts the consumers belonging to their group.
- The disconnect() function gracefully shuts down the consumers in the group by waiting for all in-flight messages to finish processing before shutting down.
- The subscribe() function registers consumers in a group to a specific topic and attaches the corresponding event handlers.
### Topic Events
- ### Properties & Functions Available
- The topic property returns the topic name.
- The on function registers an event handler for a specific event within a topic.
- The getSubscription() function returns the topic along with its registered event handlers, ready to be subscribed by a consumer group.
- ### Example
- Everyone has their own way of writing and initializing things, but here is an illustrative example.
```js
import { ConsumerGroup, TopicEvents, ConsumerGroupConfigs, ConsumerConfigs } from "kafkakit";

interface OrderCreated {
  products: string[];
}

const setupOrdersTopic = async () => {
  /*
   * const db = db client (for example);
   */
  const topicEvents = new TopicEvents("orders");
  topicEvents.on("OrderCreated", async ({
    key, // if no key was set on the message, it's undefined
    data, // data object without the event
    ctx: {
      producedAt, // timestamp when the message was produced
      receivedAt, // timestamp when the consumer started processing
      headers, // message headers -> might be undefined if no headers were set
      metadata: { // additional context
        consumerId, groupId, isLeader, // base metadata of the consumer -> might be undefined before the consumer joins the group
        // context provided by you
        region,
      },
      send, // Sends messages either transactionally or non-transactionally, depending on the topic configuration
    },
  }) => {
    // Example: Only the leader consumer performs certain actions
    if (isLeader)
      console.log("I am the leader, performing leader-only logic...");
    // Example: Store the region that processed the actions
    db.store(region);
    await send("Payment", [{
      key: "example",
      headers: {
        source: "example-service",
      },
      value: JSON.stringify({
        event: "OrderCreated",
        data: {
          orderId: key,
        },
      }),
    }]);
  });
  return topicEvents;
};

const topicsHandlerSetupMap: Record<
  string,
  () => Promise<TopicEvents>
> = {
  orders: setupOrdersTopic,
};

// Consumer Group
const consumerConfig: ConsumerConfigs = {
  dlqTopic: "dead-letter-queue",
  // Optional fields:
  // sessionTimeout?: number
  // heartbeatInterval?: number
  // rebalanceTimeout?: number
  // partitionsConsumedConcurrently?: number
  /*
   * Additional context that will be passed to consumers.
   * This runs on each message.
   * For best practices, avoid making database or external API calls here to prevent delays during message processing.
   */
  meta: () => ({
    region: env.POD_REGION,
  }),
};

const consumerGroupConfig: ConsumerGroupConfigs = {
  groupId: "example",
  totalConsumers: 3,
  producer, // Producer cluster instance we created
  kafkaClient, // kafka client
  consumerConfig,
  topics: [
    {
      topic: "orders",
      useTransaction: true,
      retries: {
        count: 3,
        retrialTopic: "orders-retry",
      },
    },
    {
      topic: "payment",
      useTransaction: false,
      // retries is optional
    },
  ],
};

const consumerGroup = new ConsumerGroup(consumerGroupConfig);

await Promise.all(consumerGroupConfig.topics.map(async ({ topic }) => {
  const setupFn = topicsHandlerSetupMap[topic];
  if (!setupFn) return;
  const topicEvents = await setupFn();
  const subscription = topicEvents.getSubscription();
  consumerGroup.subscribe(topic, subscription.handler);
}));

await consumerGroup.connect();
// ...
await consumerGroup.disconnect();
```
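The graceful-shutdown calls are typically wired to process signals so that Kubernetes pod termination drains consumers before producers. This is a sketch only; the stubs stand in for the producer and consumer group instances created in the examples above, and the signal handling shown is application code, not part of kafkakit's API:

```js
// Stubs standing in for the instances from the examples above;
// swap in the real producer / consumerGroup.
const producer = { disconnect: async () => {} };
const consumerGroup = { disconnect: async () => {} };

const shutdown = async (signal) => {
  console.log(`Received ${signal}, draining...`);
  await consumerGroup.disconnect(); // finish in-flight message processing first
  await producer.disconnect(); // then flush remaining producer buffers
};

process.once("SIGTERM", () => shutdown("SIGTERM").then(() => process.exit(0)));
process.once("SIGINT", () => shutdown("SIGINT").then(() => process.exit(0)));
```

Disconnecting the consumer group before the producer matters: handlers may still be producing messages while they drain, so the producer must stay up until processing finishes.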