Basic Kafka Consumer using KafkaJS
import {
  Kafka,
  Consumer,
  MessageSetEntry,
  RecordBatchEntry,
  ConsumerSubscribeTopics,
  ITopicConfig
} from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';

export type ConsumerMessage = MessageSetEntry | RecordBatchEntry;
export type ConsumerCallback = (message: ConsumerMessage, topic: string, partition?: number) => Promise<boolean>;
export class KafkaConsumer {
  private kafka: Kafka | null;
  private consumer: Consumer | null;
  private consumerId: string;
  private brokers: string[];
  private crashes: number;
  private retries: number;
  private maxCrashes: number;

  constructor(private topics: ConsumerSubscribeTopics['topics'], private groupId: string) {
    if (process.env.NODE_ENV === 'production') {
      // Production environment. SSL may be required.
      this.brokers = ['your-custom-url:9093'];
    } else {
      // Local environment. No SSL.
      this.brokers = ['127.0.0.1:9092'];
    }
    this.consumerId = uuidv4(); // Unique ID for the consumer
    this.kafka = null;
    this.consumer = null;
    // Optional. This is used to track the number of crashes and retries the consumer has had and stop the process if it crashes too many times.
    this.crashes = 0;
    this.maxCrashes = 5;
    this.retries = 10;
    // Initialization
    this._initConsumer();
    this._eventsListener();
  }

  _initConsumer() {
    try {
      // Creates the Kafka client instance, which is connected to the brokers
      this.kafka = new Kafka({
        clientId: `client.consumer.${this.groupId}.${this.consumerId}`, // Unique ID for the client
        brokers: [...this.brokers],
      });
      // Creates the consumer instance
      this.consumer = this.kafka.consumer({
        groupId: this.groupId, // Unique ID for the consumer group. This prevents this consumer from reading messages that have already been read by another consumer in the same group.
        readUncommitted: true // Allows the consumer to process uncommitted messages when it starts
      });
    } catch (e) {
      // This is optional. We retry the connection every 10 seconds.
      // We consider the consumer a critical part of the system, so we kill the process if there is no connection after 5 retries.
      this.crashes++;
      if (this.crashes === this.maxCrashes) {
        console.error('Consumer cannot be initialized', e);
        process.exit(1);
      }
      // Retry connection
      setTimeout(() => {
        this._initConsumer();
      }, 10000);
    }
  }

  _eventsListener() {
    /* As we consider the consumer a critical part of the system, we kill the process if the consumer stops or disconnects. */
    /* We need to listen to the relevant consumer events. */
    if (!this.consumer) throw new Error('Consumer not initialized');
    const { DISCONNECT, STOP } = this.consumer.events;
    this.consumer.on(STOP, async () => {
      // It could be worth sending a message to a monitoring service to alert that the consumer has stopped.
      process.exit(1);
    });
    this.consumer.on(DISCONNECT, async () => {
      // It could be worth sending a message to a monitoring service to alert that the consumer has disconnected.
      process.exit(1);
    });
  }

  async _configureConsumer() {
    if (!this.consumer) throw new Error('Unable to initialize consumer instance');
    if (!this.kafka) throw new Error('Unable to initialize kafka client instance');
    const consumer = this.consumer; // Local reference keeps the null check valid inside the callbacks below
    // Connect the consumer to the brokers
    await consumer.connect();
    // Access to admin methods
    const admin = this.kafka.admin();
    await admin.connect();
    const topicsConfig: ITopicConfig[] = [];
    // Get the cluster's info
    const cluster = await admin.describeCluster();
    // Create the topics in the server.
    this.topics.forEach((topic) => {
      topicsConfig.push({
        topic: topic.toString(), // Must be a string
        numPartitions: 2, // Increases the number of partitions in the topic. Default: 1. Improves scalability.
        replicationFactor: cluster.brokers.length, // Default: -1 (uses the broker's `default.replication.factor` configuration)
        replicaAssignment: [], // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
        configEntries: [] // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
      });
    });
    // createTopics resolves to true if the topics were created successfully, or false if they already exist.
    try {
      await admin.createTopics({
        validateOnly: false,
        waitForLeaders: true,
        timeout: 30000,
        topics: topicsConfig
      });
      await admin.disconnect();
      // Subscribes the consumer to the given topics.
      await consumer.subscribe({ topics: this.topics, fromBeginning: true });
    } catch (err) {
      // Log any possible error
      console.error('ERROR: ' + err);
      // If the consumer cannot be subscribed to the topics, we kill the process.
      process.exit(1);
    }
  }

  async processMessageWithRetry(topic: string, partition: number, message: ConsumerMessage, callback: ConsumerCallback, retryCount = 0) {
    if (!this.consumer) throw new Error('Consumer not initialized');
    const consumer = this.consumer;
    try {
      // The callback returns a boolean indicating whether the message was processed successfully.
      const processed = await callback(message, topic, partition);
      if (!processed) throw new Error('Message processing failed');
      // KafkaJS offsets are strings, so the next offset must be computed numerically before committing.
      await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
    } catch (error) {
      /*
       * Here we retry the message processing if it fails.
       * The manual commit ensures that the message is not lost and the order is maintained.
       */
      // Optional: console.error(`Error processing message: ${error.message}`);
      if (retryCount < this.retries) {
        // Retry after 10 seconds
        await new Promise(resolve => setTimeout(resolve, 10000));
        await this.processMessageWithRetry(topic, partition, message, callback, retryCount + 1);
      } else {
        // Max retries exceeded. Restart the consumer at the current offset.
        await consumer.seek({ topic, partition, offset: message.offset });
        // Kill the process. This helps to avoid infinite loops and potential memory leaks.
        // An error in the running service could be blocking the processing of this message, and restarting the consumer could help.
        process.exit(1);
      }
    }
  }

  /*
   * The following method is the one used by the consuming service to consume messages.
   * A callback function is passed in to process each message.
   * The callback function must return a boolean value indicating whether the message was processed successfully.
   * The callback function's logic will depend on the service that is consuming the messages.
   */
  async consume(callback: ConsumerCallback) {
    // This error kills the process. The consumer is a critical part of the system.
    if (!this.consumer) throw new Error('Kafka consumer client instance not initialized');
    const consumer = this.consumer;
    // Configure the consumer
    await this._configureConsumer();
    await consumer.run({
      autoCommit: false, // We want to commit the offsets manually
      eachMessage: async ({ topic, partition, message }) => {
        await this.processMessageWithRetry(topic, partition, message, callback);
      }
    });
  }
}
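
A minimal usage sketch is shown below. The topic name, group id, import path, and payload handling are hypothetical examples, not part of the gist; the callback returns true on success, so the retry logic above only kicks in on failures.

// Hypothetical usage example. 'orders.created', 'orders-service', and the import path are illustrative only.
import { KafkaConsumer, ConsumerMessage } from './KafkaConsumer';

const consumer = new KafkaConsumer(['orders.created'], 'orders-service');

consumer.consume(async (message: ConsumerMessage, topic: string, partition?: number): Promise<boolean> => {
  // message.value is a Buffer (or null); decode it according to whatever format the producer uses.
  if (!message.value) return false;
  const payload = JSON.parse(message.value.toString());
  console.log(`Received on ${topic}[${partition}]:`, payload);
  return true; // true signals success; false (or a thrown error) triggers the retry path
}).catch((err) => {
  console.error('Consumer failed to start', err);
  process.exit(1);
});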