Skip to content

Instantly share code, notes, and snippets.

@fs-doc
Created April 12, 2024 12:15
Show Gist options
  • Save fs-doc/505ea04c0b6ae179f9f0e33b6896ddee to your computer and use it in GitHub Desktop.
Save fs-doc/505ea04c0b6ae179f9f0e33b6896ddee to your computer and use it in GitHub Desktop.
import { Kafka, Producer, Partitioners } from 'kafkajs';
export class KafkaProducer{
private kafka: Kafka;
private producer: Producer;
private brokers: string[];
private crashes: number;
private maxCrashes: number;
constructor(private clientName: string){
if(process.env.NODE_ENV === "production"){
//Production environment. SSL may be required.
this.brokers = ['your-custom-url:9093']
}else{
//Local environment. No SSL.
this.brokers = ['127.0.0.1:9092'];
}
this.kafka = new Kafka({
clientId: this.clientName,
brokers: [...this.brokers],
});
this.crashes = 0;
this.maxCrashes = 5;
this._connectProducer();
this._eventsListener();
}
_eventsListener(){
//More about available events here: https://kafka.js.org/docs/1.12.0/instrumentation-events#producer
const { REQUEST_TIMEOUT } = this.producer.events;
this.producer.on(REQUEST_TIMEOUT, (e) =>{
//How to handle a producer timeout is totally up to you.
//For this example, we are just logging the event, but can be used to send an alert to a monitoring service or event stop the process.
console.log('Producer Timed-Out');
});
}
async _connectProducer(){
try{
//Create the producer instance
//Add config params if needed, check https://kafka.js.org/docs/1.12.0/producing
this.producer = this.kafka.producer();
//Connect the instance to the Kafka cluster
await this.producer.connect();
}catch (e){
//We're killing the process if the producer cannot connect to the Kafka cluster.
//We consider the producer to be a critical part of the system.
process.exit(1);
}
}
async produce(topic: string, messages: {key: string | Buffer, value: string | Buffer}[], retryCallback?: (messages: {key: string | Buffer, value: string | Buffer}[]) => boolean){
if(!this.producer){
//If the producer is not connected, we try to connect again.
await this._connectProducer();
}
try{
//Broadcast the messages to the topic
await this.producer.send({
topic: topic,
messages: messages,
})
}catch (e){
// If the producer crashes 5 times, it will disconnect the producer and retry the connection and process at a later time.
if(this.crashes === this.maxCrashes){
console.error(e);
//This is optional. You can add a callback to retry the process at a later time, using for example a queue system.
retryCallback(messages);
await this.producer.disconnect();
}
setTimeout(() => {
this.crashes++;
console.log('Unable to produce messages, trying again in 10 seconds');
this.produce(topic, messages);
}, 10000);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment