Created
September 18, 2024 18:03
-
-
Save dannguyen/5579ce17fef20a9009418a75898bf13d to your computer and use it in GitHub Desktop.
A command-line python script that reads CSV files, samples their data, and prints the samples in transposed longform, i.e. one column per data row, one row per data attribute
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
skimschema.py
=============

Create an Excel file of transposed data rows, for easy browsing of
a data file's contents (CSVs only for now).

Longer description
==================

A command-line Python script that reads CSV files, samples their data,
and writes the samples in transposed long form, i.e. one column per data row,
one row per data attribute.

The output is an Excel file, with one tab per CSV if the target input is a
directory of CSVs.

The examples below assume you've copied skimschema to a directory on your
PATH, e.g.

    chmod a+x skimschema.py
    sudo cp skimschema.py /usr/local/bin/skimschema

Example usage:

    skimschema mydata.csv                 # creates xskimschema-mydata.xlsx
    skimschema mydata/                    # creates xskimschema-mydata.xlsx,
                                          # with a tab for each CSV in the directory
    cat mydata.csv | skimschema           # creates xskimschema-stdin.xlsx from stdin
    skimschema mydata.csv -o myfile.xlsx  # specify output filename
"""
import random
import re
import sys
from io import StringIO
from pathlib import Path
from typing import List

import click
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Color, Font, PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.worksheet.worksheet import Worksheet
# Prefix used for the default output filename (e.g. "xskimschema-mydata.xlsx").
BASE_FILENAME = "xskimschema"
# Solid background applied to the header row and the first column.
# The colour is an aRGB hex string.
HEADER_FILL = PatternFill(patternType="solid", fgColor=Color(rgb="00D1EAFF"))
# Lower and upper bounds used when auto-sizing worksheet column widths.
MIN_CELL_WIDTH = 15
MAX_CELL_WIDTH = 55
# Number of randomly chosen rows sampled from each CSV (the first two and
# last two rows are always added on top of this).
SAMPLE_SIZE = 50
def get_random_numbers(n, limit, seed=42):
    """Return sorted, unique row positions to sample from a table of ``n`` rows.

    When ``n <= limit`` every position ``0..n-1`` is returned. Otherwise
    ``limit`` positions are drawn at random (reproducibly, from ``seed``) and
    the first two and last two positions are always added, so the result may
    hold up to ``limit + 4`` entries.

    Args:
        n: Number of rows available.
        limit: Number of random positions to draw when ``n`` exceeds it.
        seed: Seed for the random draw; the same seed gives the same sample.

    Returns:
        A sorted list of distinct integers, each in ``range(n)``. Empty when
        ``n <= 0``.
    """
    if n <= 0:
        return []

    if n <= limit:
        picked = range(n)
    else:
        # A private generator yields the same draw as random.seed(seed)
        # followed by random.sample(...), without resetting the global RNG
        # state for the rest of the program.
        picked = random.Random(seed).sample(range(n), limit)

    # Always include the first and last two rows. For n < 2 some of these
    # anchors fall outside the table, so out-of-range positions are dropped.
    anchors = {0, 1, n - 2, n - 1}
    return sorted(ix for ix in set(picked) | anchors if 0 <= ix < n)
# Characters that are not allowed in a worksheet title.
_INVALID_TITLE_CHARS = re.compile(r"[\\*?:/\[\]]")
# Worksheet titles longer than this are not reliably readable by Excel.
_MAX_TITLE_LENGTH = 31


def _safe_sheet_title(stem: str, position: int) -> str:
    """Turn a file stem into a worksheet title of at most 31 valid characters."""
    title = _INVALID_TITLE_CHARS.sub("_", stem)
    if len(title) > _MAX_TITLE_LENGTH:
        # Keep a positional suffix so two long names sharing a prefix stay
        # distinguishable after truncation.
        suffix = f"_{position}"
        title = f"{title[: _MAX_TITLE_LENGTH - len(suffix)]}{suffix}"
    return title


def _build_sample_frame(df: pd.DataFrame, label: str) -> pd.DataFrame:
    """Build the transposed sample: one row per attribute, one column per sampled row."""
    sample_columns = {
        label: df.columns,
        "datatypes": [str(d) for d in df.dtypes.tolist()],
    }
    row_count = len(df)
    for ix in get_random_numbers(row_count, SAMPLE_SIZE):
        # Guard against out-of-range positions so frames with fewer than two
        # rows (or none at all) do not raise IndexError.
        if 0 <= ix < row_count:
            sample_columns[f"Row {ix}"] = df.iloc[ix]
    return pd.DataFrame(sample_columns)


def create_schema_book(input: List[Path] | StringIO, is_stdin: bool) -> Workbook:
    """Build a workbook with one transposed-sample sheet per CSV source.

    todo: should probably separate the schema creation from workbook building

    Args:
        input: Either a list of CSV paths, or (when ``is_stdin`` is true) a
            text buffer holding CSV data.
        is_stdin: Whether ``input`` is a single buffer read from stdin.

    Returns:
        A workbook whose default sheet has been removed and which holds one
        sheet named "stdin", or one sheet per path named after the file stem
        (sanitised and truncated to a valid worksheet title).
    """
    wb = Workbook()
    wb.remove(wb.active)  # Remove the default sheet

    if is_stdin:
        df = pd.read_csv(input)
        create_schema_sheet(wb, "stdin", _build_sample_frame(df, "stdin"))
        return wb

    for position, inpath in enumerate(input):
        df = pd.read_csv(inpath)
        # The first column header keeps the full, unmodified file stem; only
        # the tab title is sanitised and shortened.
        create_schema_sheet(
            wb,
            _safe_sheet_title(inpath.stem, position),
            _build_sample_frame(df, inpath.stem),
        )
    return wb
def create_schema_sheet(wb: Workbook, sheet_name: str, df: pd.DataFrame) -> None:
    """Add a formatted worksheet containing ``df`` to ``wb``.

    The frame is written with its header row (the index is not written).
    Every cell is wrapped and top-left aligned; the first row and the first
    column are bold and filled with ``HEADER_FILL``. The header row and first
    column are frozen, and every column except the first is sized to its
    longest value, bounded by ``MIN_CELL_WIDTH`` and ``MAX_CELL_WIDTH``.

    Args:
        wb: Workbook that receives the new sheet.
        sheet_name: Title for the new sheet.
        df: Data to write.
    """
    ws = wb.create_sheet(title=sheet_name)

    # The style objects do not vary per cell, so build them once and reuse them.
    cell_alignment = Alignment(wrap_text=True, vertical="top", horizontal="left")
    header_font = Font(bold=True, size="14")
    body_font = Font(size="12")

    # Write the DataFrame to the sheet
    for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
        for c_idx, value in enumerate(row, 1):
            cell = ws.cell(row=r_idx, column=c_idx, value=value)
            cell.alignment = cell_alignment
            if r_idx == 1 or c_idx == 1:
                cell.font = header_font
                cell.fill = HEADER_FILL
            else:
                cell.font = body_font

    # Freeze the header row and the first column
    ws.freeze_panes = ws["B2"]

    # Auto-adjust column widths. The first column is skipped and keeps the
    # default width.
    for ix, col in enumerate(ws.columns):
        if ix == 0:
            continue
        # Measure the text form of every value so numbers, dates, etc. are
        # taken into account too, not only strings.
        longest = max(
            (len(str(cell.value)) for cell in col if cell.value is not None),
            default=0,
        )
        adjusted_width = max(MIN_CELL_WIDTH, longest) + 2
        column_letter = col[0].column_letter
        ws.column_dimensions[column_letter].width = min(adjusted_width, MAX_CELL_WIDTH)
@click.command()
@click.argument(
    "input",
    type=click.Path(allow_dash=True, file_okay=True, dir_okay=True),
    required=False,
)
@click.option(
    "-o",
    "--output",
    type=click.Path(writable=True),
    required=False,
    help=(
        "Path to write the output file (Excel). Default is "
        "'xskimschema-[input name].xlsx' in the current directory "
        "('xskimschema-stdin.xlsx' when reading from stdin)."
    ),
)
@click.pass_context
def cli(ctx, input, output):
    """Create skimmable view of sampled data from CSVs

    \b
    Example usage:
        skimschema mydata.csv
        cat mydata.csv | skimschema
        # create multi-tab file from directory of CSVs
        skimschema mydata/
        # specify output file
        skimschema mydata.csv -o myfile.xlsx
    """
    # Check if no input is provided and stdin is not being piped
    if input is None and sys.stdin.isatty():
        click.echo(ctx.get_help())
        ctx.exit()

    # "-" is accepted by the argument type (allow_dash=True) and is treated
    # as an explicit request to read from stdin.
    is_stdin = input is None or input == "-"
    is_dir = False

    if is_stdin:
        csv_text = sys.stdin.read()
        if not csv_text.strip():
            click.echo("No data received on stdin.", err=True)
            raise click.Abort()
        input_files = StringIO(csv_text)
        click.echo("Collected data from stdin", err=True)
    else:
        inputpath = Path(input)
        if inputpath.is_file():  # Single file
            input_files = [inputpath]
        elif inputpath.is_dir():  # Directory, collect *.csv files
            is_dir = True
            # Sorted so the tab order is the same on every run and platform.
            input_files = sorted(inputpath.glob("*.csv"))
            if not input_files:
                # A workbook without sheets cannot be saved, so stop early
                # with a clear message.
                click.echo(f"No CSV files found in directory: {input}", err=True)
                raise click.Abort()
        else:
            click.echo(
                "Invalid input. Please provide a valid file or directory.",
                err=True,
            )
            raise click.Abort()

        click.echo(f"From {input}, collected {len(input_files)} files:", err=True)
        for found in input_files:
            click.echo(f" {found}", err=True)

    wb = create_schema_book(input_files, is_stdin)

    if output:
        output_path = Path(output)
    else:
        if is_stdin:
            ostem = f"{BASE_FILENAME}-stdin"
        elif is_dir:
            # resolve() gives a real directory name for inputs like "." or
            # "..", whose Path.name is empty or not meaningful.
            ostem = f"{BASE_FILENAME}-{inputpath.resolve().name}"
        else:
            # just a filename
            ostem = f"{BASE_FILENAME}-{inputpath.stem}"
        output_path = Path(".").joinpath(f"{ostem}.xlsx")

    click.echo(f"Writing to:\n{output_path}", err=True)
    wb.save(output_path)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment