Skip to content

Instantly share code, notes, and snippets.

@shinkou
Last active April 18, 2025 22:44
Show Gist options
  • Save shinkou/d1ab50c8fcbcb9ed7aff2d40de0f243a to your computer and use it in GitHub Desktop.
Save shinkou/d1ab50c8fcbcb9ed7aff2d40de0f243a to your computer and use it in GitHub Desktop.
CSV Data Aggregator
#!/usr/bin/env python3
# -*- vim: set fileencoding=utf-8 ff=unix noet sw=4 ts=4 tw=0: -*-
import argparse, csv, re, sys
from functools import cmp_to_key
from itertools import groupby
from statistics import mean, median, mode, stdev
# Aggregation functions shared by both numeric column types, keyed by the
# operation name accepted on the command line.
_NUMERIC_AGG = dict(
    avg=mean,
    count=len,
    max=max,
    mean=mean,
    median=median,
    min=min,
    mode=mode,
    stddev=stdev,
    sum=sum,
)

# Operations available per inferred column type. The list position is the
# type index: 0 = int, 1 = float, 2 = str.
AGG = [
    dict(_NUMERIC_AGG),  # int
    dict(_NUMERIC_AGG),  # float
    dict(count=len),     # str
]
# Converters from a raw CSV string to a value of the inferred column type.
# The list position is the type index: 0 = int, 1 = float, 2 = str.
DATACAST = [int, float, str]

# Patterns used to infer a column's type; position i corresponds to
# DATACAST[i]. A column whose values match neither pattern is treated as str.
# An optional leading minus sign is accepted so that columns containing
# negative numbers are still recognised as numeric. Values with leading zeros
# (e.g. "007") or without an integer part (e.g. ".5") do not match.
RETYPE = [
    # int
    re.compile(r'^-?(?:[1-9][0-9]*|0)$'),
    # float
    re.compile(r'^-?(?:[1-9][0-9]*|0)(?:\.[0-9]+)?$'),
]
def main():
    """Parse the command line, aggregate the CSV and print the result.

    The result is written to standard output, either as CSV (when
    --output-csv is given) or as a bordered text table.
    """
    args = getargs()
    group_cols = [] if args.groupby is None else args.groupby

    # Every positional "col:op..." argument parses to a list of pairs;
    # flatten them into a single list in command-line order.
    colops = []
    for pairs in args.colops:
        colops.extend(pairs)

    table = process(read_csv(args.csv), colops, group_cols, args.sortby)
    if not args.output_csv:
        show_tbl(table, len(group_cols))
        return
    write_csv(sys.stdout, table)
def getargs():
    """Build the command-line parser and return the parsed arguments.

    Returns:
        The argparse namespace with ``groupby`` and ``sortby`` (lists of
        column names, or None when the option is absent), ``output_csv``
        (bool), ``csv`` (source file path) and ``colops`` (one list of
        (column, op) pairs per positional argument).
    """
    def comma_list(text):
        # Turn "a,b,c" into ['a', 'b', 'c'].
        return text.split(',')

    parser = argparse.ArgumentParser(description='CSV Aggregator')

    # Optional arguments.
    parser.add_argument(
        '--groupby',
        help='Column(s) for grouping in comma-separated form. i.e. "col1,col2,...,colN"',
        type=comma_list,
    )
    parser.add_argument(
        '--sortby',
        help='Column(s) for sorting in comma-separated form. i.e. "col1,col2,...,colN"',
        type=comma_list,
    )
    parser.add_argument(
        '--output-csv',
        help='output results as CSV',
        action='store_true',
    )

    # Positional arguments: the source file, then one or more column specs.
    parser.add_argument('csv', help='filepath of the source CSV')
    parser.add_argument(
        'colops',
        help='Column-operations in the form of "col:op1:op2:...:opN"',
        type=get_colops,
        nargs='+',
    )

    args = parser.parse_args()
    return args
def get_colops(s):
    """Parse one "col:op1:...:opN" argument into (column, operation) pairs.

    Args:
        s: A column-operation spec as given on the command line.

    Returns:
        A list of (column, op) tuples, one per operation, in the order
        given. A bare column name with no operations yields
        [(column, "count")].

    Raises:
        argparse.ArgumentTypeError: If an operation name is empty, as in
            "col:" or "col::sum". Raising this type lets argparse report a
            usage error instead of the spec failing later as a lookup of
            the empty operation name.
    """
    col, *ops = s.split(':')
    if not ops:
        return [(col, "count")]
    if not all(ops):
        raise argparse.ArgumentTypeError(
            f'empty operation name in "{s}"; expected "col:op1:op2:...:opN"'
        )
    return [(col, op) for op in ops]
def read_csv(fpath: str) -> list[dict]:
    """Read a CSV file with a header row into a list of row dicts.

    Args:
        fpath: Path of the CSV file to read.

    Returns:
        One dict per data row, keyed by the names in the header row. All
        values are returned as strings, exactly as read by csv.DictReader.
    """
    # newline='' hands line-ending handling to the csv module, so line breaks
    # embedded in quoted fields are preserved instead of being translated.
    with open(fpath, 'r', newline='') as f:
        return list(csv.DictReader(f))
def write_csv(f, rs):
    """Write every row in *rs* to the file-like object *f* in CSV format.

    Args:
        f: A writable text file object.
        rs: An iterable of rows, each an iterable of cell values.
    """
    csv.writer(f).writerows(rs)
def show_tbl(rs, gblen):
    """Print a table of rows to standard output with ASCII borders.

    Args:
        rs: List of rows, where rs[0] is the header row. Column widths are
            taken from the first len(rs[0]) cells of every row.
        gblen: Number of leading group-by columns. In data rows these are
            left-aligned and all remaining columns are right-aligned; the
            header row is left-aligned throughout.

    Nothing is printed when *rs* is empty.
    """
    if not rs:
        return

    num_of_cols = len(rs[0])
    # Width of each column = longest string form of any cell in that column.
    widths = [
        max(len(str(row[i])) for row in rs)
        for i in range(num_of_cols)
    ]
    border = '+' + '+'.join('-' * width for width in widths) + '+'

    def render(row, left_cols):
        # Pad the first len(widths) cells; columns below left_cols are
        # left-aligned and the rest right-aligned.
        cells = [str(cell) for cell in row]
        for i, width in enumerate(widths):
            if i < left_cols:
                cells[i] = cells[i].ljust(width)
            else:
                cells[i] = cells[i].rjust(width)
        return '|' + '|'.join(cells) + '|'

    print(border)
    # 1st row is the header, left-align all columns
    print(render(rs[0], num_of_cols))
    print(border)
    # from 2nd row on, left-align groupby columns and right-align the rest
    for row in rs[1:]:
        print(render(row, gblen))
    print(border)
def process(rs, colops, groupbys, sortbys):
    """Aggregate the rows and return the result table, header row first.

    Args:
        rs: Source rows as dicts keyed by column name.
        colops: (column, op) pairs to compute for every group.
        groupbys: Columns to group by; they become the leading output
            columns.
        sortbys: Output column names to sort by, most significant first, or
            None/empty to keep the order produced by get_agg. Aggregate
            columns are named "op(col)", e.g. "sum(price)". Group-by columns
            sort ascending; aggregate columns sort descending.

    Returns:
        A list of rows: the header followed by one row per group.

    Raises:
        ValueError: If a name in *sortbys* is not an output column.
    """
    header = groupbys + [f"{op}({col})" for (col, op) in colops]
    agg = get_agg(rs, colops, groupbys)
    if sortbys:
        # Resolve every sort column once, up front, rather than on each
        # comparison; this also reports a bad name regardless of row count.
        sort_keys = []
        for name in sortbys:
            if name not in header:
                raise ValueError(
                    f'unknown sort column "{name}"; choose from: {", ".join(header)}'
                )
            sort_keys.append((header.index(name), name not in groupbys))
        # Stable multi-pass sort: apply the least significant key first so
        # that earlier names in sortbys take precedence.
        for idx, descending in reversed(sort_keys):
            agg = sorted(agg, key=lambda row, idx=idx: row[idx], reverse=descending)
    return [header] + agg
def get_agg(rs, colops, grpbys):
    """Group the rows and compute the requested aggregates for each group.

    Args:
        rs: Source rows as dicts keyed by column name.
        colops: (column, op) pairs; each yields one output value per group.
        grpbys: Columns whose combined values identify a group.

    Returns:
        One list per group, ordered by group key. Each list holds the
        group-by values (as read from the rows) followed by one aggregate
        per (column, op) pair, in *colops* order.

    Raises:
        KeyError: If an op is not available for the column's inferred type,
            or a referenced column is missing from a row.
    """
    # Infer each distinct column's type once, even when it has several ops.
    coltypes = {}
    for col, _op in colops:
        if col not in coltypes:
            coltypes[col] = getcoltype(col, rs)

    def group_key(row):
        return tuple(row[c] for c in grpbys)

    result = []
    # groupby only merges adjacent rows, hence the sort on the same key.
    for key, members in groupby(sorted(rs, key=group_key), group_key):
        members = list(members)
        cast_values = {}  # column -> converted values for this group
        out_row = list(key)
        for col, op in colops:
            type_index = coltypes[col]
            func = AGG[type_index][op]
            if col not in cast_values:
                cast = DATACAST[type_index]
                cast_values[col] = [cast(row[col]) for row in members]
            out_row.append(func(cast_values[col]))
        result.append(out_row)
    return result
def getcoltype(col, rs):
    """Infer the type index of a column from its value in every row.

    Args:
        col: Name of the column to inspect.
        rs: Rows as dicts keyed by column name, with string values.

    Returns:
        The position of the first pattern in RETYPE that every value of the
        column matches (0 = int, 1 = float), or 2 (str) when none does. With
        no rows, every pattern trivially matches and 0 is returned.
    """
    values = [r[col] for r in rs]
    for type_index, pattern in enumerate(RETYPE):
        # A None cell (csv.DictReader's filler for a short row) cannot be
        # matched against a pattern; count it as non-numeric instead.
        if all(v is not None and pattern.match(v) is not None for v in values):
            return type_index
    # No numeric pattern fits every value: fall back to the str type index.
    return 2
# Run the command-line tool only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment