Last active
May 20, 2023 10:55
-
-
Save shivendrasoni/298da557b471f5a851843f118dee76ce to your computer and use it in GitHub Desktop.
Build a basic in-memory queue system: https://drive.google.com/file/d/1rsBxbrH_qwBi8hNu6TdRpJu-cKIYgnFl/view?usp=share_link
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Message: | |
def __init__(self, content): | |
self.content = content | |
self.read_by = set() | |
class Topic: | |
def __init__(self, name): | |
self.name = name | |
self.messages = [] | |
self.consumer_groups = {} | |
def add_message(self, message): | |
self.messages.append(Message(message)) | |
def add_consumer_group(self, consumer_group): | |
self.consumer_groups[consumer_group.name] = consumer_group | |
def read_message(self, consumer_group_name): | |
consumer_group = self.consumer_groups.get(consumer_group_name) | |
if not consumer_group: | |
raise Exception(f"Consumer group {consumer_group_name} does not exist") | |
for message in self.messages: | |
if consumer_group not in message.read_by: | |
message.read_by.add(consumer_group) | |
if len(message.read_by) == len(self.consumer_groups): | |
self.messages.remove(message) | |
return message.content | |
return None | |
class ConsumerGroup: | |
def __init__(self, name): | |
self.name = name | |
class MessageQueue: | |
def __init__(self): | |
self.topics = {} | |
def create_topic(self, topic_name, consumer_group_names): | |
if topic_name not in self.topics: | |
self.topics[topic_name] = Topic(topic_name) | |
for consumer_group_name in consumer_group_names: | |
self.topics[topic_name].add_consumer_group(ConsumerGroup(consumer_group_name)) | |
def push(self, topic_name, message): | |
if topic_name in self.topics: | |
self.topics[topic_name].add_message(message) | |
else: | |
raise Exception(f"Topic {topic_name} does not exist") | |
def poll(self, topic_name, consumer_group_name): | |
if topic_name in self.topics: | |
return self.topics[topic_name].read_message(consumer_group_name) | |
else: | |
raise Exception(f"Topic {topic_name} does not exist") | |
mq = MessageQueue() | |
mq.create_topic("logs", ["analytics", "dev"]) | |
mq.push("logs", "Message 1") | |
mq.push("logs", "Message 2") | |
mq.push("logs", "Message 3") | |
print(mq.poll("logs", "analytics")) # prints "Message 1" | |
print(mq.poll("logs", "dev")) # prints "Message 1" | |
print(mq.poll("logs", "analytics")) # prints "Message 2" | |
print(mq.poll("logs", "dev")) # prints "Message 2" | |
print(mq.poll("logs", "analytics")) # prints "Message 3" | |
print(mq.poll("logs", "dev")) # prints "Message 3" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment