Skip to content

Instantly share code, notes, and snippets.

@puuble
Last active January 17, 2023 13:58
Show Gist options
  • Select an option

  • Save puuble/badcfcdfaaff2d831ef4bed98574799f to your computer and use it in GitHub Desktop.

Select an option

Save puuble/badcfcdfaaff2d831ef4bed98574799f to your computer and use it in GitHub Desktop.
Python event bus
from collections import defaultdict
from threading import Lock
class EventBus:
    """Thread-safe publish/subscribe event bus.

    Callbacks are registered per event name and invoked synchronously,
    in subscription order, when that event is emitted.
    """

    def __init__(self):
        # Maps event name -> list of subscribed callbacks.
        self.events = defaultdict(list)
        # Guards all mutation of self.events across threads.
        self.lock = Lock()

    def subscribe(self, event, callback):
        """Register *callback* to run whenever *event* is emitted."""
        with self.lock:
            self.events[event].append(callback)

    def unsubscribe(self, event, callback):
        """Remove *callback* from *event*'s subscribers.

        Raises ValueError if the callback was never subscribed
        (list.remove semantics, unchanged from the original).
        """
        with self.lock:
            self.events[event].remove(callback)

    def emit(self, event, *args, **kwargs):
        """Invoke every callback subscribed to *event* with the given args.

        The subscriber list is snapshotted under the lock, but the
        callbacks run OUTSIDE it: the original held the non-reentrant
        Lock while calling back, so any callback that tried to
        subscribe/unsubscribe would deadlock, and the list could be
        mutated while being iterated.
        """
        with self.lock:
            callbacks = list(self.events[event])
        for callback in callbacks:
            callback(*args, **kwargs)
from event_bus import EventBus

# Demo: instantiate a bus and publish a sample event.
event_bus = EventBus()

# Any handlers already subscribed to 'myEvent' receive this payload.
payload = {'data': 'hello'}
event_bus.emit('myEvent', payload)
from event_bus import EventBus

# Demo: create a bus and register a synchronous handler.
event_bus = EventBus()


def handle_event(event_data):
    """Print the payload delivered with 'myEvent'."""
    print(event_data)


# From now on, every emit of 'myEvent' on this bus calls handle_event.
event_bus.subscribe('myEvent', handle_event)
import asyncio

# If the handler is async: EventBus.emit invokes callbacks synchronously,
# so subscribing an `async def` directly was a bug — calling it only
# CREATES a coroutine object that is never awaited, so the handler body
# never runs (Python emits "coroutine ... was never awaited").


async def handle_event(event_data):
    """Asynchronously handle 'myEvent'."""
    # some_async_operation is defined elsewhere; presumably it should
    # consume event_data — TODO confirm against the real implementation.
    result = await some_async_operation()
    print(result)


def _handle_event_sync(event_data):
    """Sync adapter: drive the async handler to completion for the bus."""
    asyncio.run(handle_event(event_data))


event_bus.subscribe('myEvent', _handle_event_sync)
import json
from collections import defaultdict
from threading import Lock
class EventBus:
def __init__(self):
self.events = defaultdict(list)
self.lock = Lock()
def subscribe(self, event, callback):
with self.lock:
self.events[event].append(callback)
def unsubscribe(self, event, callback):
with self.lock:
self.events[event].remove(callback)
def emit(self, event, *args, **kwargs):
with self.lock:
for callback in self.events[event]:
callback(*args, **kwargs)
event_bus = EventBus()

# Stream the file and publish each JSON record to the bus.
#
# BUG in the original: it read fixed 1 MB chunks and ran json.loads on
# each chunk, but an arbitrary byte-offset slice of a JSON document is
# almost never itself valid JSON, so every emit after (at best) the
# first would raise json.JSONDecodeError. Parsing per line assumes the
# file is JSON Lines (one document per line) — TODO confirm the actual
# layout of large_file.json.
with open('large_file.json', 'r') as f:
    for line in f:
        record = line.strip()
        if not record:
            continue  # skip blank lines rather than crash on them
        event_bus.emit('dataChunk', json.loads(record))
import json
import boto3  # pip install boto3

sqs = boto3.client('sqs')
queue_url = 'YOUR_QUEUE_URL'

# Stream the file to SQS in bounded chunks.
#
# Two bugs in the original:
#   1. `json.dumps(chunk)` re-serialized text that was already raw JSON,
#      wrapping it in quotes and escaping every inner quote, so consumers
#      received a JSON *string*, not the data. MessageBody takes a plain
#      string — send the chunk as-is.
#   2. 1 MB chunks exceed SQS's 256 KB MessageBody limit and would be
#      rejected; 128 KB stays safely under the cap.
with open('large_file.json', 'r') as f:
    while True:
        chunk = f.read(128 * 1024)  # stay under the 256 KB SQS cap
        if not chunk:
            break
        sqs.send_message(QueueUrl=queue_url, MessageBody=chunk)
from pymongo import MongoClient

# Connect to MongoDB (sharding commands must go to a mongos router —
# TODO confirm localhost:27017 is a mongos, not a plain mongod).
client = MongoClient("mongodb://localhost:27017/")

# Select the database.
db = client["mydb"]

# BUG in the original: `db.admin.command(...)` resolves to a *collection*
# named "admin" inside mydb (collections have no .command attribute), so
# the calls failed. Administrative commands run against the `admin`
# database on the client.
client.admin.command({"enableSharding": "mydb"})

# Hashed shard key: distributes writes evenly across shards.
shard_key = {"user_id": "hashed"}

# Shard the collection exactly once. The original issued shardCollection
# twice; the second call errors because the collection is already
# sharded. numInitialChunks pre-splits the key space and is only valid
# with a hashed shard key (which this is).
client.admin.command({
    "shardCollection": "mydb.bigCollection",
    "key": shard_key,
    "numInitialChunks": 10,
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment