[](https://dev.azure.com/apt-x/Digital%20Twin/_build/latest?definitionId=128&branchName=master)
npm install @imec/digital-twin-kafka-utils
Kafka manager is a wrapper around Kafkajs for the Digital Twin project.
It's meant to be available publicly from NPM.
> First of all you need to be a collaborator of the npm package. Ask david.vermeir@imec.be or ismail.kutlu@imec.be to get access.
1. Create a branch
2. Update code
3. Run yarn build
4. Run run npm version
5. Commit changes and the version update
6. Open PR
7. Merge PR
8. Checkout to master
9. Run npm publish on master
``javascript
import KafkaManager, { ProducerRecord, KafkaMessage } from ".";
interface RoadData{
name: string,
age: number
}
interface HobbitData{
name: string,
address: string
}
export default class Example {
kafka!: KafkaManager
async init() {
this.kafka = new KafkaManager({ brokers: ['kafka-server:9092']});
const producerConfig = {} // producer config from kafkajs, optional can be left out
const kafkaProducer = await this.kafka.createProducer(producerConfig);
const consumerConfig = {groupId: "hello"};
const kafkaConsumer = await this.kafka.createConsumer(consumerConfig);
const topic = 'traffic-loop';
const record: ProducerRecord = {
topic,
messages: [
{
headers: {'sequenceId': '2'},
key: 'remove road',
value: {name: 'my awesome name', age: 124}
},
{
headers: {'sequenceId': '2'},
key: 'add hobbit',
value: {name: 'hobbit', address: 'Esgaroth'}
}
]
}
setInterval(() => {
console.log('publishing');
this.kafka.publish(kafkaProducer, record);
}, 5000)
function eachBatchFn(messages: KafkaMessage
messages.forEach(element => {
switch (element.key) {
case "add hobbit":
console.log("Handle the hobbit message");
break;
case "remove road":
console.log("Handle the road message");
break;
default:
console.log("Handle unexpected default");
}
});
console.log("batch messages", messages)
}
function eachMessageFn(message: KafkaMessage
console.log("messages", message)
switch (message.key) {
case "add hobbit":
console.log("Handle the hobbit message");
break;
case "remove road":
console.log("Handle the road message");
break;
default:
console.log("Handle unexpected default");
}
}
// Subscribing to a topic can be done using the two methods below. Note that you cannot have eachMessage and eachBatch in the same subscription! This will cause kafkajs to only run one of the two.
// Use for each message subscription
this.kafka.subscribe
subscribeTopic: {topic},
eachMessage: eachMessageFn
});
// Use batch subscription
this.kafka.subscribe
subscribeTopic: {topic},
eachBatch: eachBatchFn,
});
}
}
``