Last active
January 5, 2020 10:42
-
-
Save TylerTemp/670c7f6baf566e4745166bfd79062486 to your computer and use it in GitHub Desktop.
Watch Android adb logcat output for a specific package name
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Usage: | |
adblogcatwatcher <package_name> [options] [--filter-message-contains <string>]... | |
adblogcatwatcher pid <package_name> | |
adblogcatwatcher start <package_name> | |
Options: | |
--adb=<path> specific adb executable path. [default: adb] | |
-c clean logcat before start | |
-r, --restart restart application | |
--filter-message-contains <string>, --fmc <string> | |
filter message that contains <string> | |
-w<mode>, --width=<mode> | |
max width for log trigger name, available: number, `max`, `min`. [default: max] | |
--no-wrap disable wrap for message | |
--no-date | |
--no-time | |
--no-pid | |
--no-sub-pid | |
Dependences: | |
pip install docpie | |
[optional] pip install colorlog | |
""" | |
import subprocess | |
import threading | |
import logging | |
import os | |
import sys | |
import time | |
import shutil | |
import textwrap | |
# Root logger of this tool. The script entry point attaches a stream handler
# to it; AdbLogcatFilter.stop logs through it directly.
LOGGER = logging.getLogger('adblogcatwatcher')
def ps_line_cutter(line):
    """Split one line of ``ps`` output into a tuple of stripped columns.

    The line is split on runs of whitespace at most eight times, so the
    result has at most nine items and the last one keeps any inner spaces.
    """
    columns = line.split(maxsplit=8)
    return tuple(map(str.strip, columns))
def get_pid(package_name, adb='adb'):
    """Return the PID of ``package_name`` on the device, or ``None``.

    Runs ``<adb> shell ps`` and treats the first output line as the column
    titles. Every following line is mapped onto those titles, and the first
    row whose ``NAME`` column equals ``package_name`` supplies the result.

    :param package_name: process name to look for in the ``NAME`` column.
    :param adb: adb executable to run.
    :return: the PID as ``int``, or ``None`` when ``ps`` printed nothing or
        no usable row matched.
    """
    # NOTE(review): newer Android releases may only list every process with
    # `ps -A` -- confirm against the target devices before changing this.
    #
    # This function is polled repeatedly, so the child must be reaped and its
    # pipe closed on every call; the context manager guarantees both.
    with subprocess.Popen([adb, 'shell', 'ps'], stdout=subprocess.PIPE) as process:
        output, _ = process.communicate()

    lines = output.splitlines()
    if not lines:
        return None

    # Undecodable bytes are replaced instead of aborting the whole lookup.
    titles = ps_line_cutter(lines[0].decode('utf-8', 'replace'))
    for raw_line in lines[1:]:
        row = dict(zip(titles, ps_line_cutter(raw_line.decode('utf-8', 'replace'))))
        if 'NAME' not in row or row['NAME'] != package_name:
            continue
        try:
            return int(row['PID'])
        except (KeyError, ValueError):
            # Row without a numeric PID column: keep scanning.
            continue
    return None
class AdbLogcatFilter(threading.Thread):
    """Thread that prints the ``adb logcat`` records of a single PID.

    The constructor spawns ``<adb_executable> logcat``. :meth:`run` reads its
    output line by line, keeps the records whose third column equals ``pid``
    and writes them to ``sys.stdout`` with ANSI colours, optionally wrapped
    to the terminal width.
    """

    BASH_FORMAT_CONTROL = dict(
        RS="\033[0m",  # reset
        HC="\033[1m",  # hicolor
        UL="\033[4m",  # underline
        INV="\033[7m",  # inverse background and foreground
        FBLK="\033[30m",  # foreground black
        FRED="\033[31m",  # foreground red
        FGRN="\033[32m",  # foreground green
        FYEL="\033[33m",  # foreground yellow
        FBLE="\033[34m",  # foreground blue
        FMAG="\033[35m",  # foreground magenta
        FCYN="\033[36m",  # foreground cyan
        FWHT="\033[37m",  # foreground white
        BBLK="\033[40m",  # background black
        BRED="\033[41m",  # background red
        BGRN="\033[42m",  # background green
        BYEL="\033[43m",  # background yellow
        BBLE="\033[44m",  # background blue
        BMAG="\033[45m",  # background magenta
        BCYN="\033[46m",  # background cyan
        BWHT="\033[47m",  # background white
    )

    # Optional head columns as (name checked against EXCLUDES, format
    # fragment), in the order they are printed.
    HEAD_FRAGMENTS = (
        ('date', '{date}'),
        ('time', '{time}'),
        ('pid', '{pid:5}'),
        ('sub_pid', '{sub_pid:5}'),
    )

    # BASH_FORMAT_CONTROL key that colours the line head, per level letter.
    LEVEL_COLORS = {
        'V': 'FWHT',
        'D': 'FGRN',
        'I': 'FBLE',
        'W': 'FMAG',
        'E': 'FRED',
    }

    def __init__(self, adb_executable, pid, width_mode='max', excludes=frozenset(), wrap_mode=None, filter_message_contains=frozenset(), *a, **k):
        """Spawn ``adb logcat`` and remember the display settings.

        :param adb_executable: adb executable to run.
        :param pid: only records whose PID column equals this are printed.
        :param width_mode: ``'max'`` (pad the sender to the longest seen so
            far), ``'min'`` (no padding) or an ``int`` column width.
        :param excludes: names of head columns to omit (``date``, ``time``,
            ``pid``, ``sub_pid``).
        :param wrap_mode: ``'no_wrap'`` disables wrapping of long messages.
        :param filter_message_contains: records whose payload contains any of
            these strings are skipped.
        :param a: extra positional arguments for ``threading.Thread``.
        :param k: extra keyword arguments for ``threading.Thread``.
        """
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__(*a, **k)
        self.PID = pid
        self.process = subprocess.Popen([adb_executable, 'logcat'], stdout=subprocess.PIPE)
        self._stop_event = threading.Event()
        self.WIDTH_MODE = width_mode
        self.EXCLUDES = excludes
        self.WRAP_MODE = wrap_mode
        self.FILTER_MESSAGE_CONTAINS = filter_message_contains

    def run(self):
        """Read logcat output until it ends or :meth:`stop` is called."""
        logger = logging.getLogger('adblogcatwatcher.AdbLogcatFilter.w_{}'.format(threading.get_ident()))
        logger.debug('checking pid %s', self.PID)
        while self.process.poll() is not None:
            if self.stopped():
                logger.debug('exit')
                return
            logger.debug('process not running, restart')
            finished = self.process
            self.process = subprocess.Popen(finished.args, stdout=subprocess.PIPE)
            # Release the pipe of the process that is being replaced.
            if finished.stdout is not None:
                finished.stdout.close()

        process = self.process
        # Display state carried from one printed record to the next.
        state = {'max_sender_length': 5, 'break_group_count': 0}
        try:
            for line_b in process.stdout:
                text = self._format_line(line_b, state)
                if text is not None:
                    sys.stdout.write(text)
                    sys.stdout.flush()
                if self.stopped():
                    logger.debug('exit')
                    return
        finally:
            # The loop only ends at end-of-output or after stop() killed the
            # child, so closing the pipe and reaping it here is prompt.
            process.stdout.close()
            process.wait()

    def _format_line(self, line_b, state):
        """Return the text to print for one raw logcat line, or ``None``.

        ``None`` means the line is skipped: a ``----`` separator, a line that
        does not have the six expected columns, a record of another PID, or a
        record dropped by ``FILTER_MESSAGE_CONTAINS``. ``state`` is updated in
        place with the widest sender seen and the wrapped-block counter.
        """
        try:
            line = line_b.decode('utf-8')
        except UnicodeDecodeError:
            # Keep undecodable lines visible as their escaped bytes literal.
            line = repr(line_b.strip())[2:-1]
        if line.startswith('----'):
            return None

        fields = line.split(maxsplit=5)
        if len(fields) < 6:
            # Blank or truncated line: skip it instead of killing the thread.
            return None
        try:
            pid = int(fields[2])
        except ValueError:
            return None
        if pid != self.PID:
            return None

        message = fields[5]
        sender, separator, payload = message.partition(':')
        if not separator:
            # No "sender:" prefix: show the whole text as the payload.
            sender, payload = '', message
        message_sender = sender.strip()
        message_payload = payload.strip()
        if any(fragment in message_payload for fragment in self.FILTER_MESSAGE_CONTAINS):
            return None

        if self.WIDTH_MODE == 'max':
            state['max_sender_length'] = max(len(message_sender), state['max_sender_length'])
            width_controller = ':{}'.format(state['max_sender_length'])
        elif self.WIDTH_MODE == 'min':
            width_controller = ''
        else:
            width_controller = ':%d' % (self.WIDTH_MODE,)

        kept = [fragment for name, fragment in self.HEAD_FRAGMENTS if name not in self.EXCLUDES]
        line_head_pretty = '{log_level_control}' + ' '.join(kept + [
            '{log_level}{RS}',
            '\x1B[1;38;48m\x1B[4;38;48m{message_sender%s}\x1B[0m' % (width_controller,),
        ])
        # Same head without escape codes; used only to measure its width.
        line_head_plain = ' '.join(kept + [
            '{log_level}',
            '{message_sender%s}' % (width_controller,),
        ])

        level_color = self.LEVEL_COLORS.get(fields[4])
        line_info = {
            'date': fields[0],
            'time': fields[1],
            'pid': fields[2],
            'sub_pid': fields[3],
            'log_level': fields[4],
            'message': message,
            'message_sender': message_sender,
            'message_payload': message_payload,
            'log_level_control': self.BASH_FORMAT_CONTROL[level_color] if level_color else '',
        }
        line_info.update(self.BASH_FORMAT_CONTROL)

        line_head = line_head_pretty.format(**line_info)
        columns, _rows = shutil.get_terminal_size()
        line_head_length = len(line_head_plain.format(**line_info))
        message_has_rows = columns - line_head_length
        left_rows = message_has_rows - (len(message_payload) + 1)

        # Wrapped blocks alternate the arrow background so that neighbouring
        # blocks can be told apart.
        break_group = '' if state['break_group_count'] % 2 else '{BGRN}'
        cont_arrow = (u'%s{INV}↪{RS}' % (break_group,)).format(**self.BASH_FORMAT_CONTROL)
        start_arrow = (u'%s{INV}↦{RS}' % (break_group,)).format(**self.BASH_FORMAT_CONTROL)

        if self.WRAP_MODE == 'no_wrap' or left_rows >= 0:
            # The payload fits on the line, or wrapping is disabled.
            line_rest = ' {}\n'.format(message_payload)
        elif message_has_rows < 10:
            # Hardly any room next to the head: wrap in a block below it.
            state['break_group_count'] += 1
            init_indent = u'\n {INV}↦{RS}'.format(**self.BASH_FORMAT_CONTROL)
            sub_indent = u' ' + cont_arrow
            line_rest = '\n'.join(textwrap.wrap(message_payload, width=columns, initial_indent=init_indent, subsequent_indent=sub_indent)) + '\n'
        else:
            # Enough room: wrap in the column to the right of the head.
            state['break_group_count'] += 1
            line_ready = '\n'.join(textwrap.wrap(' ' * line_head_length + message_payload, width=columns - 1, initial_indent='', subsequent_indent=u' ' * line_head_length + cont_arrow, drop_whitespace=False)).lstrip() + '\n'
            if line_ready.startswith(cont_arrow):
                line_ready = line_ready[len(cont_arrow):]
            line_rest = start_arrow + line_ready

        return line_head + line_rest

    def stop(self):
        """Ask :meth:`run` to finish and kill the logcat process."""
        logging.getLogger('adblogcatwatcher').debug('set to exit')
        self._stop_event.set()
        try:
            self.process.kill()
        except OSError:
            # Best effort: the process is already gone.
            pass

    def stopped(self):
        """Return ``True`` once :meth:`stop` has been called."""
        return self._stop_event.is_set()

    def __del__(self):
        # __init__ may have failed before the process was created.
        process = getattr(self, 'process', None)
        if process is None:
            return
        try:
            process.kill()
        except Exception as e:
            sys.stderr.write('{}\n'.format(e))
def _run_adb(adb, *args):
    """Run ``adb`` with ``args``; return ``(returncode, stdout, stderr)`` as text."""
    process = subprocess.Popen([adb] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate()
    return process.returncode, out.decode('utf-8', 'replace'), err.decode('utf-8', 'replace')


def _launch_application(adb, package_name):
    """Start ``package_name`` on the device through ``monkey``."""
    # NOTE(review): `monkey -p <pkg> -v 500` also injects 500 pseudo-random
    # events into the app; confirm that this is wanted for a plain launch.
    return _run_adb(adb, 'shell', 'monkey', '-p', package_name, '-v', '500')


def main():
    """Command line entry point; see the module docstring for the usage."""
    import docpie
    try:
        import colorlog
    except ImportError:
        formatter = logging.Formatter(
            '[%(levelname)1.1s %(lineno)3d %(asctime)s %(funcName)s]'
            ' %(message)s'
        )
    else:
        formatter = colorlog.ColoredFormatter(
            '%(log_color)s'
            '[%(levelname)1.1s %(lineno)3d %(asctime)s %(funcName)s]'
            '%(reset)s'
            ' %(message)s'
        )

    cliargs = docpie.docpie(__doc__)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    LOGGER.addHandler(handler)
    LOGGER.setLevel(logging.DEBUG)

    adb = cliargs['--adb']
    package_name = cliargs['<package_name>']

    # Every PID lookup goes through the adb chosen with --adb, like the
    # other adb invocations below.
    if cliargs['pid']:
        print(get_pid(package_name, adb=adb))
        sys.exit()

    if cliargs['start']:
        returncode, out, err = _launch_application(adb, package_name)
        if returncode != 0:
            sys.stderr.write('failed to start process: %s %s\n' % (out, err))
            sys.exit(1)
        print(get_pid(package_name, adb=adb))
        sys.exit()

    # Stop the running application first when a restart was requested.
    if cliargs['--restart']:
        current_pid = get_pid(package_name, adb=adb)
        if current_pid is not None:
            LOGGER.debug('killing process %s', current_pid)
            returncode, out, err = _run_adb(adb, 'shell', 'su', '-c', 'kill {}'.format(current_pid))
            if returncode != 0:
                LOGGER.error('failed to kill process: %s %s', out, err)
            else:
                LOGGER.info('process killed %s', out)

    # Clean the log buffer.
    if cliargs['-c']:
        LOGGER.debug('clearing current logs')
        returncode, out, err = _run_adb(adb, 'logcat', '-c')
        if returncode != 0:
            LOGGER.error('failed to clean adb log: %s %s', out, err)
        else:
            LOGGER.info('adb logcat cleaned %s', out)

    if cliargs['--restart']:
        LOGGER.debug('start process %s', package_name)
        returncode, out, err = _launch_application(adb, package_name)
        if returncode != 0:
            LOGGER.error('failed to start process: %s %s', out, err)
        else:
            LOGGER.info('process started')

    width_mode = cliargs['--width']
    if width_mode not in ('max', 'min'):
        try:
            width_mode = int(width_mode)
        except ValueError:
            sys.stderr.write('invalid --width %r: expected a number, `max` or `min`\n' % (width_mode,))
            sys.exit(1)

    excludes = frozenset(
        name.replace('-', '_')
        for name in ('date', 'time', 'pid', 'sub-pid')
        if cliargs['--no-' + name]
    )
    LOGGER.debug('exclude %s', excludes)
    wrap_mode = 'no_wrap' if cliargs['--no-wrap'] else None

    pid = None
    filter_thread = None
    try:
        while True:
            new_pid = get_pid(package_name, adb=adb)
            if new_pid is None and filter_thread is not None:
                filter_thread.stop()
                filter_thread = None
                # Forget the old PID so that a process reappearing with the
                # same PID gets a fresh watcher.
                pid = None
            if new_pid is not None and pid != new_pid:
                pid = new_pid
                LOGGER.info('restart thread')
                if filter_thread is not None:
                    filter_thread.stop()
                filter_thread = AdbLogcatFilter(pid=pid, adb_executable=adb, width_mode=width_mode, wrap_mode=wrap_mode, excludes=excludes, filter_message_contains=set(cliargs['--fmc']))
                filter_thread.start()
            else:
                time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C: stop the watcher so that its adb child does not linger.
        if filter_thread is not None:
            filter_thread.stop()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment