Created
February 13, 2025 15:28
-
-
Save shubham0204/2369b0b09d3e6b1cad1fe561587bb205 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import psutil | |
import platform | |
from prometheus_client import start_http_server, Gauge, Counter, Info | |
import logging | |
import sys | |
import argparse | |
from typing import Dict, Any | |
import socket | |
from datetime import datetime | |
# Send INFO-and-above records to stdout, each prefixed with a timestamp
# and the severity level.
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide logger used by the collectors and the entry point below.
logger = logging.getLogger(__name__)
class ProcessMetricsCollector:
    """Collector for process-specific metrics.

    Each call to :meth:`collect_process_metrics` walks every process that
    psutil reports and publishes CPU, memory, thread, file-descriptor,
    connection and I/O figures plus a metadata ``Info`` series, all labelled
    with the host name taken from ``system_labels['host']``.
    """

    def __init__(self, system_labels: Dict[str, str]):
        """Create the per-process metric families.

        Args:
            system_labels: Labels describing the machine; only the ``'host'``
                entry is read by this class.
        """
        self.sys_labels = system_labels
        base_label_names = ['host', 'pid', 'name', 'username']

        # Process resource metrics
        self.process_cpu = Gauge('process_cpu_percent',
                                 'Process CPU usage percentage',
                                 base_label_names)
        self.process_memory = Gauge('process_memory_bytes',
                                    'Process memory usage in bytes',
                                    base_label_names + ['type'])
        self.process_io = Counter('process_io_bytes_total',
                                  'Process I/O operations in bytes',
                                  base_label_names + ['operation'])
        self.process_threads = Gauge('process_threads_total',
                                     'Number of threads in the process',
                                     base_label_names)
        self.process_fds = Gauge('process_file_descriptors_total',
                                 'Number of file descriptors',
                                 base_label_names)
        self.process_connections = Gauge('process_connections_total',
                                         'Number of network connections',
                                         base_label_names)

        # Process metadata
        self.process_info = Info('process_metadata',
                                 'Process metadata information',
                                 ['host', 'pid', 'name'])

        # Last cumulative byte total seen per process_io label tuple
        # (host, pid, name, username, operation).  psutil reports running
        # totals, while Counter.inc() expects an increment, so the previous
        # reading is needed to compute the difference.
        self._io_totals = {}

    @staticmethod
    def _counter_delta(previous, current):
        """Return the non-negative increment between two cumulative readings.

        Args:
            previous: Reading from the prior cycle, or ``None`` on the first
                observation.
            current: Reading from this cycle.

        Returns:
            ``current`` when there is no prior reading or the source value
            went backwards (treated as a restart from zero); otherwise
            ``current - previous``.
        """
        if previous is None or current < previous:
            return current
        return current - previous

    def _record_io(self, base_labels, operation, total, seen) -> None:
        """Advance the I/O counter for one process/operation to ``total``."""
        key = (
            str(base_labels['host']),
            str(base_labels['pid']),
            str(base_labels['name']),
            str(base_labels['username']),
            operation,
        )
        seen.add(key)
        delta = self._counter_delta(self._io_totals.get(key), total)
        self._io_totals[key] = total
        self.process_io.labels(**base_labels, operation=operation).inc(delta)

    def _forget_exited(self, seen) -> None:
        """Drop I/O counter series whose process was not seen this cycle."""
        for key in set(self._io_totals) - seen:
            del self._io_totals[key]
            try:
                # Key order matches the label order given to the Counter.
                self.process_io.remove(*key)
            except KeyError:
                # Series already gone; nothing left to clean up.
                pass

    def collect_process_metrics(self) -> None:
        """Collect metrics for all running processes.

        Errors for an individual process (it exited, or access was denied)
        are logged at debug level and that process is skipped.  Any other
        exception is logged at error level and ends this collection cycle.
        """
        try:
            # Reset every non-counter family so series belonging to
            # terminated processes disappear instead of accumulating.
            # NOTE: a scrape that lands between this reset and the refill
            # below can observe a partial set of processes.
            for metric in (self.process_cpu, self.process_memory,
                           self.process_threads, self.process_fds,
                           self.process_connections, self.process_info):
                metric.clear()

            seen_io = set()
            for proc in psutil.process_iter(['pid', 'name', 'username']):
                pid = str(proc.info['pid'])
                try:
                    base_labels = {
                        'host': self.sys_labels['host'],
                        'pid': pid,
                        'name': proc.info['name'],
                        'username': proc.info['username'],
                    }

                    # Collect process metadata
                    self.collect_process_metadata(proc, base_labels)

                    with proc.oneshot():  # Efficiently get multiple info items
                        # CPU usage
                        self.process_cpu.labels(**base_labels).set(proc.cpu_percent())

                        # Memory information
                        mem_info = proc.memory_info()
                        self.process_memory.labels(**base_labels, type='rss').set(mem_info.rss)
                        self.process_memory.labels(**base_labels, type='vms').set(mem_info.vms)

                        # Thread count
                        self.process_threads.labels(**base_labels).set(proc.num_threads())

                        # File descriptors.  AttributeError covers platforms
                        # where psutil does not expose num_fds(); without it
                        # the error would abort the scan of all processes.
                        try:
                            self.process_fds.labels(**base_labels).set(proc.num_fds())
                        except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
                            pass

                        # Network connections.  Prefer net_connections(),
                        # the name used by newer psutil releases, and fall
                        # back to the older connections().
                        try:
                            list_connections = (getattr(proc, 'net_connections', None)
                                                or proc.connections)
                            self.process_connections.labels(**base_labels).set(
                                len(list_connections())
                            )
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            pass

                        # I/O counters (not available on every platform,
                        # hence AttributeError).
                        try:
                            io_counters = proc.io_counters()
                        except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
                            pass
                        else:
                            self._record_io(base_labels, 'read',
                                            io_counters.read_bytes, seen_io)
                            self._record_io(base_labels, 'write',
                                            io_counters.write_bytes, seen_io)
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
                    logger.debug("Error collecting metrics for process %s: %s", pid, e)
                    continue

            # Only reached after a complete scan, so a failure part-way
            # through never purges counters of processes that are alive.
            self._forget_exited(seen_io)
        except Exception as e:
            logger.error("Error collecting process metrics: %s", e)

    def collect_process_metadata(self, proc: "psutil.Process", base_labels: Dict[str, str]) -> None:
        """Collect detailed metadata for a process.

        Publishes creation time, status, niceness, working directory, command
        line, context-switch counts and (when readable) the parent's pid and
        name as a ``process_metadata`` Info series.

        Args:
            proc: Process to describe.
            base_labels: Must contain ``'host'``, ``'pid'`` and ``'name'``.

        If psutil reports the process as gone or inaccessible while reading
        any mandatory field, nothing is published and a debug line is logged.
        """
        try:
            with proc.oneshot():
                metadata = {
                    'create_time': datetime.fromtimestamp(proc.create_time()).isoformat(),
                    'status': proc.status(),
                    'nice': str(proc.nice()),
                    'cwd': proc.cwd(),
                    'cmdline': ' '.join(proc.cmdline() or []),
                }
                switches = proc.num_ctx_switches()
                metadata['num_ctx_switches_voluntary'] = str(switches.voluntary)
                metadata['num_ctx_switches_involuntary'] = str(switches.involuntary)

                # Parent details are optional: failing to read them must not
                # discard the rest of the metadata.
                try:
                    parent = proc.parent()
                    if parent:
                        metadata['parent_pid'] = str(parent.pid)
                        metadata['parent_name'] = parent.name()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass

            self.process_info.labels(
                host=base_labels['host'],
                pid=base_labels['pid'],
                name=base_labels['name']
            ).info(metadata)
        except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
            logger.debug("Error collecting process metadata: %s", e)
class SystemMetricsCollector:
    """Collects host-wide CPU, memory, disk and network metrics.

    Also owns a ``ProcessMetricsCollector`` so that one call to
    :meth:`collect_metrics` refreshes both system and per-process series.
    """

    def __init__(self, update_interval: int = 15):
        """
        Initialize metrics collectors

        Args:
            update_interval: How often to update metrics in seconds
        """
        self.update_interval = update_interval
        self.hostname = socket.gethostname()

        # Get system information for labels
        self.sys_labels = {
            'host': self.hostname,
            'os': platform.system(),
            'os_version': platform.release(),
            'architecture': platform.machine()
        }
        sys_label_names = ['host', 'os', 'os_version', 'architecture']

        # CPU Metrics
        self.cpu_usage = Gauge('system_cpu_usage_percent',
                               'CPU usage percentage',
                               sys_label_names + ['cpu'])
        self.cpu_freq = Gauge('system_cpu_frequency_mhz',
                              'CPU frequency in MHz',
                              sys_label_names + ['cpu'])

        # Memory Metrics
        self.memory_usage = Gauge('system_memory_usage_bytes',
                                  'Memory usage in bytes',
                                  sys_label_names + ['type'])

        # Disk Metrics
        self.disk_usage = Gauge('system_disk_usage_bytes',
                                'Disk usage in bytes',
                                sys_label_names + ['device', 'mountpoint', 'type'])
        # NOTE: despite the help text, the values fed into this counter are
        # byte totals (read_bytes / write_bytes).
        self.disk_io_counters = Counter('system_disk_io_total',
                                        'Disk I/O operations',
                                        sys_label_names + ['device', 'operation'])

        # Network Metrics
        self.network_io = Counter('system_network_traffic_bytes_total',
                                  'Network traffic in bytes',
                                  sys_label_names + ['interface', 'direction'])

        # Previous cumulative readings, keyed by label set.  psutil returns
        # running totals, whereas Counter.inc() takes an increment.
        self._disk_io_totals = {}
        self._network_totals = {}

        self.process_collector = ProcessMetricsCollector(self.sys_labels)

    @staticmethod
    def _counter_delta(previous, current):
        """Return the non-negative increment between two cumulative readings.

        Args:
            previous: Reading from the prior cycle, or ``None`` on the first
                observation.
            current: Reading from this cycle.

        Returns:
            ``current`` when there is no prior reading or the source value
            went backwards (treated as a restart from zero); otherwise
            ``current - previous``.
        """
        if previous is None or current < previous:
            return current
        return current - previous

    def _advance_counter(self, counter, totals, labels, total) -> None:
        """Move ``counter``'s series for ``labels`` forward to ``total``.

        Args:
            counter: Prometheus counter family to update.
            totals: Dict holding the previous reading for each label set.
            labels: Complete label mapping for the series.
            total: Current cumulative value reported by the OS.
        """
        key = tuple(sorted(labels.items()))
        delta = self._counter_delta(totals.get(key), total)
        totals[key] = total
        counter.labels(**labels).inc(delta)

    def collect_cpu_metrics(self) -> None:
        """Collect CPU-related metrics"""
        try:
            # Per-CPU usage
            for i, percentage in enumerate(psutil.cpu_percent(percpu=True)):
                labels = {**self.sys_labels, 'cpu': f'cpu{i}'}
                self.cpu_usage.labels(**labels).set(percentage)

            # CPU frequencies (may be empty when the platform exposes none)
            cpu_freqs = psutil.cpu_freq(percpu=True)
            if cpu_freqs:
                for i, freq in enumerate(cpu_freqs):
                    labels = {**self.sys_labels, 'cpu': f'cpu{i}'}
                    self.cpu_freq.labels(**labels).set(freq.current)
        except Exception as e:
            logger.error("Error collecting CPU metrics: %s", e)

    def collect_memory_metrics(self) -> None:
        """Collect memory-related metrics"""
        try:
            mem = psutil.virtual_memory()
            swap = psutil.swap_memory()
            readings = [
                ('total', mem.total),
                ('available', mem.available),
                ('used', mem.used),
                ('swap_total', swap.total),
                ('swap_used', swap.used),
            ]
            for type_name, value in readings:
                labels = {**self.sys_labels, 'type': type_name}
                self.memory_usage.labels(**labels).set(value)
        except Exception as e:
            logger.error("Error collecting memory metrics: %s", e)

    def collect_disk_metrics(self) -> None:
        """Collect disk-related metrics"""
        try:
            # Disk usage
            for partition in psutil.disk_partitions():
                try:
                    usage = psutil.disk_usage(partition.mountpoint)
                except OSError as e:
                    # Unreadable or unavailable mount point: skip it rather
                    # than lose every remaining partition and the I/O stats.
                    logger.debug("Skipping disk usage for %s: %s",
                                 partition.mountpoint, e)
                    continue
                base_labels = {
                    **self.sys_labels,
                    'device': partition.device,
                    'mountpoint': partition.mountpoint
                }
                for type_name, value in [('total', usage.total), ('used', usage.used)]:
                    labels = {**base_labels, 'type': type_name}
                    self.disk_usage.labels(**labels).set(value)

            # Disk I/O (psutil may return nothing on a diskless system)
            disk_io = psutil.disk_io_counters(perdisk=True) or {}
            for disk_name, counters in disk_io.items():
                base_labels = {**self.sys_labels, 'device': disk_name}
                self._advance_counter(
                    self.disk_io_counters, self._disk_io_totals,
                    {**base_labels, 'operation': 'read'}, counters.read_bytes)
                self._advance_counter(
                    self.disk_io_counters, self._disk_io_totals,
                    {**base_labels, 'operation': 'write'}, counters.write_bytes)
        except Exception as e:
            logger.error("Error collecting disk metrics: %s", e)

    def collect_network_metrics(self) -> None:
        """Collect network-related metrics"""
        try:
            net_io = psutil.net_io_counters(pernic=True) or {}
            for interface, counters in net_io.items():
                base_labels = {**self.sys_labels, 'interface': interface}
                self._advance_counter(
                    self.network_io, self._network_totals,
                    {**base_labels, 'direction': 'received'}, counters.bytes_recv)
                self._advance_counter(
                    self.network_io, self._network_totals,
                    {**base_labels, 'direction': 'sent'}, counters.bytes_sent)
        except Exception as e:
            logger.error("Error collecting network metrics: %s", e)

    def collect_metrics(self) -> None:
        """Collect all system metrics"""
        self.collect_cpu_metrics()
        self.collect_memory_metrics()
        self.collect_disk_metrics()
        self.collect_network_metrics()
        self.process_collector.collect_process_metrics()

    def run(self) -> None:
        """Main collection loop

        Collects, sleeps ``update_interval`` seconds, and repeats until
        interrupted with Ctrl-C.  A failing cycle is logged and the loop
        carries on after the usual sleep.
        """
        logger.info("Starting metrics collection every %s seconds", self.update_interval)
        logger.info("System information: %s", self.sys_labels)
        try:
            while True:
                try:
                    self.collect_metrics()
                except Exception as e:
                    logger.error("Error in main collection loop: %s", e)
                time.sleep(self.update_interval)
        except KeyboardInterrupt:
            # The outer handler also covers an interrupt that arrives while
            # sleeping after a failed cycle.
            logger.info("Stopping metrics collection")
def _positive_int(text):
    """argparse ``type=`` callable accepting only integers greater than zero."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}")
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args(argv=None):
    """Parse the exporter's command-line options.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Namespace with ``host`` (str), ``port`` (int) and ``interval`` (int).
        argparse exits the program with a usage error if ``--interval`` is
        not a positive integer, because a zero or negative interval cannot
        be used as a sleep duration by the collection loop.
    """
    parser = argparse.ArgumentParser(description='System Metrics Exporter for Prometheus')
    parser.add_argument('--host', default='0.0.0.0',
                        help='Address to bind the server to (default: 0.0.0.0)')
    parser.add_argument('--port', type=int, default=8000,
                        help='Port to bind the server to (default: 8000)')
    parser.add_argument('--interval', type=_positive_int, default=15,
                        help='Metrics collection interval in seconds (default: 15)')
    return parser.parse_args(argv)
def main():
    """Script entry point: start the metrics HTTP endpoint, then collect forever.

    Exits with status 1 if the HTTP server cannot be started.
    """
    options = parse_args()

    # Bring up the Prometheus scrape endpoint before any collection begins.
    try:
        start_http_server(port=options.port, addr=options.host)
        logger.info(f"Started Prometheus metrics server on {options.host}:{options.port}")
    except Exception as exc:
        logger.error(f"Failed to start Prometheus server: {exc}")
        raise SystemExit(1)

    # Hand control to the collection loop; it returns only on Ctrl-C.
    SystemMetricsCollector(update_interval=options.interval).run()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment