Last active
August 9, 2024 13:12
-
-
Save lelandbatey/814172aa044c4f21754b66fae841054e to your computer and use it in GitHub Desktop.
columnize.py parses STDIN as column-based data, printing as nicely formatted columns
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# Copyright (c) 2022 Leland Batey. All rights reserved. | |
# | |
# This work is licensed under the terms of the MIT license. | |
# For a copy, see <https://opensource.org/licenses/MIT>. | |
""" | |
columnize.py reads column-oriented text data and prints that data as | |
nicely-padded columns to STDOUT. Input data *must* be line-oriented; data that | |
spans multiple lines will not be correctly understood and will not be correctly | |
displayed. | |
""" | |
from typing import Callable, Iterable, Any, List | |
from itertools import islice | |
import argparse | |
import doctest | |
import sys | |
def columnize_dict(rows, field_order=None):
    """Columnize a list of dicts into padded rows, with a header row first.

    Mostly useful as a reference.

    Args:
        rows: Iterable of dicts. Every dict must contain each field named in
            `field_order`.
        field_order: Keys to emit, in output column order. When None, the
            sorted keys of the first dict in `rows` are used.

    Returns:
        The header row followed by one row per dict, passed through
        `columnize`. An empty list when `rows` is empty and no `field_order`
        was given, because there is nothing to derive a header from.

    Raises:
        KeyError: If a dict lacks one of the fields in `field_order`.
    """
    rows = list(rows)
    if field_order is None:
        if not rows:
            # No rows means no first dict to take the column names from.
            return []
        field_order = sorted(rows[0].keys())
    # Header cells are stringified like the data cells so non-str keys
    # (e.g. ints) can be measured and padded.
    header = [str(name) for name in field_order]
    body = [[str(row[name]) for name in field_order] for row in rows]
    return columnize([header] + body)
def columnize(rows: Iterable[Iterable[str]]) -> List[List[str]]:
    """Left-justify every cell so that each column has a uniform width.

    The width of a column is the length of its longest cell. Rows may have
    differing lengths; a shorter row is padded cell-by-cell using the widths
    of the columns it does have, and no cells are added to it.

    Args:
        rows: Iterable of rows, each an iterable of strings. It is consumed
            once and is not modified.

    Returns:
        A new list of new lists holding the padded cells, in input order.

    >>> columnize([['a', 'b', 'c'], ['111', '222', '333'], ['fizzy', 'wow', 'hi']])
    [['a    ', 'b  ', 'c  '], ['111  ', '222', '333'], ['fizzy', 'wow', 'hi ']]
    >>> columnize([['a'], ['bbb', 'c']])
    [['a  '], ['bbb', 'c']]
    >>> columnize([])
    []
    """
    rows = [list(row) for row in rows]
    # Widths are accumulated per index rather than via zip(*rows): zip stops
    # at the shortest row, which left later columns without a width.
    colwidths: List[int] = []
    for row in rows:
        for idx, cell in enumerate(row):
            if idx < len(colwidths):
                colwidths[idx] = max(colwidths[idx], len(cell))
            else:
                colwidths.append(len(cell))
    return [
        [cell.ljust(colwidths[idx]) for idx, cell in enumerate(row)]
        for row in rows
    ]
def line_splitter(infile, delimiter: str) -> Iterable[List[str]]:
    """Yield one list of column strings per line of `infile`.

    `infile` must be a file-like object (or any iterable) that produces lines
    when iterated. Trailing carriage-return and newline characters are
    removed from each line before it is split on `delimiter`.
    """
    for raw_line in infile:
        without_eol = raw_line.rstrip("\r\n")
        yield without_eol.split(delimiter)
def chunk_iter(iterable: Iterable[Any], chlen=1024) -> Iterable[List[Any]]:
    """Split `iterable` into consecutive lists of at most `chlen` items.

    Chunks are produced lazily, one at a time, until `iterable` is exhausted;
    only the final chunk may be shorter than `chlen`.
    """
    source = iter(iterable)
    while True:
        batch = list(islice(source, chlen))
        if not batch:
            # An empty slice means the source has run dry.
            return
        yield batch
def format_2d_list(l2d):
    """Return Python source text for `l2d`, with its columns aligned.

    Each cell is rendered with `repr`, the rendered cells are padded to a
    common width per column, and every row is written on its own line
    between an opening and a closing bracket.

    >>> print(format_2d_list([['a', 'b', 'c'], ['111', '222', '333'], ['fizzy', 'wow', 'hi']]))
    [
    ['a'    , 'b'  , 'c'  ],
    ['111'  , '222', '333'],
    ['fizzy', 'wow', 'hi' ],
    ]
    """
    as_reprs = [[repr(cell) for cell in row] for row in l2d]
    body_lines = [f"[{', '.join(cells)}],\n" for cells in columnize(as_reprs)]
    return "".join(["[\n", *body_lines, "]"])
def chunk_column_pad(chunk: Iterable[List[str]], justify="left") -> Iterable[List[str]]:
    """Yield the rows of `chunk`, each padded to the same number of columns.

    Rows shorter than the longest row in the chunk get empty-string cells
    appended on the right until every row has the same length.

    Args:
        chunk: Iterable of rows (lists of strings). It is fully consumed
            before the first row is yielded, because the widest row must be
            known up front. The input rows are left unmodified.
        justify: Side on which existing cells are kept. Only "left" is
            supported.

    Yields:
        A new list per input row, in input order. Nothing is yielded for an
        empty chunk.

    Raises:
        NotImplementedError: If `justify` is anything other than "left".
    """
    # Validate once instead of once per row.
    if justify != "left":
        raise NotImplementedError(
            f"justify can only by 'left' but '{justify}' was provided"
        )
    rows = list(chunk)
    # default=0 keeps an empty chunk from raising ValueError inside max().
    maxcols = max((len(row) for row in rows), default=0)
    for row in rows:
        # Build a fresh list so the caller's rows are not extended in place.
        yield list(row) + [""] * (maxcols - len(row))
def apply_row_modifiers(
    chunk: Iterable[List[str]],
    row_mods: List[Callable[[List[str]], List[str]]],
) -> Iterable[List[str]]:
    """Yield each row of `chunk` after passing it through every modifier.

    Modifiers run in the order given in `row_mods`, each receiving the result
    of the previous one. The first modifier is handed a copy of the row, so
    the rows of `chunk` themselves are never passed to a modifier.
    """

    def run_all(row: List[str]) -> List[str]:
        for modify in row_mods:
            row = modify(row)
        return row

    for source_row in chunk:
        yield run_all(source_row.copy())
def add_delim_row(delim: str) -> Callable[[List[str]], List[str]]:
    """Build a row modifier that re-attaches `delim` to the cells of a row.

    The returned function appends `delim` to every cell except the last one,
    editing the row in place and returning that same list. Rows with fewer
    than two cells come back unchanged.
    """

    def append_delim(row: List[str]) -> List[str]:
        # row[:-1] is a snapshot of every cell but the last (empty for rows
        # of length 0 or 1), so assigning into `row` while looping is safe.
        for position, cell in enumerate(row[:-1]):
            row[position] = cell + delim
        return row

    return append_delim
def trim_cells(row: List[str]) -> List[str]:
    """Strip leading and trailing whitespace from every cell of `row`.

    The row is edited in place and the same list object is returned.
    """
    for position, text in enumerate(row):
        row[position] = text.strip()
    return row
def main(argv=None):
    """Read delimited lines from STDIN and print them as aligned columns.

    Input is processed in windows of `--streaming-window-size` lines; column
    widths are computed independently for each window. A window size of zero
    or less reads all of STDIN and formats it as a single window.

    Args:
        argv: Command-line arguments to parse, without the program name.
            When None, argparse falls back to `sys.argv[1:]`.
    """
    parser = argparse.ArgumentParser(
        description="Parse STDIN as column-based data, printing as nicely formatted columns"
    )
    parser.add_argument(
        "--streaming-window-size",
        "-w",
        type=int,
        default=1024,
        help="The number of lines to read and format columns for. "
        "If -1, all lines are read and formatted. Default is 1024",
    )
    parser.add_argument("--input-column-delimiter", "-d", default=",")
    parser.add_argument(
        "--preserve-delimiter",
        "-p",
        action="store_true",
        default=False,
        help="If provided, the delimiter will remain present in the output."
        " Otherwise, the delimiter will be removed.",
    )
    parser.add_argument(
        "--trim-cells",
        "-t",
        action="store_true",
        default=False,
        help="If provided, each cell will have their contents trimmed of"
        " whitespace on the left and right side.",
    )
    args = parser.parse_args(argv)

    split_lines = line_splitter(sys.stdin, args.input_column_delimiter)
    if args.streaming_window_size > 0:
        chunks = chunk_iter(split_lines, args.streaming_window_size)
    else:
        # Non-positive window size: treat the whole input as one chunk.
        chunks = [list(split_lines)]

    row_modifiers = []
    if args.preserve_delimiter:
        row_modifiers.append(add_delim_row(args.input_column_delimiter))
    if args.trim_cells:
        row_modifiers.append(trim_cells)

    for chunk in chunks:
        if not chunk:
            # Empty STDIN in read-everything mode produces one empty chunk;
            # there is nothing to measure or print for it.
            continue
        chunk = apply_row_modifiers(chunk, row_modifiers)
        chunk = chunk_column_pad(chunk)
        for row in columnize(chunk):
            # Only trailing padding is dropped. Leading whitespace belongs to
            # an empty or indented first cell, and removing it would shift
            # the rest of that line out of alignment.
            print(" ".join(row).rstrip())
if __name__ == "__main__":
    # Run the module's docstring examples before doing any work. `doctest`
    # is already imported at the top of the file, so it is not re-imported
    # here.
    doctest.testmod()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment