Skip to content

Instantly share code, notes, and snippets.

@mvandermeulen
Forked from AdamOlszewskiIT/election-manager.py
Created August 17, 2025 03:18
Show Gist options
  • Save mvandermeulen/afa4364bac58ef425b7aa039592ed177 to your computer and use it in GitHub Desktop.
Save mvandermeulen/afa4364bac58ef425b7aa039592ed177 to your computer and use it in GitHub Desktop.
Redis - master slave election manager
import time
import threading
import uuid
from fastapi import FastAPI
from redis import Redis
# Configuration
REDIS_HOST = "localhost"
REDIS_PORT = 6379
MASTER_KEY = "master"
MASTER_TTL = 10 # Time-to-live for the master key in seconds
INSTANCE_ID = str(uuid.uuid4()) # Unique ID for this instance
# Globals
app = FastAPI()
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
is_master = False # Flag to track if this instance is the master
@app.on_event("startup")
def startup():
"""Start the master election process."""
threading.Thread(target=master_election, daemon=True).start()
@app.on_event("shutdown")
def shutdown():
"""Clean up on shutdown."""
if is_master:
release_master()
@app.get("/health")
def health_check():
"""Health check endpoint."""
return {"status": "healthy", "is_master": is_master, "instance_id": INSTANCE_ID}
def master_election():
"""Attempt to become the master and maintain leadership."""
global is_master
while True:
try:
# Attempt to set the master key
acquired = redis_client.set(MASTER_KEY, INSTANCE_ID, nx=True, ex=MASTER_TTL)
if acquired:
# Successfully became the master
is_master = True
print(f"{INSTANCE_ID} is now the master!")
notify_new_master()
maintain_master_key()
else:
# Check who is the current master
current_master = redis_client.get(MASTER_KEY)
if current_master:
print(f"{INSTANCE_ID} sees {current_master.decode()} as the master.")
else:
print(f"{INSTANCE_ID} sees no master. Retrying election...")
is_master = False
time.sleep(2) # Retry or renew every 2 seconds
except Exception as e:
print(f"Error in master election: {e}")
time.sleep(2) # Backoff on error
def maintain_master_key():
"""Keep renewing the master key's TTL."""
global is_master
while is_master:
try:
# Extend the TTL to keep the master key alive
redis_client.expire(MASTER_KEY, MASTER_TTL)
print(f"{INSTANCE_ID} renewed master key TTL.")
time.sleep(MASTER_TTL // 2) # Renew halfway through TTL
except Exception as e:
print(f"Error maintaining master key: {e}")
is_master = False
def release_master():
"""Release the master role."""
try:
if is_master:
redis_client.delete(MASTER_KEY)
print(f"{INSTANCE_ID} released master role.")
except Exception as e:
print(f"Error releasing master role: {e}")
def notify_new_master():
"""Notify all instances of the new master using Redis Pub/Sub."""
try:
redis_client.publish("master_notifications", f"New master: {INSTANCE_ID}")
print(f"{INSTANCE_ID} published master notification.")
except Exception as e:
print(f"Error publishing master notification: {e}")
def listen_for_notifications():
"""Listen for master notifications using Redis Pub/Sub."""
pubsub = redis_client.pubsub()
pubsub.subscribe("master_notifications")
for message in pubsub.listen():
if message["type"] == "message":
print(f"Notification received: {message['data'].decode()}")
# Start listening for notifications in a separate thread
threading.Thread(target=listen_for_notifications, daemon=True).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment