Created
November 8, 2019 13:59
-
-
Save marcsello/5d122d1eb442a0d028ac7310d0cd75fc to your computer and use it in GitHub Desktop.
My fucking data visualizer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
| # | |
| # I seriously had to write this, to have an easy to use data explorer... | |
| # It's not quick or flexible, but it gets the job done with a minimal memory footprint (achieved by a lot of disk access, sorry...). | |
| # | |
| # Usage: | |
| # 1) Install matplotlib and pythondialog (the latter may be provided by your distribution) | |
| # 2) run `python3 my_fucking_data_visualizer.py my_data.csv` where my_data.csv is a CSV file with the following properties: | |
| # - The first row is the column headers | |
| # - The dataset must be tagged by at least a single tag (otherwise you could just use Excel). | |
| # - Tags are represented by values in a tag column | |
| # - null values represented as empty cells | |
| # - Examples may be found here: https://github.com/cisco-ie/telemetry | |
| # 3) Follow on-screen instructions | |
| # | |
| # ... and yes, this program is just a fancy filter | |
| # | |
| import locale | |
| from dialog import Dialog | |
| import csv | |
| import os | |
| import sys | |
| import matplotlib.pyplot | |
# The CSV file to explore is the single mandatory command-line argument.
if len(sys.argv) < 2:
    # Fail with a usage hint instead of an IndexError traceback.
    print("Usage: {} <data.csv>".format(sys.argv[0]), file=sys.stderr)
    sys.exit(2)
INPUT_FILE = sys.argv[1]

# Switch to the user's default locale before any dialog widget is created.
locale.setlocale(locale.LC_ALL, '')

# One pythondialog handle, shared by every prompt in this script.
d = Dialog(dialog="dialog", autowidgetsize=True)
d.set_background_title("My fucking data visualizer")

# State shared by start_progress() and update_progress().
progress = 0  # rows read so far in the current pass over the file
progress_text = ""  # description of the current pass
def start_progress(text):
    """Begin a new pass over the dataset.

    Resets the module-level row counter to zero, remembers *text* as the
    description of the current pass and shows it in a "Please wait" info box.

    text: short description of what the upcoming pass is doing.
    """
    global progress, progress_text

    progress, progress_text = 0, text
    message = "Reading dataset...\n\n" + progress_text
    d.infobox(message, title="Please wait")
def update_progress():
    """Count one processed row and report the total every 1000 rows.

    The report is written to stdout with a leading carriage return and no
    trailing newline, so each report overwrites the previous one.
    """
    global progress

    progress += 1
    if progress % 1000 != 0:
        # Only every 1000th row produces output.
        return

    report = "\r{} rows processed".format(progress)
    print(report, end="")
    sys.stdout.flush()
# Read headers: the first row of the CSV file holds the column names.
# newline='' is what the csv module asks for when it is handed a file object.
with open(INPUT_FILE, 'r', newline='') as f:
    header = next(csv.reader(f), [])

if not header:
    # An empty file has no columns to offer; stop before building any dialog.
    print("{} has no header row".format(INPUT_FILE), file=sys.stderr)
    sys.exit(1)

# find examples: remember the first non-empty value seen in each column so
# the tag selection list can show what a column looks like.
header_example = {}
start_progress("Finding examples for each column")
with open(INPUT_FILE, 'r', newline='') as f:
    for row in csv.DictReader(f):
        update_progress()
        for col in header:
            if row[col] and col not in header_example:
                header_example[col] = row[col]

        # Stop reading as soon as every column has an example.
        if len(header_example) == len(header):
            break

# A column that is empty in every row never gets an example; show it with an
# empty description instead of failing with a KeyError.
code, tags = d.checklist("Select fields which are considered as tags",
                         choices=[(col, header_example.get(col, ""), False) for col in header],
                         title="Select tags")

# Without at least one tag there is nothing to filter on.
if not tags:
    sys.exit(1)
def _row_matches(row, filters):
    """Return True when *row* has the chosen value in every filtered column.

    row: one record as produced by csv.DictReader (column name -> cell text).
    filters: dict mapping a tag column name to the value the user selected.
    An empty *filters* dict matches every row.
    """
    return all(row[col] == wanted for col, wanted in filters.items())


# Select filters
while True:
    tagvalues = {}      # tag column -> {value: number of matching rows}
    filtered_tags = {}  # tag column -> value chosen by the user

    # Ask for one value per tag. Each question only offers the values that
    # still occur in rows matching the filters chosen so far.
    for tag in tags:
        counts = tagvalues[tag] = {}

        start_progress("Searching for tag values")
        with open(INPUT_FILE, 'r', newline='') as f:
            for row in csv.DictReader(f):
                update_progress()
                if not _row_matches(row, filtered_tags):
                    continue

                value = row[tag]
                if value:
                    counts[value] = counts.get(value, 0) + 1

        if counts:
            code, choice = d.menu("Select a filter for {}".format(tag),
                                  choices=[(tagval, "{} matches".format(count))
                                           for tagval, count in counts.items()])
            if not choice:
                sys.exit(1)

            filtered_tags[tag] = choice
        else:
            d.msgbox("No tag values found for {} when applying previously created filter. Skipping the filter for this tag...".format(tag))

    # ready to visualize: a column is plottable once at least one of its
    # cells in the filtered rows parses as a float.
    visualizable_cols = set()
    start_progress("Searching for visualizable columns")
    with open(INPUT_FILE, 'r', newline='') as f:
        for row in csv.DictReader(f):
            update_progress()
            if not _row_matches(row, filtered_tags):
                continue

            for col in header:
                if col in visualizable_cols or not row[col]:
                    continue
                try:
                    float(row[col])
                except ValueError:
                    continue  # not a number; a later row may still qualify
                visualizable_cols.add(col)

    if not visualizable_cols:
        print("Nothing found with this combination")
        sys.exit(1)

    while True:  # after closing the graph, return to column selecting
        # sorted() keeps the list order stable between runs (sets are unordered).
        code, cols_to_see = d.checklist("Now please select the columns you want to see",
                                        choices=[(col, "", False) for col in sorted(visualizable_cols)])
        if code != d.OK:
            break  # Cancel or ESC: return to tag filter setup
        if not cols_to_see:
            continue  # nothing ticked: there is nothing to plot, ask again

        data = {col: [] for col in cols_to_see}

        start_progress("Visualizing")
        with open(INPUT_FILE, 'r', newline='') as f:
            # DictReader consumes the header row by itself, so iteration
            # starts at the first data row; no manual skip is needed.
            for row in csv.DictReader(f):
                update_progress()
                if not _row_matches(row, filtered_tags):
                    continue

                for col in cols_to_see:
                    if not row[col]:
                        continue  # null value
                    try:
                        data[col].append(float(row[col]))
                    except ValueError:
                        # A column only needs one numeric cell to be offered,
                        # so skip stray non-numeric cells like empty ones.
                        pass

        # One subplot per selected column, stacked vertically.
        fig, axes = matplotlib.pyplot.subplots(len(cols_to_see), 1)
        if len(cols_to_see) == 1:
            axes = (axes,)  # a single subplot is returned bare, not in an array

        for axis, col in zip(axes, cols_to_see):
            axis.plot(data[col], label=col)
            # Pass the flag positionally: the keyword was renamed from `b`
            # to `visible` between matplotlib releases.
            axis.grid(True, which='major', axis='both')
            axis.legend()

        matplotlib.pyplot.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment