Last active
January 15, 2025 19:34
-
-
Save ps-jessejjohnson/694cbcb89723a7ae3474df3e0c90e6ac to your computer and use it in GitHub Desktop.
Simple Messaging Client Abstraction
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Minimal publish/subscribe messaging contract.
 *
 * All operations are asynchronous and resolve with no value; message
 * payloads are exchanged as plain strings.
 */
export interface IMessagingClient {
  /** Prepares the client for use (e.g. opens its underlying connections). */
  connect(): Promise<void>;

  /** Releases whatever `connect` acquired. */
  disconnect(): Promise<void>;

  /**
   * Registers a callback for messages on the given topics.
   *
   * @param topics - Names of the topics to listen on.
   * @param callback - Invoked with the payload of each received message.
   */
  subscribe(topics: string[], callback: (message: string) => void): Promise<void>;

  /**
   * Publishes a single message.
   *
   * @param topic - Name of the topic to publish to.
   * @param message - Payload to publish.
   */
  send(topic: string, message: string): Promise<void>;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Kafka, Consumer, Producer } from 'kafkajs'; | |
import { IMessagingClient } from './IMessagingClient'; | |
/**
 * `IMessagingClient` implementation backed by kafkajs.
 *
 * Holds one producer and one consumer, both created from a single `Kafka`
 * instance built in the constructor.
 */
export class KafkaMessagingClient implements IMessagingClient {
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private readonly consumer: Consumer;
  private readonly brokers: string[];
  private readonly clientId: string;

  /**
   * @param brokers - Broker addresses handed to the `Kafka` constructor.
   * @param clientId - Client id handed to the `Kafka` constructor.
   * @param groupId - Consumer group id used when creating the consumer.
   */
  constructor(brokers: string[], clientId: string, groupId: string) {
    this.brokers = brokers;
    this.clientId = clientId;
    this.kafka = new Kafka({ brokers: this.brokers, clientId: this.clientId });
    this.producer = this.kafka.producer();
    this.consumer = this.kafka.consumer({ groupId });
  }

  /** Connects the producer, then the consumer. */
  async connect(): Promise<void> {
    await this.producer.connect();
    await this.consumer.connect();
  }

  /**
   * Disconnects the producer, then the consumer.
   *
   * The consumer is disconnected even when the producer's disconnect rejects,
   * so a failure on one side does not leak the other connection.
   */
  async disconnect(): Promise<void> {
    try {
      await this.producer.disconnect();
    } finally {
      await this.consumer.disconnect();
    }
  }

  /**
   * Subscribes the consumer to every topic (with `fromBeginning: true`) and
   * starts the consumer loop.
   *
   * NOTE(review): every call invokes `consumer.run()`; confirm kafkajs
   * tolerates that before calling `subscribe` more than once per client.
   *
   * @param topics - Topics to subscribe to.
   * @param callback - Receives each message value decoded via `toString()`.
   *   Messages whose value is `null` are skipped; empty strings are delivered.
   */
  async subscribe(topics: string[], callback: (message: string) => void): Promise<void> {
    for (const topic of topics) {
      await this.consumer.subscribe({ topic, fromBeginning: true });
    }

    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const payload = message.value?.toString();
        // Compare against undefined rather than testing truthiness so that an
        // empty-string payload still reaches the callback.
        if (payload !== undefined) {
          callback(payload);
        }
      },
    });
  }

  /**
   * Publishes one message to a topic through the producer.
   *
   * @param topic - Destination topic.
   * @param message - Payload sent as the message value.
   */
  async send(topic: string, message: string): Promise<void> {
    await this.producer.send({
      topic,
      messages: [{ value: message }],
    });
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { IMessagingClient } from './IMessagingClient'; | |
/**
 * In-memory `IMessagingClient` for tests: no network, no broker.
 *
 * Sent messages are recorded per topic and handed to that topic's subscribers
 * during the `send` call itself.
 */
export class MockMessagingClient implements IMessagingClient {
  /** Every message sent so far, grouped by topic. */
  private readonly messages = new Map<string, string[]>();
  /** Callbacks registered per topic, in subscription order. */
  private readonly subscribers = new Map<string, ((message: string) => void)[]>();

  /** Logs a connection notice; there is nothing to connect to. */
  async connect(): Promise<void> {
    console.log('MockMessagingClient connected.');
  }

  /** Forgets all recorded messages and subscribers, then logs a notice. */
  async disconnect(): Promise<void> {
    this.messages.clear();
    this.subscribers.clear();
    console.log('MockMessagingClient disconnected.');
  }

  /**
   * Registers `callback` on each of the given topics.
   *
   * @param topics - Topics to listen on.
   * @param callback - Invoked with every message later sent to those topics.
   */
  async subscribe(topics: string[], callback: (message: string) => void): Promise<void> {
    for (const topic of topics) {
      const listeners = this.subscribers.get(topic);
      if (listeners) {
        listeners.push(callback);
      } else {
        this.subscribers.set(topic, [callback]);
      }
    }
  }

  /**
   * Records `message` under `topic` and delivers it to the topic's subscribers.
   *
   * Delivery runs over a snapshot of the subscriber list, so a callback that
   * subscribes to the same topic while handling a message cannot extend (or
   * endlessly prolong) the current dispatch; the new subscriber first sees the
   * next message.
   *
   * @param topic - Topic to publish to.
   * @param message - Payload passed to each subscriber.
   */
  async send(topic: string, message: string): Promise<void> {
    const history = this.messages.get(topic);
    if (history) {
      history.push(message);
    } else {
      this.messages.set(topic, [message]);
    }

    const listeners = this.subscribers.get(topic);
    if (!listeners) {
      return;
    }
    for (const listener of [...listeners]) {
      listener(message);
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment