Skip to content

Instantly share code, notes, and snippets.

@se1983
Last active March 31, 2021 11:09
Show Gist options
  • Select an option

  • Save se1983/cd981f01f5fca37786aa76f153a2ff00 to your computer and use it in GitHub Desktop.

Select an option

Save se1983/cd981f01f5fca37786aa76f153a2ff00 to your computer and use it in GitHub Desktop.
using async python to capture the outputs of all processes of the current user
import asyncio
import logging
import os
from dataclasses import dataclass
from typing import Iterable
from aiofile import AIOFile
logging.basicConfig(level=logging.INFO)
@dataclass
class Process:
pid: str
proc_path: str
pid_path: str
fd_stdout_path: str
out_pos = 0
def alive(self):
return os.path.exists(self.pid_path)
async def open_file(filepath: str) -> list[str]:
try:
async with AIOFile(filepath, mode='r') as file_handler:
lines = await file_handler.read()
return lines.split('\n')
except OSError:
return []
async def stdout_logging(process: Process):
logging.info(f"reading output for {process}")
while process.alive():
lines_std = (await open_file(process.fd_stdout_path))[process.out_pos:]
process.out_pos += len(lines_std)
for line in lines_std:
process.out_pos += 1
logging.info(f"{process.proc_path}: {line}")
await asyncio.sleep(0.3)
logging.info(f"stop reading output for {process}")
async def create_watcher_tasks(queue: asyncio.Queue[Process]):
"""consumer"""
loop = asyncio.get_event_loop()
while process := await queue.get():
loop.create_task(stdout_logging(process))
def get_processes(known_processes: list[str]) -> Iterable[Process]:
for pid in os.listdir('/proc'):
if (
pid.isdigit()
and pid not in known_processes
and os.stat(pid_path := os.path.join('/proc', pid)).st_uid == os.getuid()
and os.path.exists(exec_path := os.path.join('/proc', pid, 'exe'))
):
yield Process(
pid=pid,
proc_path=os.readlink(exec_path),
pid_path=pid_path,
fd_stdout_path=os.path.join(pid_path, 'fd', '1'),
)
async def monitor_proc(queue: asyncio.Queue[Process]):
"""producer"""
known_processes: list[str] = []
while True:
for process in get_processes(known_processes):
known_processes.append(process.pid)
await queue.put(process)
await asyncio.sleep(1)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue: asyncio.Queue[Process] = asyncio.Queue()
loop.create_task(monitor_proc(queue))
loop.create_task(create_watcher_tasks(queue))
loop.run_forever()
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment