Skip to content

Instantly share code, notes, and snippets.

@w0rldart
Created February 27, 2025 05:07
Show Gist options
  • Save w0rldart/d77329e6f53d3b77596dce065911d6b2 to your computer and use it in GitHub Desktop.
Save w0rldart/d77329e6f53d3b77596dce065911d6b2 to your computer and use it in GitHub Desktop.
Python Script for Monitoring Running Processes and Docker Containers

Python Script for Monitoring Running Processes and Docker Containers

This Python script captures and logs information about running OS processes and Docker containers on a Unix-like system. It filters out specific unwanted processes (e.g., system tasks, internal commands) to provide a clear view of active user processes and containers.

Features

  • Process Monitoring: Uses the ps command to retrieve detailed information about running processes, including user, PID, elapsed time, CPU usage, memory usage, and the full command line.
  • Docker Integration: Utilizes Docker CLI to fetch active container information in JSON format.
  • Exclusion List: Implements an exclusion mechanism to filter out system processes and commands based on a predefined list.
  • Output in JSON: Saves results in separate JSON files (running_processes.json and docker_containers.json) for easy consumption and analysis.
  • Error Logging: Captures errors during execution in a dedicated log file.

How to Use

  1. Clone or download the script from the Gist.
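  2. Update the API endpoint in the script: set api_endpoint in main() to the URL that should receive the collected data, which the script sends as a JSON POST request.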
  3. Run the script using Python 3:
    python3 track_processes.py
import os
import json
import subprocess
import logging
from logging.handlers import RotatingFileHandler
import urllib.request
# Logging setup: the root logger is configured at ERROR level with a
# size-capped, rotating file handler.
error_log_file = 'error.log'
error_handler = RotatingFileHandler(
    error_log_file,
    maxBytes=5 * 1024 * 1024,  # roll over at about 5 MB per log file
    backupCount=5,  # keep at most five rotated files
)
logging.basicConfig(handlers=[error_handler], level=logging.ERROR)
logger = logging.getLogger()  # no name given, so this is the root logger
def get_running_processes(ps_lines=None):
    """Collect running processes, drop system noise, and save them as JSON.

    Args:
        ps_lines: Optional iterable of text lines laid out like the output
            of ``ps -eo user,pid,etime,%cpu,%mem,args`` (the header row may
            be present; it is skipped). When omitted, ``ps`` is run to
            obtain them.

    Returns:
        A dict mapping each PID (as a string) to a dict with the keys
        ``user``, ``pid``, ``elapsed_time``, ``cpu_usage``,
        ``memory_usage`` and ``command``. Every value is the string printed
        by ps. The same mapping is written to ``running_processes.json`` in
        the current working directory.
    """
    exclusions = (
        'ps', 'top', 'init', 'systemd',
        'kthreadd', 'rcu_sched', 'rcu_bh', 'migration', 'ksoftirqd', 'watchdog',
        'cpuhp', 'netns', 'khungtaskd', 'oom_reaper', 'kswapd', 'fsnotify_mark',
        'kworker', 'kdevtmpfs', 'kauditd', 'kblockd', 'khelper', 'khubd',
    )
    if ps_lines is None:
        ps_lines = _read_ps_lines()

    processes = {}
    for line in ps_lines:
        parts = line.split()
        # Skip blank or truncated rows, and the header row (its PID column
        # reads "PID", which is not numeric).
        if len(parts) < 5 or not parts[1].isdigit():
            continue
        user, pid, etime, cpu, mem = parts[:5]
        argv = parts[5:]
        # Match on the program name only. Matching anywhere in the command
        # line would hide unrelated processes: "ps" is a substring of
        # "https", "top" of "desktop", and so on.
        if _is_excluded(_process_name(argv), exclusions):
            continue
        processes[pid] = {
            'user': user,
            'pid': pid,
            'elapsed_time': etime,
            'cpu_usage': cpu,
            'memory_usage': mem,
            'command': ' '.join(argv),
        }

    # Save processes to a JSON file
    with open('running_processes.json', 'w') as f:
        json.dump(processes, f, indent=2)
    return processes


def _read_ps_lines():
    """Run ps and return its output lines, or [] when ps cannot be run."""
    try:
        raw = subprocess.check_output(
            ['ps', '-eo', 'user,pid,etime,%cpu,%mem,args'])
    except (OSError, subprocess.CalledProcessError) as exc:
        # OSError: ps is missing or not executable; CalledProcessError: ps
        # exited with a non-zero status. Reported on the root logger.
        logging.getLogger().error("Error listing processes with ps: %s", exc)
        return []
    # Command lines are arbitrary bytes; never let decoding abort the run.
    return raw.decode('utf-8', errors='replace').split('\n')


def _process_name(argv):
    """Return the program name for a command given as a list of tokens."""
    if not argv:
        return ''
    first = argv[0]
    if first.startswith('['):
        # ps prints kernel threads in brackets, e.g. "[kworker/0:1H]".
        return first.strip('[]')
    return os.path.basename(first)


def _is_excluded(name, exclusions):
    """Tell whether *name* is an excluded name or a member of its family.

    A name matches an exclusion when it is equal to it, or starts with it
    and continues with a non-letter ("kworker/0:1", "kswapd0",
    "systemd-journald"). A longer word such as "psql" does not match "ps".
    """
    for exc in exclusions:
        if not name.startswith(exc):
            continue
        rest = name[len(exc):]
        if not rest or not rest[0].isalpha():
            return True
    return False
def get_docker_containers():
    """List running Docker containers and save them as JSON.

    Runs ``docker ps --format '{{json .}}'`` and decodes each non-empty
    output line as one JSON document.

    Returns:
        A list with one decoded entry per output line. The list is empty
        when docker prints nothing (no containers), when the docker
        executable cannot be run, or when the command exits with an error.
        The same list is written to ``docker_containers.json`` in the
        current working directory.
    """
    containers = []
    try:
        docker_ps_output = subprocess.check_output(
            ['docker', 'ps', '--format', '{{json .}}']
        ).decode('utf-8', errors='replace')
    except (OSError, subprocess.CalledProcessError) as exc:
        # OSError: the docker binary is missing or not executable;
        # CalledProcessError: docker exited non-zero (e.g. daemon down).
        # Either way there is nothing to report. Logged on the root logger.
        logging.getLogger().error("Error listing Docker containers: %s", exc)
        docker_ps_output = ''

    for line in docker_ps_output.split('\n'):
        line = line.strip()
        if not line:
            # Empty output (no running containers) must not reach
            # json.loads, which rejects an empty string.
            continue
        try:
            containers.append(json.loads(line))  # Append the full container info
        except ValueError as exc:
            logging.getLogger().error(
                "Skipping unparsable docker ps line: %s", exc)

    # Save containers to a JSON file
    with open('docker_containers.json', 'w') as f:
        json.dump(containers, f, indent=2)
    return containers
def send_to_api(endpoint, payload, timeout=30):
    """POST *payload* as JSON to *endpoint* and log the outcome.

    Args:
        endpoint: URL to send the request to.
        payload: JSON-serializable object used as the request body.
        timeout: Seconds to wait on the network before giving up, so an
            unresponsive endpoint cannot hang the script. ``None`` disables
            the timeout.

    Returns:
        True when the request completed and the response body was read,
        False when any error occurred. Errors are logged, never raised.
    """
    try:
        json_data = json.dumps(payload).encode('utf-8')
        req = urllib.request.Request(
            endpoint,
            data=json_data,
            headers={'Content-Type': 'application/json'},
        )
        with urllib.request.urlopen(req, timeout=timeout) as response:
            response_data = response.read()
    except Exception as e:
        # Best-effort delivery: report the failure and let the caller go on.
        logger.error("Error sending payload to API: %s", e)
        return False

    logger.info("Payload sent successfully: %s", payload)
    # A body that is not valid UTF-8 must not turn a successful send into
    # a reported failure, so undecodable bytes are replaced.
    logger.info("Response: %s", response_data.decode('utf-8', errors='replace'))
    return True
def main():
    """Gather process and container data, print it, and send it to the API."""
    api_endpoint = 'http://your.api.endpoint/submit'  # Replace with your actual API endpoint

    # Processes are collected before containers; the keys keep that order.
    report = {}
    report['running_processes'] = get_running_processes()
    report['docker_containers'] = get_docker_containers()

    # Echo the combined report to stdout, then send it to the API.
    print(json.dumps(report, indent=2))
    send_to_api(api_endpoint, report)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment