Created
April 12, 2024 12:15
-
-
Save fs-doc/505ea04c0b6ae179f9f0e33b6896ddee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Kafka, Producer, Partitioners } from 'kafkajs'; | |
export class KafkaProducer{ | |
private kafka: Kafka; | |
private producer: Producer; | |
private brokers: string[]; | |
private crashes: number; | |
private maxCrashes: number; | |
constructor(private clientName: string){ | |
if(process.env.NODE_ENV === "production"){ | |
//Production environment. SSL may be required. | |
this.brokers = ['your-custom-url:9093'] | |
}else{ | |
//Local environment. No SSL. | |
this.brokers = ['127.0.0.1:9092']; | |
} | |
this.kafka = new Kafka({ | |
clientId: this.clientName, | |
brokers: [...this.brokers], | |
}); | |
this.crashes = 0; | |
this.maxCrashes = 5; | |
this._connectProducer(); | |
this._eventsListener(); | |
} | |
_eventsListener(){ | |
//More about available events here: https://kafka.js.org/docs/1.12.0/instrumentation-events#producer | |
const { REQUEST_TIMEOUT } = this.producer.events; | |
this.producer.on(REQUEST_TIMEOUT, (e) =>{ | |
//How to handle a producer timeout is totally up to you. | |
//For this example, we are just logging the event, but can be used to send an alert to a monitoring service or event stop the process. | |
console.log('Producer Timed-Out'); | |
}); | |
} | |
async _connectProducer(){ | |
try{ | |
//Create the producer instance | |
//Add config params if needed, check https://kafka.js.org/docs/1.12.0/producing | |
this.producer = this.kafka.producer(); | |
//Connect the instance to the Kafka cluster | |
await this.producer.connect(); | |
}catch (e){ | |
//We're killing the process if the producer cannot connect to the Kafka cluster. | |
//We consider the producer to be a critical part of the system. | |
process.exit(1); | |
} | |
} | |
async produce(topic: string, messages: {key: string | Buffer, value: string | Buffer}[], retryCallback?: (messages: {key: string | Buffer, value: string | Buffer}[]) => boolean){ | |
if(!this.producer){ | |
//If the producer is not connected, we try to connect again. | |
await this._connectProducer(); | |
} | |
try{ | |
//Broadcast the messages to the topic | |
await this.producer.send({ | |
topic: topic, | |
messages: messages, | |
}) | |
}catch (e){ | |
// If the producer crashes 5 times, it will disconnect the producer and retry the connection and process at a later time. | |
if(this.crashes === this.maxCrashes){ | |
console.error(e); | |
//This is optional. You can add a callback to retry the process at a later time, using for example a queue system. | |
retryCallback(messages); | |
await this.producer.disconnect(); | |
} | |
setTimeout(() => { | |
this.crashes++; | |
console.log('Unable to produce messages, trying again in 10 seconds'); | |
this.produce(topic, messages); | |
}, 10000); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment