Skip to content

Instantly share code, notes, and snippets.

@kgantsov
Last active November 12, 2025 07:23
Show Gist options
  • Select an option

  • Save kgantsov/60f197a469b5af099f0d5cf9e2f3d685 to your computer and use it in GitHub Desktop.

Select an option

Save kgantsov/60f197a469b5af099f0d5cf9e2f3d685 to your computer and use it in GitHub Desktop.
Leader election with Consul

Leader election with Consul

Install the Python Consul client:

pip install python-consul

Run consul

consul agent -dev

Run multiple instances to see leader election:

# Terminal 1
python main.py node-1

# Terminal 2
python main.py node-2

# Terminal 3
python main.py node-3

Then stop the leader and see that one of the follower becomes a new leader

import consul
import time
import sys
import signal
import logging
from threading import Thread, Event
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)
class LeaderElection:
def __init__(self, consul_host='127.0.0.1', consul_port=8500, key='service/leader', node_id='node'):
self.client = consul.Consul(host=consul_host, port=consul_port)
self.key = key
self.node_id = node_id
self.session_id = None
self.is_leader = False
self.stop_event = Event()
self.renew_thread = None
def create_session(self):
"""Create a Consul session for this node"""
try:
session_id = self.client.session.create(
name=f'leader-election-{self.node_id}',
ttl=10, # 10 seconds TTL
behavior='delete', # Delete keys when session expires
lock_delay=5 # 5 second lock delay
)
self.session_id = session_id
logger.info(f"[{self.node_id}] Created session: {session_id}")
return True
except Exception as e:
logger.error(f"[{self.node_id}] Failed to create session: {e}")
return False
def try_acquire_lock(self):
"""Attempt to acquire the leader lock"""
try:
acquired = self.client.kv.put(
self.key,
self.node_id.encode('utf-8'),
acquire=self.session_id
)
return acquired
except Exception as e:
logger.error(f"[{self.node_id}] Error acquiring lock: {e}")
return False
def release_lock(self):
"""Release the leader lock"""
try:
self.client.kv.put(
self.key,
self.node_id.encode('utf-8'),
release=self.session_id
)
logger.info(f"[{self.node_id}] Released lock")
except Exception as e:
logger.error(f"[{self.node_id}] Error releasing lock: {e}")
def renew_session(self):
"""Periodically renew the session to keep it alive"""
while not self.stop_event.is_set():
try:
if self.session_id:
result = self.client.session.renew(self.session_id)
if result is None:
logger.warning(f"[{self.node_id}] Failed to renew session")
self.is_leader = False
break
except Exception as e:
logger.error(f"[{self.node_id}] Session renewal error: {e}")
self.is_leader = False
break
time.sleep(5) # Renew every 5 seconds
def do_leader_work(self):
"""Perform work as the leader"""
logger.info(f"[{self.node_id}] Doing leader work...")
while not self.stop_event.is_set() and self.is_leader:
logger.info(f"[{self.node_id}] ⚡ Leader: performing periodic task")
time.sleep(2)
if not self.is_leader:
logger.info(f"[{self.node_id}] Lost leadership, stopping work")
def run_election(self):
"""Main election loop"""
if not self.create_session():
return
# Start session renewal thread
self.renew_thread = Thread(target=self.renew_session, daemon=True)
self.renew_thread.start()
leader_work_thread = None
try:
while not self.stop_event.is_set():
if not self.is_leader:
acquired = self.try_acquire_lock()
if acquired:
self.is_leader = True
logger.info(f"[{self.node_id}] BECAME LEADER")
# Start leader work in a separate thread
leader_work_thread = Thread(target=self.do_leader_work, daemon=True)
leader_work_thread.start()
else:
logger.info(f"[{self.node_id}] Follower - waiting for leadership")
time.sleep(3)
finally:
self.cleanup()
def cleanup(self):
"""Clean up resources"""
if self.is_leader:
self.release_lock()
if self.session_id:
try:
self.client.session.destroy(self.session_id)
logger.info(f"[{self.node_id}] Destroyed session")
except Exception as e:
logger.error(f"[{self.node_id}] Error destroying session: {e}")
def stop(self):
"""Stop the election process"""
logger.info(f"[{self.node_id}] Stopping...")
self.stop_event.set()
def signal_handler(signum, frame):
"""Handle shutdown signals"""
logger.info("Received shutdown signal")
if 'election' in globals():
election.stop()
sys.exit(0)
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python main.py <node-id>")
sys.exit(1)
node_id = sys.argv[1]
# Set up signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Create and run election
election = LeaderElection(
consul_host='127.0.0.1',
consul_port=8500,
key='service/leader',
node_id=node_id
)
logger.info(f"[{node_id}] Starting leader election...")
election.run_election()
logger.info(f"[{node_id}] Shutdown complete")
python-consul==1.1.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment