Last active
June 26, 2018 00:07
-
-
Save Cologler/3417beb18b5c89a68420aa8aaafec9de to your computer and use it in GitHub Desktop.
format output from pipe.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# Copyright (c) 2017~2999 - cologler <[email protected]> | |
# ---------- | |
# format output from pipe. | |
# ---------- | |
'''Usage: | |
python pr.py PATTERN | |
For example: | |
echo abc|python pr.py {line} => output: abc | |
echo abc|python pr.py {$len(line)} => output: 3 # use func | |
ping 10.0.0.1 -t|pr "[{time}] {line}" => print with time! | |
Available variables: | |
line # line text (raw input) | |
index # line index | |
datetime | |
date | |
time | |
year | |
month | |
day | |
hour | |
minute | |
second | |
Available funcs: | |
$len # for any kinds variables | |
$ms # for `datetime` and `time`, show milliseconds | |
$us # for `datetime` and `time`, show microseconds | |
$iso # for `datetime`, show `T` and `Z` | |
''' | |
import sys | |
import traceback | |
import datetime | |
from collections import Mapping | |
import re | |
# exprs | |
class IExpr: | |
def resolve_value(self, mapping: Mapping): | |
raise NotImplementedError | |
class VarGetterExpr(IExpr): | |
def __init__(self, key): | |
self._key = key | |
def resolve_value(self, mapping: Mapping): | |
cb = mapping.get_callback(self._key) | |
return cb() if cb else None | |
class MethodCallExpr(IExpr): | |
def __init__(self, method_name, obj_expr): | |
self._method_name = method_name | |
self._obj_expr = obj_expr | |
def resolve_value(self, mapping: Mapping): | |
obj = self._obj_expr.resolve_value(mapping) | |
if obj is not None: | |
if not isinstance(obj, Value): #ensure wraped | |
obj = Value(obj) | |
cb = getattr(obj, self._method_name, None) | |
return cb() if cb else None | |
def _key_clear(key): | |
'''remove invaild parts.''' | |
ret = key.strip() | |
if ret.startswith('(') and ret.endswith(')'): | |
ret = ret[1:-1] | |
return ret if len(key) == len(ret) else _key_clear(ret) # continue | |
RE_FUNC_CALL = re.compile('^\\$([a-z][a-z0-9]*)\\((.*)\\)$') | |
RE_VAR = re.compile('^([a-z]+)$') | |
def make_expr(key: str): | |
key = _key_clear(key) | |
if not key: | |
return None | |
func_call_match = RE_FUNC_CALL.match(key) | |
if func_call_match: | |
func_name, args_value = func_call_match.groups() | |
if func_name[0] == '_': | |
return None # cannot access private method | |
obj_expr = make_expr(args_value) | |
if not obj_expr: | |
return None # no arguments always be property | |
return MethodCallExpr(func_name, obj_expr) | |
var_match = RE_VAR.match(key) | |
if var_match: | |
return VarGetterExpr(var_match.groups()[0]) | |
# values | |
class Value: | |
'''wrap date to hide methods.''' | |
def __init__(self, value): | |
self._value = value | |
def __str__(self): | |
return str(self._value) | |
def __getattr__(self, attr: str): | |
if len(attr) > 1: | |
if attr[1:].isdigit(): | |
if attr[0] == 'l': | |
return lambda: str(self).ljust(int(attr[1:])) | |
if attr[0] == 'r': | |
return lambda: str(self).rjust(int(attr[1:])) | |
def len(self): | |
return len(str(self)) | |
# datetimes | |
def now(): | |
'''short for `datetime.datetime.now()`''' | |
return datetime.datetime.now() | |
class DateTime(Value): | |
def __init__(self, value: datetime.datetime, fmt=None): | |
super().__init__(value) | |
self._fmt = fmt or { | |
'sep': ' ', | |
'timespec': 'seconds' | |
} | |
def __str__(self): | |
return self._value.isoformat(**self._fmt) | |
def ms(self): | |
fmt = self._fmt.copy() | |
fmt['timespec'] = 'milliseconds' | |
return DateTime(self._value, fmt) | |
def us(self): | |
fmt = self._fmt.copy() | |
fmt['timespec'] = 'microseconds' | |
return DateTime(self._value, fmt) | |
def iso(self): | |
fmt = self._fmt.copy() | |
fmt['sep'] = 'T' | |
return DateTime(self._value, fmt) | |
class Time(Value): | |
def __init__(self, value: datetime.time, fmt=None): | |
super().__init__(value) | |
self._fmt = fmt or { | |
'timespec': 'seconds' | |
} | |
def __str__(self): | |
return self._value.isoformat(**self._fmt) | |
def ms(self): | |
fmt = self._fmt.copy() | |
fmt['timespec'] = 'milliseconds' | |
return Time(self._value, fmt) | |
def us(self): | |
fmt = self._fmt.copy() | |
fmt['timespec'] = 'microseconds' | |
return Time(self._value, fmt) | |
class ArgumentMapping(Mapping): | |
def __init__(self): | |
super().__init__() | |
self._expr_cache = {} | |
self._callbacks = { | |
'datetime': lambda: DateTime(now()), | |
'date': lambda: now().date(), | |
'time': lambda: Time(now().time()), | |
'year': lambda: now().date().year, | |
'month': lambda: now().date().month, | |
'day': lambda: now().date().day, | |
'hour': lambda: now().time().hour, | |
'minute': lambda: now().time().minute, | |
'second': lambda: now().time().second, | |
} | |
def __getitem__(self, key): | |
expr = self._expr_cache.get(key) | |
if not expr: | |
self._expr_cache[key] = expr = make_expr(key) or key | |
if expr == key: # build fail. | |
return key | |
value = expr.resolve_value(self) | |
return str(value) if value is not None else key | |
def __iter__(self): | |
return iter(self._callbacks) | |
def __len__(self): | |
return len(self._callbacks) | |
def set_value(self, key, value): | |
self._callbacks[key] = lambda: value | |
def get_callback(self, key): | |
return self._callbacks.get(key) | |
def main(argv=None): | |
if argv is None: | |
argv = sys.argv | |
try: | |
if sys.stdin.isatty(): | |
print('No input from pipe') | |
print(__doc__) | |
exit(1) | |
if len(argv) != 2: | |
print('Require argument PATTERN') | |
print(__doc__) | |
exit(2) | |
pattern = argv[1] | |
am = ArgumentMapping() | |
try: | |
for index, line in enumerate(sys.stdin): | |
line = line.rstrip() | |
am.set_value('index', index) | |
am.set_value('line', line) | |
am.set_value('$len(line)', len(line)) | |
print(pattern.format_map(am)) | |
except KeyboardInterrupt: | |
pass | |
except Exception: # pylint: disable=W0703 | |
traceback.print_exc() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment