Created
January 7, 2025 13:51
-
-
Save AdamOlszewskiIT/cc843c4a5f62282dce1fe7291c4c8479 to your computer and use it in GitHub Desktop.
Redis - master slave election manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import threading | |
import uuid | |
from fastapi import FastAPI | |
from redis import Redis | |
# --- Configuration ---------------------------------------------------------
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
MASTER_KEY: str = "master"  # Redis key whose value is the current master's instance ID
MASTER_TTL: int = 10  # Seconds before the master key expires unless it is renewed
INSTANCE_ID: str = str(uuid.uuid4())  # Random UUID generated once at import time

# --- Module-level state ----------------------------------------------------
app = FastAPI()
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
is_master: bool = False  # True while this instance believes it holds the master key
@app.on_event("startup")
def startup():
    """Launch the master-election loop in a background daemon thread."""
    election_thread = threading.Thread(target=master_election, daemon=True)
    election_thread.start()
@app.on_event("shutdown")
def shutdown():
    """Give up the master role on application shutdown, if currently held."""
    if not is_master:
        return
    release_master()
@app.get("/health")
def health_check():
    """Report liveness plus this instance's ID and current master flag."""
    payload = {"status": "healthy"}
    payload["is_master"] = is_master
    payload["instance_id"] = INSTANCE_ID
    return payload
def master_election():
    """Run the leader-election loop forever.

    Each iteration tries to claim ``MASTER_KEY`` with ``SET NX EX``. On
    success this instance marks itself master, publishes a notification and
    then blocks in ``maintain_master_key()`` until leadership is lost.

    If the claim fails, the current holder is looked up. When the key still
    contains this instance's own ID (for example after a transient renewal
    error), leadership is resumed instead of waiting for the key to expire,
    which would otherwise leave the cluster without a master until the TTL
    runs out.

    The function never returns; ``startup()`` runs it in a daemon thread.
    """
    global is_master
    while True:
        try:
            # NX + EX in a single SET: create the key only if it is absent
            # and attach the TTL in the same command.
            acquired = redis_client.set(MASTER_KEY, INSTANCE_ID, nx=True, ex=MASTER_TTL)
            if acquired:
                is_master = True
                print(f"{INSTANCE_ID} is now the master!")
                notify_new_master()
                # Loops while is_master is True, i.e. until leadership is lost.
                maintain_master_key()
            else:
                current_master = redis_client.get(MASTER_KEY)
                holder = current_master.decode() if current_master else None
                if holder == INSTANCE_ID:
                    # The key is still ours: keep renewing it rather than
                    # demoting ourselves and letting it expire.
                    print(f"{INSTANCE_ID} still holds the master key. Resuming leadership.")
                    is_master = True
                    maintain_master_key()
                elif holder:
                    print(f"{INSTANCE_ID} sees {holder} as the master.")
                    is_master = False
                else:
                    # The key expired between the failed SET and the GET.
                    print(f"{INSTANCE_ID} sees no master. Retrying election...")
                    is_master = False
            time.sleep(2)  # Retry the election every 2 seconds
        except Exception as e:
            # Nothing is renewing the key on this path, so do not keep
            # claiming leadership.
            is_master = False
            print(f"Error in master election: {e}")
            time.sleep(2)  # Backoff on error
def maintain_master_key():
    """Keep renewing the master key's TTL while this instance owns it.

    Renewal is a compare-and-expire executed as a Lua script, so the TTL is
    only extended when ``MASTER_KEY`` still holds ``INSTANCE_ID``. A plain
    ``EXPIRE`` would also extend a key that another instance acquired after
    ours expired, and its result would not reveal that leadership was lost.

    The loop ends, with ``is_master`` set to ``False``, when the key is
    missing or owned by someone else, or when a Redis call raises.
    """
    global is_master
    # Returns 1 when the TTL was extended, 0 when the key is absent or holds
    # a different instance ID.
    renew_script = (
        'if redis.call("get", KEYS[1]) == ARGV[1] then '
        'return redis.call("expire", KEYS[1], ARGV[2]) '
        "else return 0 end"
    )
    while is_master:
        try:
            renewed = redis_client.eval(
                renew_script, 1, MASTER_KEY, INSTANCE_ID, MASTER_TTL
            )
            if not renewed:
                # Key expired or was taken over: step down immediately.
                print(f"{INSTANCE_ID} lost the master key. Stepping down.")
                is_master = False
                break
            print(f"{INSTANCE_ID} renewed master key TTL.")
            time.sleep(MASTER_TTL // 2)  # Renew halfway through TTL
        except Exception as e:
            print(f"Error maintaining master key: {e}")
            is_master = False
def release_master():
    """Release the master role if this instance believes it holds it.

    The key is removed with a compare-and-delete Lua script, so it is only
    deleted when it still contains ``INSTANCE_ID``. An unconditional
    ``DELETE`` could remove a key that another instance acquired after ours
    expired. ``is_master`` is cleared once the release attempt completes so
    the flag no longer reports leadership.

    Redis errors are logged and swallowed, because this is a best-effort
    cleanup called from the shutdown hook.

    NOTE(review): if the process keeps running after this call, the election
    loop will compete for the key again on its next iteration.
    """
    global is_master
    # Returns 1 when the key was deleted, 0 when it is absent or owned by
    # another instance.
    release_script = (
        'if redis.call("get", KEYS[1]) == ARGV[1] then '
        'return redis.call("del", KEYS[1]) '
        "else return 0 end"
    )
    try:
        if is_master:
            released = redis_client.eval(release_script, 1, MASTER_KEY, INSTANCE_ID)
            is_master = False
            if released:
                print(f"{INSTANCE_ID} released master role.")
            else:
                print(f"{INSTANCE_ID} no longer held the master key; nothing to release.")
    except Exception as e:
        print(f"Error releasing master role: {e}")
def notify_new_master():
    """Announce over Redis Pub/Sub that this instance became the master.

    Publishing is best-effort: any exception is logged and swallowed.
    """
    channel = "master_notifications"
    try:
        announcement = f"New master: {INSTANCE_ID}"
        redis_client.publish(channel, announcement)
        print(f"{INSTANCE_ID} published master notification.")
    except Exception as err:
        print(f"Error publishing master notification: {err}")
def listen_for_notifications():
    """Subscribe to the master-notification channel and print each message.

    Iterates over the Pub/Sub listener, so it is meant to run in its own
    thread.
    """
    subscription = redis_client.pubsub()
    subscription.subscribe("master_notifications")
    for event in subscription.listen():
        # Only events of type "message" carry a payload to print.
        if event["type"] != "message":
            continue
        text = event["data"].decode()
        print(f"Notification received: {text}")
# Launch the Pub/Sub listener in a background daemon thread. This runs at
# import time, as soon as the module is loaded.
threading.Thread(
    target=listen_for_notifications,
    daemon=True,
).start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment