pip install python-consul
consul agent -dev
# Terminal 1
python main.py node-1
# Terminal 2
python main.py node-2
# Terminal 3
python main.py node-3
Then stop the leader and see that one of the followers becomes the new leader.
pip install python-consul
consul agent -dev
# Terminal 1
python main.py node-1
# Terminal 2
python main.py node-2
# Terminal 3
python main.py node-3
Then stop the leader and see that one of the followers becomes the new leader.
| import consul | |
| import time | |
| import sys | |
| import signal | |
| import logging | |
| from threading import Thread, Event | |
# Root logging: INFO and above, each line prefixed with a HH:MM:SS timestamp.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)

# Module-level logger shared by everything in this script.
logger = logging.getLogger(__name__)
class LeaderElection:
    """Leader election for one node, built on a Consul session and a KV lock.

    The node creates a Consul session, keeps it alive from a background
    thread, and repeatedly tries to acquire ``key`` with that session. While
    it holds the lock it runs :meth:`do_leader_work` in a second thread.

    If the session can no longer be renewed, the node drops leadership,
    forgets the dead session, and the election loop creates a fresh one so
    the node can compete again instead of retrying with a stale session id.
    """

    # Timing knobs, all in seconds.
    SESSION_TTL = 10     # TTL requested for the Consul session
    LOCK_DELAY = 5       # lock delay requested for the Consul session
    RENEW_INTERVAL = 5   # pause between session renewals
    RETRY_INTERVAL = 3   # pause between election loop iterations
    WORK_INTERVAL = 2    # pause between leader work iterations

    def __init__(self, consul_host='127.0.0.1', consul_port=8500, key='service/leader', node_id='node'):
        """Set up the Consul client and the election state.

        Args:
            consul_host: Host of the Consul agent to talk to.
            consul_port: HTTP port of the Consul agent.
            key: KV key used as the leader lock.
            node_id: Identifier of this node; stored as the lock's value and
                used as a prefix in log messages.
        """
        self.client = consul.Consul(host=consul_host, port=consul_port)
        self.key = key
        self.node_id = node_id
        self.session_id = None      # None means "no usable session right now"
        self.is_leader = False
        self.stop_event = Event()   # set by stop()/cleanup() to end all loops
        self.renew_thread = None

    def create_session(self):
        """Create a Consul session for this node.

        Returns:
            True if the session was created (``self.session_id`` is set),
            False if the request failed (``self.session_id`` is unchanged).
        """
        try:
            session_id = self.client.session.create(
                name=f'leader-election-{self.node_id}',
                ttl=self.SESSION_TTL,
                behavior='delete',  # Delete keys when session expires
                lock_delay=self.LOCK_DELAY,
            )
        except Exception as e:
            logger.error(f"[{self.node_id}] Failed to create session: {e}")
            return False
        self.session_id = session_id
        logger.info(f"[{self.node_id}] Created session: {session_id}")
        return True

    def try_acquire_lock(self):
        """Attempt to acquire the leader lock with the current session.

        Returns:
            The result of the KV put (truthy when the lock was acquired),
            or False when there is no session or the request failed.
        """
        # Snapshot the id: the renewal thread may clear it concurrently, and
        # a put with acquire=None would be a plain, always-successful write.
        session_id = self.session_id
        if not session_id:
            return False
        try:
            return self.client.kv.put(
                self.key,
                self.node_id.encode('utf-8'),
                acquire=session_id,
            )
        except Exception as e:
            logger.error(f"[{self.node_id}] Error acquiring lock: {e}")
            return False

    def release_lock(self):
        """Release the leader lock held by the current session, if any."""
        session_id = self.session_id
        if not session_id:
            # Nothing to release with; avoid an unconditional overwrite.
            return
        try:
            self.client.kv.put(
                self.key,
                self.node_id.encode('utf-8'),
                release=session_id,
            )
            logger.info(f"[{self.node_id}] Released lock")
        except Exception as e:
            logger.error(f"[{self.node_id}] Error releasing lock: {e}")

    def _mark_session_lost(self):
        """Drop leadership and forget the session after a failed renewal."""
        self.is_leader = False
        # Cleared last: the election loop treats None as "create a new session".
        self.session_id = None

    def _start_renew_thread(self):
        """Start a daemon thread that keeps the current session alive."""
        self.renew_thread = Thread(target=self.renew_session, daemon=True)
        self.renew_thread.start()

    def renew_session(self):
        """Periodically renew the session until stopped or renewal fails.

        On failure the node gives up leadership and clears ``session_id`` so
        the election loop can create a replacement session; this thread then
        exits.
        """
        while not self.stop_event.is_set():
            try:
                session_id = self.session_id
                if session_id:
                    result = self.client.session.renew(session_id)
                    if result is None:
                        logger.warning(f"[{self.node_id}] Failed to renew session")
                        self._mark_session_lost()
                        break
            except Exception as e:
                logger.error(f"[{self.node_id}] Session renewal error: {e}")
                self._mark_session_lost()
                break
            # Event.wait instead of sleep so stop() takes effect immediately.
            self.stop_event.wait(self.RENEW_INTERVAL)

    def do_leader_work(self):
        """Run the periodic leader task until leadership is lost or stopped."""
        logger.info(f"[{self.node_id}] Doing leader work...")
        while not self.stop_event.is_set() and self.is_leader:
            logger.info(f"[{self.node_id}] ⚡ Leader: performing periodic task")
            self.stop_event.wait(self.WORK_INTERVAL)
        if not self.is_leader:
            logger.info(f"[{self.node_id}] Lost leadership, stopping work")

    def run_election(self):
        """Run the election loop until :meth:`stop` is called.

        Returns immediately if the initial session cannot be created.
        Otherwise blocks, trying to acquire the lock whenever this node is
        not the leader and recreating the session whenever it has been lost.
        :meth:`cleanup` always runs on the way out.
        """
        if not self.create_session():
            return
        self._start_renew_thread()
        leader_work_thread = None
        try:
            while not self.stop_event.is_set():
                if self.session_id is None:
                    # The renewal thread gave up on the old session; without
                    # a new one every acquire attempt would fail forever.
                    self.is_leader = False
                    if not self.create_session():
                        self.stop_event.wait(self.RETRY_INTERVAL)
                        continue
                    self._start_renew_thread()
                if not self.is_leader:
                    if self.try_acquire_lock():
                        self.is_leader = True
                        logger.info(f"[{self.node_id}] BECAME LEADER")
                        # Reuse a still-running worker rather than stacking
                        # a second one after a quick loss and re-acquire.
                        if leader_work_thread is None or not leader_work_thread.is_alive():
                            leader_work_thread = Thread(target=self.do_leader_work, daemon=True)
                            leader_work_thread.start()
                    else:
                        logger.info(f"[{self.node_id}] Follower - waiting for leadership")
                self.stop_event.wait(self.RETRY_INTERVAL)
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop background loops, release the lock, and destroy the session."""
        # Make sure worker threads wind down even when we got here through
        # an exception rather than through stop().
        self.stop_event.set()
        if self.is_leader:
            self.release_lock()
            self.is_leader = False
        session_id = self.session_id
        if session_id:
            try:
                self.client.session.destroy(session_id)
            except Exception as e:
                logger.error(f"[{self.node_id}] Error destroying session: {e}")
            else:
                self.session_id = None
                logger.info(f"[{self.node_id}] Destroyed session")

    def stop(self):
        """Ask the election loop and its background threads to stop."""
        logger.info(f"[{self.node_id}] Stopping...")
        self.stop_event.set()
def signal_handler(signum, frame):
    """Log the shutdown request, stop the election if it exists, then exit.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    logger.info("Received shutdown signal")
    try:
        running = election
    except NameError:
        # The signal arrived before the module-level `election` was created.
        pass
    else:
        running.stop()
    sys.exit(0)
if __name__ == '__main__':
    # The node id is the single required command-line argument.
    if len(sys.argv) < 2:
        print("Usage: python main.py <node-id>")
        sys.exit(1)
    node_id = sys.argv[1]

    # Route both interrupt and terminate requests to the same handler.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_handler)

    # `election` must stay a module-level name: the signal handler looks it up.
    election = LeaderElection(
        node_id=node_id,
        key='service/leader',
        consul_host='127.0.0.1',
        consul_port=8500,
    )

    logger.info(f"[{node_id}] Starting leader election...")
    election.run_election()
    logger.info(f"[{node_id}] Shutdown complete")
| python-consul==1.1.0 |