Created
November 14, 2016 14:48
-
-
Save FauxFaux/f26b68ef5e8203c77b89446fdc3aa77e to your computer and use it in GitHub Desktop.
checkrestart / psdel reimplementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# inspired by psdel, referenced from the checkrestart (debian-goodies) source code | |
import collections | |
import errno | |
import glob | |
import os | |
import re | |
import stat | |
import subprocess | |
import sys | |
from typing import Dict, List, Set, Iterator, Tuple | |
# Matches one line of /proc/<pid>/maps:
#   <start>-<end> <perms> <offset> <major>:<minor> <inode> <pathname>
# group(1) is the inode number; group(2) is the rest of the line up to the
# newline.  Because ``.+`` is greedy, group(2) still carries any trailing
# " (deleted)" marker, so callers have to strip it themselves.
# The device numbers are printed with a *minimum* width of two hex digits;
# majors >= 0x100 (e.g. "103:02") are wider, hence ``{2,}``.
map_line = re.compile(r'^[\da-f]+-[\da-f]+ [r-][w-][x-][sp-] '
                      r'[\da-f]+ [\da-f]{2,}:[\da-f]{2,} (\d+) *(.+)( \(deleted\))?\n$')
# Mappings whose path starts with one of these prefixes are never reported.
_IGNORED_PREFIXES = (
    # user files
    '/home/',
    # temp files
    '/dev/shm/',
    '/tmp/',
    '/run/',
    # weird kernel thingies
    '/memfd',
    '/SYSV',
    '/drm',
    '/dev/',
    '/proc/',
    '/[',
    '/anon_hugepage',
)

# Marker found at the end of a mapping's path once the file has been unlinked.
_DELETED_SUFFIX = ' (deleted)'


def _is_stale(path: str, inode: int) -> bool:
    """Return True if *path* is gone from disk or now has a different inode.

    Any stat failure other than ENOENT is reported on stderr and treated as
    "not stale".
    """
    try:
        return os.stat(path)[stat.ST_INO] != inode
    except OSError as e:
        if e.errno == errno.ENOENT:
            return True
        sys.stderr.write("stat failed unexpectedly: {}\n".format(e))
        return False


def _collect_stale_files() -> Dict[str, Set[int]]:
    """Map each stale file path to the set of PIDs that still have it mapped."""
    file_user_pids = collections.defaultdict(set)  # type: Dict[str, Set[int]]
    for (pid, map_lines) in maps():
        for line in map_lines:
            m = map_line.match(line)
            if not m:
                sys.stderr.write(
                    "parse error reading /proc/{}/maps: {!r}\n".format(pid, line))
                # An unparseable line makes us give up on this process.
                break

            inode = int(m.group(1))
            path = m.group(2)
            # Inode 0 means the mapping is not backed by a file.
            if inode == 0 or path.startswith(_IGNORED_PREFIXES):
                continue

            if path.endswith(_DELETED_SUFFIX):
                path = path[:-len(_DELETED_SUFFIX)]

            # list file names whose inode numbers do not match their on-disk
            # values; or files that do not exist at all
            if _is_stale(path, inode):
                file_user_pids[path].add(pid)
    return file_user_pids


def _print_groups(groups: Dict[str, Set[str]]) -> None:
    """Print each group name followed by its sorted, bulleted file list."""
    for name in sorted(groups):
        print(name)
        for path in sorted(groups[name]):
            print(' * ' + path)
        print()


def main():
    """Report processes that still map files which were deleted or replaced.

    Every /proc/<pid>/maps is scanned for file-backed mappings whose path no
    longer exists or whose on-disk inode differs from the mapped one.  The
    affected files are printed grouped by the unit ``ps`` reports for the
    process, and, for processes without a unit, by the executable path.
    Nothing is printed when no stale mapping is found.
    """
    file_user_pids = _collect_stale_files()
    if not file_user_pids:
        return

    all_pids = set(pid for pids in file_user_pids.values() for pid in pids)
    pid_unit = find_units_of(all_pids)
    pid_path = find_paths_of(all_pids)

    service_because = collections.defaultdict(set)  # type: Dict[str, Set[str]]
    paths_because = collections.defaultdict(set)  # type: Dict[str, Set[str]]
    for (path, pids) in file_user_pids.items():
        for pid in pids:
            if pid in pid_unit:
                service_because[pid_unit[pid]].add(path)
            elif pid in pid_path:
                paths_because[pid_path[pid]].add(path)
            # Otherwise the executable could not be resolved (the process has
            # most likely exited); find_paths_of has already reported that on
            # stderr, so the pid is skipped instead of raising KeyError.

    _print_groups(service_because)
    _print_groups(paths_because)
def find_units_of(pids: Iterator[int]) -> Dict[int, str]:
    """Ask ``ps`` which unit each process belongs to.

    :param pids: PIDs appended to the ``ps`` command line; iterated once.
    :returns: mapping of PID to unit name for every line of ``ps`` output
        that parses; entries whose unit column is ``-`` are left out.
    :raises subprocess.CalledProcessError: if ``ps`` exits non-zero.
    """
    ps_command = ['ps', '-eopid=,unit=']
    ps_command.extend(str(pid) for pid in pids)

    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    ps_line = re.compile(r'^ *(\d+) (.*)')

    pid_unit = {}  # type: Dict[int, str]
    output = subprocess.check_output(ps_command).decode('utf-8')
    for line in output.split('\n'):
        m = ps_line.match(line)
        if not m:
            # blank trailing line or anything else that is not "<pid> <unit>"
            continue
        unit = m.group(2)
        if unit == '-':
            continue
        pid_unit[int(m.group(1))] = unit
    return pid_unit
def find_paths_of(all_pids: Iterator[int]) -> Dict[int, str]:
    """Resolve the executable of each PID via its /proc/<pid>/exe symlink.

    :param all_pids: PIDs to look up; iterated once.
    :returns: mapping of PID to the symlink target.  PIDs whose link cannot
        be read are reported on stderr and omitted from the result.
    """
    exe_by_pid = {}  # type: Dict[int, str]
    for pid in all_pids:
        link = "/proc/{}/exe".format(pid)
        try:
            target = os.readlink(link)
        except OSError as err:
            sys.stderr.write("couldn't find path of {}: {}\n".format(pid, err))
        else:
            exe_by_pid[pid] = target
    return exe_by_pid
def maps() -> Iterator[Tuple[int, List[str]]]:
    """Yield ``(pid, lines)`` for every readable /proc/<pid>/maps file.

    ``lines`` is the file content as returned by ``readlines()``, so each
    entry keeps its trailing newline.  Files that cannot be opened or read
    are reported on stderr and skipped.
    """
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    extract_pid = re.compile(r'/proc/(\d+)/maps')
    for item in glob.glob('/proc/*/maps'):
        m = extract_pid.match(item)
        if not m:
            # the glob also matches entries whose directory is not a number
            continue
        try:
            with open(item) as f:
                lines = f.readlines()
        except IOError as e:
            sys.stderr.write("{}: {}\n".format(item, e))
            continue
        # Yield outside the try/with so the file is already closed and the
        # handler only covers the read itself.
        yield (int(m.group(1)), lines)
# Run the scan only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment