Skip to content

Instantly share code, notes, and snippets.

@marcsello
Created November 8, 2019 13:59
Show Gist options
  • Save marcsello/5d122d1eb442a0d028ac7310d0cd75fc to your computer and use it in GitHub Desktop.
Save marcsello/5d122d1eb442a0d028ac7310d0cd75fc to your computer and use it in GitHub Desktop.
My fucking data visualizer
#! /usr/bin/env python3
#
# I seriously had to write this, to have an easy to use data explorer...
# It's not quick or flexible, but it gets the job done with a minimal memory footprint (achieved by a lot of disk access, sorry...).
#
# Usage:
# 1) Install matplotlib and pythondialog (the latter may be provided by your distribution)
# 2) run `python3 my_fucking_data_visualizer.py my_data.csv` where my_data.csv is a CSV file with the following properties:
# - The first row is the column headers
# - The dataset must be tagged by at least a single tag (otherwise you could just use Excel).
# - Tags are represented by values in a tag column
# - null values represented as empty cells
# - Examples may be found here: https://github.com/cisco-ie/telemetry
# 3) Follow on-screen instructions
#
# ... and yes, this program is just a fancy filter
#
import locale
from dialog import Dialog
import csv
import os
import sys
import matplotlib.pyplot
# The dataset path is the only command-line argument. Exit with a usage hint
# instead of an IndexError traceback when it is missing.
if len(sys.argv) < 2:
    sys.exit("Usage: {} <data.csv>".format(sys.argv[0]))
INPUT_FILE = sys.argv[1]

# Switch to the user's default locale before any dialog widget is created.
locale.setlocale(locale.LC_ALL, '')

# Single Dialog instance used for every widget the script shows.
d = Dialog(dialog="dialog", autowidgetsize=True)
d.set_background_title("My fucking data visualizer")

# Row counter and caption shared by start_progress() / update_progress().
progress = 0
progress_text = ""
def start_progress(text):
    """Reset the row counter and put up the "Please wait" infobox.

    text: caption describing the pass that is about to start. It is stored
    in the module-level ``progress_text`` and shown beneath the
    "Reading dataset..." line of the infobox.
    """
    global progress, progress_text
    progress, progress_text = 0, text
    d.infobox("Reading dataset...\n\n" + text, title="Please wait")
def update_progress():
    """Count one more processed row and report every 1000th one.

    Increments the module-level ``progress`` counter. Whenever the counter
    is a multiple of 1000, the running total is written to stdout, preceded
    by a carriage return and without a trailing newline, and stdout is
    flushed.
    """
    global progress
    progress += 1
    if progress % 1000:
        return  # not a multiple of 1000: stay quiet
    print("\r{} rows processed".format(progress), end="", flush=True)
# Read headers
# The first CSV row holds the column names. newline='' leaves newline
# handling to the csv module, as its documentation asks for.
with open(INPUT_FILE, 'r', newline='') as f:
    header = next(csv.reader(f), None)
# An empty file (or a blank first line) has no usable column names; stop with
# a clear message instead of an unhandled StopIteration.
if not header:
    sys.exit("No header row found in {}".format(INPUT_FILE))
# Collect one sample (the first non-empty cell) for each column.
# Scanning stops as soon as every column has a sample.
header_example = {}
start_progress("Finding examples for each column")
with open(INPUT_FILE, 'r') as f:
    for row in csv.DictReader(f):
        update_progress()
        for col in header:
            sample = row[col]
            # Only the first non-empty value seen for a column is kept.
            if sample and col not in header_example:
                header_example[col] = sample
        if len(header_example) == len(header):
            break
# Ask which columns act as tags, showing one sample value beside each column
# name. A column that is empty in every row has no sample, so fall back to an
# empty description instead of raising KeyError.
code, tags = d.checklist("Select fields which are considered as tags",
                         choices=[(col, header_example.get(col, ""), False) for col in header],
                         title="Select tags")
# With no tag selected there is nothing to filter by, so stop here.
if not tags:
    sys.exit(1)
# Select filters
def _filtered_rows(filters):
    """Yield every data row of INPUT_FILE that matches *filters*.

    filters: dict mapping a column name to the exact cell value a row must
    have in that column. An empty dict lets every row through.

    update_progress() is called once per row read, whether or not it matches.
    The file is re-read from disk on every call.
    """
    # csv.DictReader consumes the header row itself, so every row it yields
    # is data and nothing has to be skipped by hand.
    with open(INPUT_FILE, 'r', newline='') as f:
        for row in csv.DictReader(f):
            update_progress()
            if all(row[col] == wanted for col, wanted in filters.items()):
                yield row


def _as_float(cell):
    """Return *cell* parsed as a float, or None if it is empty or not numeric."""
    if not cell:
        return None
    try:
        return float(cell)
    except ValueError:
        return None


while True:
    tagvalues = {}
    filtered_tags = {}

    # Walk through the tags in order. Each menu only offers the values that
    # still occur once the filters chosen so far are applied.
    for tag in tags:
        start_progress("Searching for tag values")
        counts = {}
        for row in _filtered_rows(filtered_tags):
            value = row[tag]
            if value:
                counts[value] = counts.get(value, 0) + 1
        tagvalues[tag] = counts

        if not counts:
            d.msgbox("No tag values found for {} when applying previously created filter. Skipping the filter for this tag...".format(tag))
            continue

        code, choice = d.menu("Select a filter for {}".format(tag),
                              choices=[(tagval, "{} matches".format(count)) for tagval, count in counts.items()])
        if not choice:
            sys.exit(1)
        filtered_tags[tag] = choice

    # ready to visualize
    # A column is plottable if at least one filtered row has a numeric cell in it.
    start_progress("Searching for visualizable columns")
    visualizable_cols = set()
    for row in _filtered_rows(filtered_tags):
        for col in header:
            if col not in visualizable_cols and _as_float(row[col]) is not None:
                visualizable_cols.add(col)

    if not visualizable_cols:
        print("Nothing found with this combination")
        sys.exit(1)

    # Offer the columns in file order; iterating the set directly gives an
    # arbitrary order.
    column_choices = [(col, "", False) for col in header if col in visualizable_cols]

    while True:  # after closing the graph, return to column selection
        code, cols_to_see = d.checklist("Now please select the columns you want to see",
                                        choices=column_choices)
        if code in (d.CANCEL, d.ESC):
            break  # Return to tag filter setup
        if not cols_to_see:
            continue  # nothing ticked, so there is nothing to plot; ask again

        start_progress("Visualizing")
        data = {col: [] for col in cols_to_see}
        for row in _filtered_rows(filtered_tags):
            for col in cols_to_see:
                # A column counts as plottable if any row was numeric, so
                # individual cells may still be empty or non-numeric; skip those.
                value = _as_float(row[col])
                if value is not None:
                    data[col].append(value)

        # One subplot per selected column, stacked vertically.
        fig, axes = matplotlib.pyplot.subplots(len(cols_to_see), 1)
        if len(cols_to_see) == 1:
            axes = (axes,)  # a single subplot comes back as a bare Axes, not a sequence
        for ax, col in zip(axes, cols_to_see):
            ax.plot(data[col], label=col)
            # The on/off flag is passed positionally because its keyword name
            # differs between matplotlib versions.
            ax.grid(True, which='major', axis='both')
            ax.legend()
        matplotlib.pyplot.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment