Skip to content

Instantly share code, notes, and snippets.

@ps-jessejjohnson
Last active January 15, 2025 19:34
Show Gist options
  • Save ps-jessejjohnson/694cbcb89723a7ae3474df3e0c90e6ac to your computer and use it in GitHub Desktop.
Save ps-jessejjohnson/694cbcb89723a7ae3474df3e0c90e6ac to your computer and use it in GitHub Desktop.
Simple Messaging Client Abstraction
export interface IMessagingClient {
connect(): Promise<void>;
disconnect(): Promise<void>;
subscribe(topics: string[], callback: (message: string) => void): Promise<void>;
send(topic: string, message: string): Promise<void>;
}
import { Kafka, Consumer, Producer } from 'kafkajs';
import { IMessagingClient } from './IMessagingClient';
export class KafkaMessagingClient implements IMessagingClient {
private kafka: Kafka;
private producer: Producer;
private consumer: Consumer;
private brokers: string[];
private clientId: string;
constructor(brokers: string[], clientId: string, groupId: string) {
this.brokers = brokers;
this.clientId = clientId;
this.kafka = new Kafka({ brokers: this.brokers, clientId: this.clientId });
this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({ groupId });
}
async connect(): Promise<void> {
await this.producer.connect();
await this.consumer.connect();
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
await this.consumer.disconnect();
}
async subscribe(topics: string[], callback: (message: unkown) => void): Promise<void> {
for (const topic of topics) {
await this.consumer.subscribe({ topic, fromBeginning: true });
}
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const messageValue = message.value?.toString();
if (messageValue) {
callback(messageValue);
}
},
});
}
async send(topic: string, message: string): Promise<void> {
await this.producer.send({
topic,
messages: [{ value: message }],
});
}
}
import { IMessagingClient } from './IMessagingClient';
export class MockMessagingClient implements IMessagingClient {
private messages: Map<string, string[]>;
private subscribers: Map<string, ((message: string) => void)[]>;
constructor() {
this.messages = new Map();
this.subscribers = new Map();
}
async connect(): Promise<void> {
console.log('MockMessagingClient connected.');
}
async disconnect(): Promise<void> {
this.messages.clear();
this.subscribers.clear();
console.log('MockMessagingClient disconnected.');
}
async subscribe(topics: string[], callback: (message: string) => void): Promise<void> {
for (const topic of topics) {
if (!this.subscribers.has(topic)) {
this.subscribers.set(topic, []);
}
this.subscribers.get(topic)?.push(callback);
}
}
async send(topic: string, message: string): Promise<void> {
if (!this.messages.has(topic)) {
this.messages.set(topic, []);
}
this.messages.get(topic)?.push(message);
const subscribers = this.subscribers.get(topic);
if (subscribers) {
for (const subscriber of subscribers) {
subscriber(message);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment