Last active
March 31, 2021 11:09
-
-
Save se1983/cd981f01f5fca37786aa76f153a2ff00 to your computer and use it in GitHub Desktop.
using async python to capture the outputs of all processes of the current user
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import logging | |
| import os | |
| from dataclasses import dataclass | |
| from typing import Iterable | |
| from aiofile import AIOFile | |
| logging.basicConfig(level=logging.INFO) | |
| @dataclass | |
| class Process: | |
| pid: str | |
| proc_path: str | |
| pid_path: str | |
| fd_stdout_path: str | |
| out_pos = 0 | |
| def alive(self): | |
| return os.path.exists(self.pid_path) | |
| async def open_file(filepath: str) -> list[str]: | |
| try: | |
| async with AIOFile(filepath, mode='r') as file_handler: | |
| lines = await file_handler.read() | |
| return lines.split('\n') | |
| except OSError: | |
| return [] | |
| async def stdout_logging(process: Process): | |
| logging.info(f"reading output for {process}") | |
| while process.alive(): | |
| lines_std = (await open_file(process.fd_stdout_path))[process.out_pos:] | |
| process.out_pos += len(lines_std) | |
| for line in lines_std: | |
| process.out_pos += 1 | |
| logging.info(f"{process.proc_path}: {line}") | |
| await asyncio.sleep(0.3) | |
| logging.info(f"stop reading output for {process}") | |
| async def create_watcher_tasks(queue: asyncio.Queue[Process]): | |
| """consumer""" | |
| loop = asyncio.get_event_loop() | |
| while process := await queue.get(): | |
| loop.create_task(stdout_logging(process)) | |
| def get_processes(known_processes: list[str]) -> Iterable[Process]: | |
| for pid in os.listdir('/proc'): | |
| if ( | |
| pid.isdigit() | |
| and pid not in known_processes | |
| and os.stat(pid_path := os.path.join('/proc', pid)).st_uid == os.getuid() | |
| and os.path.exists(exec_path := os.path.join('/proc', pid, 'exe')) | |
| ): | |
| yield Process( | |
| pid=pid, | |
| proc_path=os.readlink(exec_path), | |
| pid_path=pid_path, | |
| fd_stdout_path=os.path.join(pid_path, 'fd', '1'), | |
| ) | |
| async def monitor_proc(queue: asyncio.Queue[Process]): | |
| """producer""" | |
| known_processes: list[str] = [] | |
| while True: | |
| for process in get_processes(known_processes): | |
| known_processes.append(process.pid) | |
| await queue.put(process) | |
| await asyncio.sleep(1) | |
| if __name__ == '__main__': | |
| loop = asyncio.get_event_loop() | |
| queue: asyncio.Queue[Process] = asyncio.Queue() | |
| loop.create_task(monitor_proc(queue)) | |
| loop.create_task(create_watcher_tasks(queue)) | |
| loop.run_forever() | |
| loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment