Skip to content

Instantly share code, notes, and snippets.

@fs-doc
Last active April 12, 2024 10:50
Show Gist options
  • Save fs-doc/1a02b4a510916ac4fb5b779882e69b1e to your computer and use it in GitHub Desktop.
Save fs-doc/1a02b4a510916ac4fb5b779882e69b1e to your computer and use it in GitHub Desktop.
Basic Kafka Consumer using KafkaJS
import {
Kafka,
Consumer,
MessageSetEntry,
RecordBatchEntry,
ConsumerSubscribeTopics,
ITopicConfig
} from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';
/**
 * Handler type accepted by `KafkaConsumer.consume`: an async function that takes
 * a message, the topic name and an optional partition number, and resolves to a boolean.
 */
export type ConsumerCallback = (message: MessageSetEntry, topic: string, partition?: number ) => Promise<boolean>;
/** Either of the two KafkaJS message entry shapes (`MessageSetEntry` or `RecordBatchEntry`). */
export type ConsumerMessage = MessageSetEntry | RecordBatchEntry;
/**
 * Thin wrapper around a KafkaJS consumer that treats consumption as critical:
 * it commits offsets manually after each successfully processed message, retries
 * failed messages, and terminates the process when the consumer stops, disconnects
 * or a message cannot be processed after all retries.
 */
export class KafkaConsumer{
  private kafka: Kafka | null = null;
  private consumer: Consumer | null = null;
  /** Unique ID for this consumer instance; used to build the Kafka client ID. */
  private readonly consumerId: string;
  private readonly brokers: string[];
  /** Number of failed attempts to create the client/consumer so far. */
  private crashes: number = 0;
  /** Maximum number of retries for a single message before the process is killed. */
  private readonly retries: number = 10;
  /** Maximum number of failed initialization attempts before the process is killed. */
  private readonly maxCrashes: number = 5;

  /**
   * @param topics  Topics (names or patterns) the consumer subscribes to.
   * @param groupId Consumer group ID.
   */
  constructor(private topics: ConsumerSubscribeTopics['topics'], private groupId: string){
    // Production environment may require SSL; the local environment does not.
    this.brokers = process.env.NODE_ENV === "production"
      ? ['your-custom-url:9093']
      : ['127.0.0.1:9092'];
    this.consumerId = uuidv4();

    this._initConsumer();
    // If initialization failed, a retry has been scheduled and the listeners are
    // attached by that retry instead of throwing from the constructor.
    if (this.consumer) {
      this._eventsListener();
    }
  }

  /**
   * Creates the Kafka client and the consumer instance.
   * On failure, retries every 10 seconds and kills the process once
   * `maxCrashes` attempts have failed.
   */
  _initConsumer(): void {
    try {
      // Kafka client instance, configured with the brokers.
      this.kafka = new Kafka({
        clientId: `client.consumer.${this.groupId}.${this.consumerId}`, // Unique ID for the client
        brokers: [...this.brokers],
      });
      this.consumer = this.kafka.consumer({
        // Consumers sharing a group ID split the partitions between them, so a message
        // is handled by only one consumer of the group.
        groupId: this.groupId,
        // Isolation level: also return transactional messages that are not committed yet.
        readUncommitted: true,
      });
    } catch (e) {
      // Never leave a half-initialized pair behind.
      this.kafka = null;
      this.consumer = null;
      this.crashes++;
      if (this.crashes >= this.maxCrashes) {
        console.error('Consumer cannot be initialized', e);
        process.exit(1);
      }
      // Retry initialization; a consumer created by the retry still needs its listeners.
      setTimeout(() => {
        this._initConsumer();
        if (this.consumer) {
          this._eventsListener();
        }
      }, 10000);
    }
  }

  /**
   * Registers listeners that kill the process when the consumer stops or disconnects,
   * since the consumer is considered a critical part of the system.
   *
   * @throws Error if the consumer has not been created yet.
   */
  _eventsListener(): void {
    if (!this.consumer) throw new Error('Consumer not initialized');
    const { DISCONNECT, STOP } = this.consumer.events;
    // It could be worth alerting a monitoring service before exiting in both handlers.
    this.consumer.on(STOP, async () => {
      process.exit(1);
    });
    this.consumer.on(DISCONNECT, async () => {
      process.exit(1);
    });
  }

  /**
   * Connects the consumer, creates the named topics on the cluster and subscribes
   * the consumer to `topics`. Kills the process if topic creation or subscription fails.
   *
   * @throws Error if the client or consumer instance has not been created.
   */
  async _configureConsumer(): Promise<void> {
    // Local constants keep the null-narrowing valid across the awaits below.
    const consumer = this.consumer;
    const kafka = this.kafka;
    if (!consumer) throw new Error('Unable to initialize consumer instance');
    if (!kafka) throw new Error('Unable to initialize kafka client instance');

    await consumer.connect();

    const admin = kafka.admin();
    await admin.connect();
    try {
      const cluster = await admin.describeCluster();
      // Only literal topic names can be created; RegExp patterns are subscription-only.
      const topicsConfig: ITopicConfig[] = this.topics
        .filter((topic): topic is string => typeof topic === 'string')
        .map((topic) => ({
          topic,
          numPartitions: 2, // Default: 1. More partitions improve scalability.
          replicationFactor: cluster.brokers.length, // default: -1 (uses broker `default.replication.factor` configuration)
          replicaAssignment: [], // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
          configEntries: [], // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
        }));

      try {
        if (topicsConfig.length > 0) {
          // Resolves to true if a topic was created, false if it already exists.
          await admin.createTopics({
            validateOnly: false,
            waitForLeaders: true,
            timeout: 30000,
            topics: topicsConfig,
          });
        }
        await consumer.subscribe({ topics: this.topics, fromBeginning: true });
      } catch (err) {
        console.error('ERROR: ' + String(err));
        // If the consumer cannot be subscribed to the topics, we kill the process.
        process.exit(1);
      }
    } finally {
      await admin.disconnect();
    }
  }

  /**
   * Runs `callback` for one message and commits its offset on success.
   * A thrown error, an explicit `false` result or a failed commit counts as a failure;
   * failures are retried every 10 seconds, and after `retries` retries the process is killed.
   * Because the offset is only committed after success, the message is not lost.
   *
   * @param topic      Topic the message was read from.
   * @param partition  Partition the message was read from.
   * @param message    The message to process.
   * @param callback   Handler invoked with `(message, topic, partition)`.
   * @param retryCount Number of retries already performed (default 0).
   * @param heartbeat  Optional heartbeat function, called after each wait so the
   *                   consumer keeps signalling liveness during long retries.
   * @throws Error if the consumer has not been created.
   */
  async processMessageWithRetry(
    topic: string,
    partition: number,
    message: ConsumerMessage,
    callback: ConsumerCallback,
    retryCount = 0,
    heartbeat?: () => Promise<void>
  ): Promise<void> {
    const consumer = this.consumer;
    if (!consumer) throw new Error('Kafka consumer client instance not initialized');

    // Offsets are strings: convert before adding, otherwise "5" + 1 yields "51".
    // The committed offset is the offset of the next message to consume.
    const nextOffset = (Number(message.offset) + 1).toString();

    for (let attempt = retryCount; ; attempt++) {
      try {
        // ConsumerCallback is declared against a single entry shape, hence the assertion.
        const succeeded = await callback(message as Parameters<ConsumerCallback>[0], topic, partition);
        if (succeeded === false) {
          throw new Error(`Callback reported failure for ${topic}[${partition}] at offset ${message.offset}`);
        }
        await consumer.commitOffsets([{ topic: topic, partition: partition, offset: nextOffset }]);
        return;
      } catch (error) {
        if (attempt >= this.retries) {
          console.error(`Max retries exceeded for ${topic}[${partition}] at offset ${message.offset}`, error);
          // Point the consumer back at the failed message.
          consumer.seek({ topic: topic, partition: partition, offset: message.offset });
          // Kill the process: this avoids an endless retry loop, and a restart may clear
          // whatever is blocking the processing of this message.
          process.exit(1);
        }
        // Wait 10 seconds before the next attempt.
        await new Promise((resolve) => setTimeout(resolve, 10000));
        if (heartbeat) {
          await heartbeat();
        }
      }
    }
  }

  /**
   * Entry point used by services to consume messages. Configures the consumer and
   * starts it with manual offset commits; every message is handed to `callback`.
   * The callback resolves to a boolean indicating whether the message was processed
   * successfully; its logic depends on the consuming service.
   *
   * @param callback Handler invoked for every consumed message.
   * @throws Error if the consumer has not been created.
   */
  async consume(callback: ConsumerCallback): Promise<void> {
    const consumer = this.consumer;
    if (!consumer) throw new Error('Kafka consumer client instance not initialized');
    await this._configureConsumer();
    await consumer.run({
      autoCommit: false, // Offsets are committed manually after successful processing.
      eachMessage: async ({ topic, partition, message, heartbeat }) => {
        await this.processMessageWithRetry(topic, partition, message, callback, 0, heartbeat);
      }
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment