Last active
September 27, 2019 23:07
-
-
Save haginara/59e92a6bf6b45547013ecfd754763cc6 to your computer and use it in GitHub Desktop.
zeek-cut script with Python 3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import csv | |
import json | |
import logging | |
import pprint | |
import time | |
from datetime import datetime | |
from argparse import ArgumentParser | |
# Log bare messages only (no level/timestamp prefix). The default level is
# INFO, so the logger.debug() tracing used by this script stays silent unless
# the level is lowered.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)

# Command-line interface. The option letters appear to mirror those of Zeek's
# zeek-cut tool; several of them are accepted but not acted on yet.
parser = ArgumentParser()
parser.add_argument(
    "-c",
    "--print_header",
    action="store_true",
    help="Include the first format header block in the output.",
)
parser.add_argument("-C", help="Accepted but currently ignored.")
parser.add_argument(
    "-d",
    "--time_column",
    action="append",
    default=[],
    help="Convert time values into human-readable format.",
)
parser.add_argument("-D", help="Accepted but currently ignored.")
parser.add_argument(
    "-f", "--fields", action="store_true", help="print field header line"
)
parser.add_argument(
    "-F",
    "--sep",
    type=str,
    help="<ofs> Sets a different output field separator character.",
)
parser.add_argument("-a", "--all", action="store_true", help="print all fields")
parser.add_argument(
    "-n", "--no", action="append", default=[], help="print *except* these fields"
)
parser.add_argument("-u", help="Accepted but currently ignored.")
parser.add_argument("-U", help="Accepted but currently ignored.")
parser.add_argument("filename")

# Parsed at import time because main() reads the module-level ``options``.
# parse_known_args() collects unrecognised arguments in ``args`` instead of
# rejecting them.
options, args = parser.parse_known_args(sys.argv[1:])
def convert_sep(sep_str):
    r"""Decode a ``\xHH`` escape string into the single character it names.

    Args:
        sep_str: Text such as ``\x09`` (a literal backslash, ``x`` and hex
            digits), as found in the ``#separator`` header line.

    Returns:
        The one-character string with that code point.

    Raises:
        ValueError: If what remains after removing ``\x`` is not valid
            hexadecimal.
    """
    # The digits after "\x" are hexadecimal. Parsing them base 10 only worked
    # by accident for values like "09"; "\x2c" raised and "\x10" decoded to
    # the wrong character.
    return chr(int(sep_str.replace("\\x", ""), 16))
def convert_datetime(datetime_str, timeformat="%Y-%m-%d %H:%M:%S"):
    """Format an epoch-seconds value as a human-readable timestamp.

    Args:
        datetime_str: Seconds since the epoch, as a string (or anything
            ``float()`` accepts); fractional seconds are allowed.
        timeformat: ``strftime`` pattern used for the result.

    Returns:
        The formatted time. ``datetime.fromtimestamp`` is called without a
        timezone, so the result is in the machine's local time.
    """
    epoch_seconds = float(datetime_str)
    return datetime.fromtimestamp(epoch_seconds).strftime(timeformat)
def get_meta(filename) -> dict:
    """Read the leading ``#key value`` header lines of a log file.

    The first eight lines of the file are expected to be, in this order,
    ``#separator``, ``#set_separator``, ``#empty_field``, ``#unset_field``,
    ``#path``, ``#open``, ``#fields`` and ``#types``.

    Args:
        filename: Path of the log file to inspect.

    Returns:
        A dict with one entry per header key. ``fields`` and ``types`` are
        lists split on the file's separator; every other value is a string.
        Values written as a ``\\xHH`` escape are decoded via ``convert_sep``.

    Raises:
        ValueError: If a header line is missing or out of order, so the file
            is not misread silently.
    """
    # The keys are ordered: this is the order the header lines appear in.
    keys = (
        "separator",
        "set_separator",
        "empty_field",
        "unset_field",
        "path",
        "open",
        "fields",
        "types",
    )
    meta = {}
    with open(filename, "r") as f:
        for key in keys:
            marker = f"#{key}"
            line = f.readline()
            if not line.startswith(marker):
                raise ValueError(
                    f"{filename}: expected header line {marker!r}, "
                    f"got {line.strip()!r}"
                )
            # Drop only the leading marker; str.replace() would also remove
            # any later occurrence of the same text inside the value.
            value = line[len(marker):].strip()
            # strip() removes a whitespace delimiter (space or tab) but not a
            # printable one such as ",", which would otherwise become a
            # spurious empty first field.
            separator = meta.get("separator")
            if separator and value.startswith(separator):
                value = value[len(separator):]
            if value.startswith("\\x"):
                value = convert_sep(value)
            meta[key] = value
    meta["fields"] = meta["fields"].split(meta["separator"])
    meta["types"] = meta["types"].split(meta["separator"])
    return meta
def find_timecol(line: str, lp):
    """Placeholder with no implementation yet; ignores its arguments.

    Returns:
        Always ``None``.
    """
    return None
def follow_zeeklog(
    filename,
    meta,
    fields=False,
    sep=None,
    all_column=False,
    columns=None,
    excludes=None,
    time_columns=None,
):
    """Print selected columns of every data row in a log file.

    Args:
        filename: Path of the log file to read.
        meta: Header metadata as produced by ``get_meta``; ``separator`` and
            ``fields`` are required, ``unset_field`` / ``empty_field`` are
            used when present. It is not modified.
        fields: When true, print a header line naming the output columns.
        sep: Output field separator; defaults to the file's own separator.
        all_column: When true, output every field of the file.
        columns: Names of the columns to output.
        excludes: Names of columns to leave out of the output.
        time_columns: Names of columns holding epoch timestamps; they are
            rendered with ``convert_datetime`` and are placed first in the
            output.

    Lines starting with ``#`` and blank lines are skipped.

    Raises:
        KeyError: If a selected column is absent from a row.
    """
    # None defaults instead of shared mutable [] defaults.
    time_columns = list(time_columns or [])
    excluded = list(excludes or [])
    print_sep = sep if sep else meta["separator"]

    selected = time_columns + list(columns or [])
    if not selected or all_column:
        # Copy: filtering the metadata list itself would shift the
        # name -> value pairing used to parse every row below.
        selected = list(meta["fields"])

    # Exclude columns. Unknown names are reported and skipped, so one bad
    # name no longer prevents the remaining excludes from being applied.
    for name in excluded:
        if name not in selected:
            logger.debug("Excluded field %r is not among the columns", name)
    selected = [name for name in selected if name not in excluded]
    logger.debug("Columns: %s, time_columns: %s", selected, time_columns)

    if fields:
        # The header names exactly the columns printed for each row.
        print(print_sep.join(selected))

    # Placeholder values that cannot be parsed as a timestamp.
    placeholders = (meta.get("unset_field"), meta.get("empty_field"))
    trace_rows = logger.isEnabledFor(logging.DEBUG)
    with open(filename, "r") as f:
        # Iterate lazily rather than loading the whole log with readlines().
        for line in f:
            # Remove only the line terminator; strip() would also eat
            # separators at either end of the row.
            line = line.rstrip("\r\n")
            if not line or line.startswith("#"):
                continue
            row = dict(zip(meta["fields"], line.split(meta["separator"])))
            for tf in time_columns:
                raw = row.get(tf)
                if raw is not None and raw not in placeholders:
                    row[tf] = convert_datetime(raw)
            if trace_rows:
                logger.debug(pprint.pformat(row, indent=2))
            print(print_sep.join(row[column] for column in selected))
def main():
    """Run the tool using the module-level parsed ``options``.

    Reads the log header with ``get_meta``, prints the resulting metadata
    dict when ``print_header`` is set, then hands the file to
    ``follow_zeeklog`` together with the output-related options.
    """
    log_path = options.filename
    logger.debug(f"Load: {log_path}")

    header = get_meta(log_path)
    if options.print_header:
        print(header)

    output_options = {
        "fields": options.fields,
        "sep": options.sep,
        "all_column": options.all,
        "excludes": options.no,
        "time_columns": options.time_column,
    }
    follow_zeeklog(log_path, header, **output_options)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment