Created
August 26, 2015 11:15
-
-
Save Jc2k/dfef7b4bd168aa5b342c to your computer and use it in GitHub Desktop.
A django orm-like PS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://raw.githubusercontent.com/pixelb/ps_mem/master/ps_mem.py | |
# https://people.gnome.org/~federico/misc/memstats.py | |
# http://brightbox.com/blog/2012/11/28/measuring-shared-ram-usage/ | |
import itertools | |
import os | |
import operator | |
SUPPORTED_SMAPS_BUCKETS = [ | |
'AnonHugePages', 'Anonymous', 'KernelPageSize', 'Locked', | |
'MMUPageSize', 'Private_Clean', 'Private_Dirty', 'Pss', | |
'Referenced', 'Rss', 'Shared_Clean', 'Shared_Dirty', | |
'Size', 'Swap' | |
] | |
# year, month, day, "i" variants, range | |
OPERATORS = { | |
'gt': operator.gt, | |
'lte': operator.lt, | |
'gte': operator.ge, | |
'le': operator.le, | |
'contains': operator.contains, | |
'in': lambda a, b: operator.contains(b, a), | |
'exact': operator.eq, | |
'startswith': lambda a, b: a.startswith(b), | |
'endswith': lambda a, b: a.endswith(b), | |
'isnull': lambda a, b: b and a != None or a == None, | |
} | |
def human(num, power="Ki"): | |
powers = ["Ki", "Mi", "Gi", "Ti"] | |
while num >= 1000: | |
num /= 1024.0 | |
power = powers[powers.index(power)+1] | |
return "%.1f %s" % (num, power) | |
class Process(object): | |
def __init__(self, pid): | |
self.pid = str(pid) | |
self._shared_dirty = None | |
self._private_dirty = None | |
self._status = {} | |
def _get(self, file): | |
with open(os.path.join("/proc", self.pid, file)) as fp: | |
return fp.read() | |
def _process_status(self): | |
self._status = {} | |
for line in self._get("status").splitlines(): | |
key = line.split(":")[0] | |
value = line.split(":")[1].strip() | |
self._status[key] = value | |
def _process_smaps(self): | |
if self._shared_dirty or self._private_dirty: | |
return | |
smaps = {} | |
for line in self._get("smaps").splitlines(): | |
bucket = line.split(":")[0] | |
if bucket not in SUPPORTED_SMAPS_BUCKETS: | |
continue | |
val = line.split(":")[1].split()[0].strip() | |
try: | |
val = int(val) | |
except ValueError: | |
continue | |
smaps.setdefault(bucket, 0) | |
smaps[bucket] += val | |
# FIXME: Make nicer python - expose all smaps data too | |
self._private_dirty = smaps.get('Private_Dirty', 0) | |
self._shared_dirty = smaps.get('Shared_Dirty', 0) | |
@property | |
def cmdline(self): | |
return self._get("cmdline").strip().replace("\x00", " ") | |
@property | |
def shared_dirty(self): | |
self._process_smaps() | |
return self._shared_dirty | |
@property | |
def private_dirty(self): | |
self._process_smaps() | |
return self._private_dirty | |
@property | |
def name(self): | |
self._process_status() | |
return self._status['Name'] | |
@property | |
def cwd(self): | |
return os.readlink(os.path.join("/proc", self.pid, "cwd")) | |
def __repr__(self): | |
return "Process(pid={}, cmdline='{}'".format(self.pid, self.cmdline[:20]) | |
class ProcessQuery(object): | |
def __init__(self): | |
self.filters = [] | |
def filter(self, **kwargs): | |
p = self._clone() | |
def pred(item): | |
for k, v in kwargs.items(): | |
if "__" in k: | |
key, predicate = k.split("__", 1) | |
else: | |
key = k | |
predicate = "eq" | |
if not OPERATORS[predicate](getattr(item, key), v): | |
return False | |
else: | |
return True | |
p.filters.append(pred) | |
return p | |
def _clone(self): | |
p = self.__class__() | |
p.filters.extend(self.filters) | |
return p | |
def _all(self): | |
for p in os.listdir("/proc"): | |
if not p.isdigit(): | |
continue | |
yield Process(p) | |
def _get_upstream(self): | |
iterator = self._all() | |
for f in self.filters: | |
iterator = itertools.ifilter(f, iterator) | |
return iterator | |
#def __len__(self): | |
# return len(list(self)) | |
def __iter__(self): | |
for p in self._get_upstream(): | |
# FIXME: Apply filters here | |
yield p | |
def __repr__(self): | |
return repr(list(self)) | |
if __name__ == "__main__": | |
if not os.path.exists(os.path.join("/proc", str(os.getpid()), "smaps")): | |
print "'smaps' data is not available on this system" | |
sys.exit(1) | |
p = Process(os.getpid()) | |
p._process_status() | |
print human(p.shared_dirty) | |
print human(p.private_dirty) | |
print p.cmdline |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment