Last active
March 6, 2024 07:17
-
-
Save krzysztofantczak/27b2ed098d88417032916ba4dfeeadbd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import errno
import os
import signal
import socket
import sys
import threading
import time

from prometheus_client import start_http_server, Gauge, Counter
class Peer:
    """A named node that exchanges ping/pong messages with other peers over TCP.

    Peer addresses are read from the ``PEERS`` environment variable, which
    holds ``name=host:port`` entries separated by ``;``. Measurements are
    published through Prometheus gauges/counters labelled by peer name.

    Wire format (one message per line)::

        ping from <name> at <timestamp>
        pong from <name> at <timestamp>
    """

    def __init__(self, name):
        """Set up per-connection bookkeeping and register the Prometheus metrics.

        Args:
            name: Name of this instance; ``start`` looks it up in ``PEERS`` to
                find the address to listen on.
        """
        self.name = name
        # All dictionaries below are keyed by the connection address tuple.
        self.connections = {}  # addr -> connected socket
        self.latencies = {}  # addr -> last measured latency (None until the first ping)
        self.packet_loss_rates = {}  # addr -> packet loss in percent
        self.sent_pings = {}  # addr -> number of pings sent
        self.received_pongs = {}  # addr -> number of pongs received
        self.lock = threading.Lock()  # Guards the dictionaries above
        self.server_socket = None  # Listening socket, set once bind() succeeded
        self.running = True  # Cleared to ask every loop to stop

        # Prometheus metrics to expose latency measurements
        self.latency_seconds_gauge = Gauge(
            'peer_latency_seconds', 'Latency to peers in seconds', ['peer'])
        self.latency_milliseconds_gauge = Gauge(
            'peer_latency_milliseconds', 'Latency to peers in milliseconds', ['peer'])
        self.latency_microseconds_gauge = Gauge(
            'peer_latency_microseconds', 'Latency to peers in microseconds', ['peer'])
        # Prometheus metric to expose packet loss rates
        self.packet_loss_rate_gauge = Gauge(
            'peer_packet_loss_rate', 'Packet loss rate to peers', ['peer'])
        # Prometheus counter to track the number of times a peer goes offline
        self.peer_offline_counter = Counter(
            'peer_offline_counter', 'Number of times a peer goes offline', ['peer'])

    def start(self):
        """Start the listener, the outbound connector, the pinger and the metrics server.

        The metrics HTTP server listens on this peer's port + 1.

        Raises:
            ValueError: If this peer's name is not present in ``PEERS``. The
                lookup happens before any thread is started so that a
                misconfiguration does not leave stray threads behind.
        """
        _, port = self.get_peer_address(self.name)
        for target in (self.start_server, self.connect_to_peers, self.send_ping):
            threading.Thread(target=target).start()
        metrics_port = port + 1
        print(f"Starting metrics server on port {metrics_port}")
        start_http_server(metrics_port)

    def start_server(self):
        """Bind the listening socket and hand each accepted connection to a new thread.

        Binding is retried every 5 seconds while the port is in use; any
        other bind error is re-raised.
        """
        host, port = self.get_peer_address(self.name)
        while self.running:
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                listener.bind((host, port))
                listener.listen(5)
            except OSError as e:
                # Do not leak one socket per failed attempt.
                listener.close()
                if e.errno != errno.EADDRINUSE:
                    raise
                print(f"Port {port} is already in use. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                self.server_socket = listener
                print(f"{self.name} is listening on {host}:{port}")
                break
        else:
            # Asked to stop before the bind ever succeeded.
            return

        while self.running:
            try:
                client_socket, addr = self.server_socket.accept()
            except KeyboardInterrupt:
                self.running = False
                break
            except OSError:
                # accept() fails once the listener is closed during shutdown.
                if not self.running:
                    break
                raise
            threading.Thread(target=self.handle_client, args=(client_socket, addr)).start()

    def handle_client(self, client_socket, addr, peer_name=None):
        """Serve one connection until it closes, fails, or the peer is stopped.

        Pings are answered with a pong and update the latency metrics; pongs
        update the packet loss statistics. On exit the connection state is
        removed and the socket is closed.

        Args:
            client_socket: Connected socket to read from and reply on.
            addr: Address tuple used as the key for this connection.
            peer_name: Name of the remote peer if already known (outbound
                connections). Otherwise it is learned from the first valid
                message; metrics are only touched once a name is known.
        """
        with self.lock:
            self.connections[addr] = client_socket
            self.latencies[addr] = None
            self.packet_loss_rates[addr] = 0.0
        print(f"New connection from {addr}")

        try:
            while self.running:
                data = client_socket.recv(1024).decode()
                if not data:
                    break
                # TCP is a byte stream: one recv() may carry several messages.
                for message in data.splitlines():
                    print(f"Received message from {addr}: {message}")
                    parsed = self._parse_message(message)
                    if parsed is None:
                        continue
                    kind, sender, timestamp = parsed
                    peer_name = sender
                    if kind == "ping":
                        self._record_ping(client_socket, addr, sender, timestamp)
                    else:
                        self._record_pong(addr, sender)
        except Exception as e:
            print(f"An error occurred while handling client {addr}: {e}")
        finally:
            self._drop_connection(client_socket, addr, peer_name)

    @staticmethod
    def _parse_message(message):
        """Return ``(kind, sender, timestamp)`` for a ping/pong line, else ``None``."""
        parts = message.split()
        if len(parts) != 5 or parts[0] not in ("ping", "pong") or parts[3] != 'at':
            return None
        try:
            timestamp = float(parts[4])
        except ValueError:
            return None
        return parts[0], parts[2], timestamp

    def _record_ping(self, client_socket, addr, sender, timestamp):
        """Answer a ping and publish the latency derived from its timestamp."""
        client_socket.sendall(f"pong from {self.name} at {time.time()}\n".encode())
        # Local receive time minus the sender's send time.
        # NOTE(review): this compares clocks of two hosts - presumably they
        # are synchronized; confirm.
        latency = time.time() - timestamp
        with self.lock:
            self.latencies[addr] = latency
            self.latency_seconds_gauge.labels(peer=sender).set(latency)
            self.latency_milliseconds_gauge.labels(peer=sender).set(latency * 1000)
            self.latency_microseconds_gauge.labels(peer=sender).set(latency * 1000000)
            self._refresh_packet_loss(addr, sender)

    def _record_pong(self, addr, sender):
        """Count a received pong and refresh the packet loss statistics."""
        with self.lock:
            self.received_pongs[addr] = self.received_pongs.get(addr, 0) + 1
            self._refresh_packet_loss(addr, sender)

    def _refresh_packet_loss(self, addr, peer_name):
        """Recompute the packet loss percentage for ``addr``; caller must hold the lock."""
        sent = self.sent_pings.get(addr, 0)
        if sent <= 0:
            return
        received = self.received_pongs.get(addr, 0)
        # A pong can be counted before its ping is, so clamp at zero.
        loss = max(0.0, (sent - received) / sent * 100.0)
        self.packet_loss_rates[addr] = loss
        self.packet_loss_rate_gauge.labels(peer=peer_name).set(loss)

    def _drop_connection(self, client_socket, addr, peer_name):
        """Forget all state kept for ``addr``, mark the peer offline and close the socket."""
        with self.lock:
            self.connections.pop(addr, None)
            tracked = addr in self.latencies
            self.latencies.pop(addr, None)
            self.packet_loss_rates.pop(addr, None)
            self.sent_pings.pop(addr, None)
            self.received_pongs.pop(addr, None)
            if tracked and peer_name is not None:
                # -1 is the marker value for "no connection".
                self.latency_seconds_gauge.labels(peer=peer_name).set(-1)
                self.latency_milliseconds_gauge.labels(peer=peer_name).set(-1)
                self.latency_microseconds_gauge.labels(peer=peer_name).set(-1)
                self.peer_offline_counter.labels(peer=peer_name).inc()
        print(f"Connection with {addr} closed")
        client_socket.close()

    def close_connection(self, addr):
        """Close the socket registered for ``addr`` and remove its entries.

        Args:
            addr: Address tuple the connection was registered under.
        """
        with self.lock:
            sock = self.connections.pop(addr, None)
            self.latencies.pop(addr, None)
        if sock is None:
            print(f"Error: Connection with {addr} already closed.")
            return
        print(f"Connection with {addr} closed")
        try:
            # shutdown() before close() so the connection is torn down promptly.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # Not connected any more; close() below is still required.
        sock.close()

    @staticmethod
    def _dial(host, port):
        """Return a TCP socket connected to ``host:port``; close it if connecting fails."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((host, port))
        except BaseException:
            sock.close()
            raise
        return sock

    def handle_reconnect(self, addr):
        """Reconnect to ``addr`` with exponential backoff (1s doubling up to 30s).

        Once connected, the connection is served by ``handle_client`` and
        this method returns when that connection ends. Errors other than a
        refused connection abort the attempt.

        Args:
            addr: ``(host, port)`` tuple to connect to.
        """
        host, port = addr
        retry_delay = 1
        while self.running:
            try:
                client_socket = self._dial(host, port)
            except ConnectionRefusedError:
                print(f"Connection to {addr} refused. Peer may be unavailable. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)
                continue
            except Exception as e:
                print(f"An error occurred while reconnecting to {addr}: {e}")
                break
            print(f"Reconnected to {addr}")
            self.handle_client(client_socket, addr)
            break

    def connect_to_peer(self, peer_name, host, port):
        """Connect to one peer with exponential backoff (1s doubling up to 30s).

        Once connected, the connection is served by ``handle_client`` and
        this method returns when that connection ends. Errors other than a
        refused connection abort the attempt.

        Args:
            peer_name: Name of the peer, used for log output and metric labels.
            host: Host to connect to.
            port: TCP port to connect to.
        """
        retry_delay = 1
        while self.running:
            try:
                client_socket = self._dial(host, port)
            except ConnectionRefusedError:
                print(
                    f"Connection to {peer_name} at {host}:{port} refused. Peer may be unavailable. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)
                continue
            except Exception as e:
                print(f"An error occurred while connecting to {peer_name}: {e}")
                break
            print(f"Connected to {peer_name} at {host}:{port}")
            self.handle_client(client_socket, (host, port), peer_name)
            return

    @staticmethod
    def _configured_peers():
        """Yield ``(name, host, port)`` for each non-empty entry of ``PEERS``.

        Raises:
            ValueError: When an entry is not of the form ``name=host:port``
                or its port is not an integer.
        """
        for entry in os.getenv("PEERS", "").split(";"):
            entry = entry.strip()
            if not entry:
                # Unset variable or trailing ';' - nothing to parse.
                continue
            peer_name, equals, address = entry.partition("=")
            host, colon, port = address.partition(":")
            if not equals or not colon:
                raise ValueError(f"Malformed PEERS entry: {entry!r}")
            yield peer_name, host, int(port)

    def connect_to_peers(self):
        """Connect to every configured peer except this one, one thread per peer.

        Blocks until all connection threads have finished.
        """
        threads = [
            threading.Thread(target=self.connect_to_peer, args=(peer_name, host, port))
            for peer_name, host, port in self._configured_peers()
            if peer_name != self.name
        ]
        for thread in threads:
            thread.start()
        # Wait for all threads to finish
        for thread in threads:
            thread.join()

    def send_ping(self):
        """Send a ping on every registered connection every 5 seconds while running."""
        while self.running:
            # Snapshot under the lock, but do the blocking sends outside it.
            with self.lock:
                targets = list(self.connections.items())
            for addr, client_socket in targets:
                try:
                    client_socket.sendall(f"ping from {self.name} at {time.time()}\n".encode())
                except Exception as e:
                    print(f"Error sending ping to {addr}: {e}")
                    continue
                with self.lock:
                    self.sent_pings[addr] = self.sent_pings.get(addr, 0) + 1
            time.sleep(5)

    def get_peer_address(self, name):
        """Return ``(host, port)`` of the peer called ``name`` from ``PEERS``.

        Args:
            name: Peer name to look up.

        Raises:
            ValueError: If no entry has that name, or a malformed entry is
                met before a match is found.
        """
        for peer_name, host, port in self._configured_peers():
            if peer_name == name:
                return host, port
        raise ValueError(f"No peer found with name {name}")

    def get_latency_metrics(self):
        """Yield ``(addr, latency)`` pairs from a snapshot of the latency table.

        The snapshot is taken under the lock and the lock is released before
        the first pair is yielded, so a slow or abandoned consumer cannot
        block the other threads.
        """
        with self.lock:
            snapshot = list(self.latencies.items())
        yield from snapshot
def signal_handler(sig, frame):
    """Stop the global ``peer``, close its sockets and exit with status 0.

    Args:
        sig: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    global peer
    print('Exiting...')
    peer.running = False
    # The listener is still None while the server has not bound its port yet.
    if peer.server_socket is not None:
        peer.server_socket.close()
    # close_connection() removes entries, so iterate over a copy of the keys.
    for addr in list(peer.connections):
        peer.close_connection(addr)
    sys.exit(0)
if __name__ == "__main__":
    # Script entry point: read the instance name from the command line,
    # install the SIGINT handler, then create and start the peer.
    import argparse

    cli = argparse.ArgumentParser(description='Latency Exporter')
    cli.add_argument('name', type=str, help='Name of the current exporter instance')
    args = cli.parse_args()

    signal.signal(signal.SIGINT, signal_handler)

    # signal_handler reads this module-level name, so it must stay `peer`.
    peer = Peer(args.name)
    peer.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment