Created
February 15, 2019 07:04
-
-
Save jdahm/fb17efb9d3593de73a17ae4340514e19 to your computer and use it in GitHub Desktop.
Process search
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# psearch | |
# | |
# Script used to search for and send signals to processes | |
# | |
# AUTHORS | |
# - Johann Dahm <[email protected]> (20140209) | |
# | |
# LICENSE LGPL-3.0 | |
import sys | |
import argparse | |
import subprocess as sp | |
import datetime | |
import re | |
import os, signal | |
def _entry_dict(l, field): | |
def _parse_etime(tstr): | |
"""Parse the elapsed time string and return in seconds""" | |
if '-' in tstr: | |
# has a day field | |
m = re.match(r"(?P<days>\d+)[-](?P<hrs>\d+)[:](?P<min>\d+)[:](?P<sec>\d+)", tstr) | |
if m is None: raise ValueError("Parse error in string: {0}".format(tstr)) | |
days, hours, minutes, seconds = (int(x) for x in (m.group("days"), m.group("hrs"), m.group("min"), m.group("sec"))) | |
t = datetime.timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds) | |
elif len(tstr.split(":")) == 3: | |
# HH:MM:SS | |
m = re.match(r"(?P<hrs>\d+)[:](?P<min>\d+)[:](?P<sec>\d+)", tstr) | |
if m is None: raise ValueError("Parse error in string: {0}".format(tstr)) | |
hours, minutes, seconds = (int(x) for x in (m.group("hrs"), m.group("min"), m.group("sec"))) | |
t = datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds) | |
elif len(tstr.split(":")) == 2: | |
# MM:SS | |
m = re.match(r"(?P<min>\d+)[:](?P<sec>\d+)", tstr) | |
if m is None: raise ValueError("Parse error in string: {0}".format(tstr)) | |
minutes, seconds = (int(x) for x in (m.group("min"), m.group("sec"))) | |
t = datetime.timedelta(minutes=minutes, seconds=seconds) | |
else: | |
raise ValueError("Parse error in string: {0}".format(tstr)) | |
return t.total_seconds() | |
entry = filter(None, l.strip().split(' ')) | |
d = {} | |
if 'etime' in field: | |
d['etime'] = _parse_etime(entry[field.index('etime')]) | |
if 'pid' in field: | |
d['pid'] = int(entry[field.index('pid')]) | |
if 'args' in field: | |
d['args'] = ' '.join(entry[field.index('args'):]) | |
return d | |
def _time_from_etime(dt): | |
time = datetime.datetime.now() - datetime.timedelta(seconds=dt) | |
return time.strftime("%H:%M:%S") | |
class process_list:
    """In-memory snapshot of running processes, obtained by calling ``ps``.

    Each process is a dict produced by ``_entry_dict`` (keys among 'etime',
    'pid', 'args').  The snapshot can be filtered, sorted and truncated in
    place, iterated over, and rendered as a delimited text table.
    """

    def __init__(self, all=False, field=None):
        """Creates a process list, calls 'ps'

        Parameters:
            all:   If true, run ``ps -A`` (every process) instead of
                   ``ps -a``.
            field: List of ``ps -o`` field names to request; defaults to
                   ['etime', 'pid'].
        """
        if field is None:
            field = ['etime', 'pid']
        self.field = field
        # Use the constructor argument rather than the script's global
        # argparse namespace, so the class also works when imported.
        opts = '-Ao' if all else '-ao'
        # universal_newlines=True makes check_output return text (not bytes)
        # on every Python version.
        ps_text = sp.check_output(['ps', opts, ','.join(self.field)],
                                  universal_newlines=True)
        # First line is the column header; skip it and any blank lines.
        ps_output = [l for l in ps_text.splitlines()[1:] if l.strip()]
        # Convert to list of dicts
        self._ps = [_entry_dict(l, self.field) for l in ps_output]

    def __iter__(self):
        """Iterate over the process dicts in their current order."""
        return iter(self._ps)

    def sort(self, field='etime', reverse=False):
        """Sorts the processes by a given field"""
        self._ps.sort(key=lambda k: k[field], reverse=reverse)

    def filter(self, pattern, negate=False):
        """Keep only the processes with a field value matching ``pattern``.

        Parameters:
            pattern: Regular expression, applied with ``re.search`` to the
                     string form of every field value of a process.
            negate:  If true, keep the processes that do NOT match instead.
        """
        regex = re.compile(pattern)

        def matches(proc):
            # Search the values only; including the keys would make patterns
            # such as 'pid' or 'args' match every process.
            return any(regex.search(str(value)) is not None
                       for value in proc.values())

        # Store a real list so sort()/truncate() keep working afterwards.
        self._ps = [proc for proc in self._ps if matches(proc) != negate]

    def truncate(self, n, negate=False):
        """Keep the first ``n`` processes, or drop them if ``negate``."""
        if negate:
            self._ps = self._ps[n:]
        else:
            self._ps = self._ps[:n]

    def display(self, delim='|', p=True, max_size=30):
        """Render the processes as a text table, one process per line.

        Parameters:
            delim:    String placed between columns.
            p:        If true, also print the table to stdout.
            max_size: Upper bound on the padding width of a column; longer
                      values are not truncated, they simply overflow.

        Returns:
            The rendered table as a single string.
        """
        # Render every cell first so column widths are measured on what is
        # actually shown ('etime' is displayed as a start time, not seconds).
        rows = []
        for proc in self:
            cells = []
            for f in self.field:
                if f == 'etime':
                    cells.append(_time_from_etime(proc[f]))
                else:
                    cells.append(proc[f])
            rows.append(cells)

        widths = []
        for col in range(len(self.field)):
            longest = max([len(str(cells[col])) for cells in rows] or [0])
            widths.append(min(max_size, longest))

        sl = []
        for cells in rows:
            e = []
            for cell, width in zip(cells, widths):
                # A width of 0 is not a valid padding spec for strings on
                # older Pythons, so fall back to the plain string form.
                if width > 0:
                    e.append("{0:{1}}".format(cell, width))
                else:
                    e.append(str(cell))
            sl.append(delim.join(e))
        s = '\n'.join(sl)
        if p:
            print(s)
        return s

    def __str__(self):
        """Return the rendered table without printing it."""
        return self.display(p=False)
if __name__ == '__main__':
    # Command-line entry point: list the processes matching a regular
    # expression and optionally send each of them a signal.
    parser = argparse.ArgumentParser(description='Search for and send signals to running processes')
    parser.add_argument('pattern', type=str, help='Regular expression search')
    parser.add_argument('--nselect', metavar='n', type=int, default=0, help='Number of matches to select. If this flag is not passed, all processes matching pattern are chosen, and the negate flag has no effect.')
    parser.add_argument('--negate', action='store_true', help='Negates the match ordering')
    parser.add_argument('--newest', action='store_true', help='Sorts matches by most recently started')
    parser.add_argument('--all', '-a', action='store_true', help='Search all running processes, not just processes attached to parent')
    parser.add_argument('--delimiter', '-d', nargs='?', const=' | ', default=' | ')
    parser.add_argument('--signal', '-s', type=int, default=0, help='Send signal to matched processes')
    parser.add_argument('--hup', action='store_true', help='Send SIGHUP signal to matched processes')
    parser.add_argument('--kill', '-k', action='store_true', help='Send SIGKILL signal to matched processes')
    # default=0: without it a 'count' option is None when the flag is absent,
    # and None cannot be compared with 0 on Python 3.
    parser.add_argument('--verbose', '-v', action='count', default=0, help='Verbose output')
    args = parser.parse_args()

    # Later checks win: --kill overrides --hup, which overrides --signal.
    sig = None
    if args.signal > 0:
        sig = args.signal
    if args.hup:
        sig = signal.SIGHUP
    if args.kill:
        sig = signal.SIGKILL

    # TODO Could pass fields as option
    ps = process_list(all=args.all, field=['etime', 'pid', 'args'])

    # Filter
    ps.filter(args.pattern)
    # NOTE(review): presumably meant to hide this script's own process, but
    # the file header names the script 'psearch' -- confirm the pattern.
    ps.filter('psrch', negate=True)

    # Sort: etime is elapsed seconds, so ascending order is newest first.
    ps.sort(field='etime', reverse=(not args.newest))

    # Truncate; --negate selects everything except the first n matches.
    if args.nselect > 0:
        ps.truncate(args.nselect, negate=args.negate)

    # List processes
    ps.display(delim=args.delimiter)

    # Send signal to processes
    if sig is not None:
        own_pid = os.getpid()
        for p in ps:
            if p['pid'] == own_pid:
                # Never signal ourselves; the remaining matches would be
                # left unsignalled.
                continue
            if args.verbose > 0:
                print("Sending signal {0} to process '{1}'".format(sig, p['args']))
            try:
                os.kill(p['pid'], sig)
            except OSError as err:
                # The process may have exited since the snapshot, or belong
                # to another user; report it and carry on with the rest.
                sys.stderr.write("Could not signal process {0}: {1}\n".format(p['pid'], err))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment