@kanzure, created November 21, 2015
hplusroadmap irc log visualization
"""
Render an image of the "text mass" per minute per day of logs. Horizontal
minutes against vertical days. The color of each pixel represents the relative
text mass for that time slice (that minute).
http://gnusha.org/logs/graphs/300-days-anycolor-heatmap-cropped-sorted.png
1440 minutes/day
2555 days
1440 * 2555
"""
import time
import os
import sys
import logging
import re
import matplotlib
from matplotlib import pyplot as plt
import numpy
# all log files combined into a single file (note: not used anywhere below)
MEGALOG_PATH = "./megalog.txt"
# for matching the beginning of a log line: HH:MM (single-digit hours allowed)
hhmm_regex_spec = r'^([01]?[0-9]|2[0-3]):[0-5][0-9]'
hhmm_regex = re.compile(hhmm_regex_spec)
# disqualify "HH:MM -!-" (status lines: joins, quits, mode changes)
hhmm_status_disqualifier_regex = re.compile(hhmm_regex_spec + r' -!-')
# disqualify "HH:MM [" (nick lists)
hhmm_nick_list_disqualifier_regex = re.compile(hhmm_regex_spec + r' \[')
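# A quick illustration with hypothetical log lines (not taken from the real logs):
#   >>> bool(hhmm_regex.match("19:25 <somenick> hello world"))
#   True
#   >>> bool(hhmm_status_disqualifier_regex.match("19:25 -!- somenick has quit"))
#   True
#   >>> bool(hhmm_nick_list_disqualifier_regex.match("19:25 [somenick] [othernick]"))
#   True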

def setup_logging():
    """
    Log everything to stdout.
    """
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    streamhandler = logging.StreamHandler(sys.stdout)
    streamhandler.setLevel(logging.DEBUG)
    streamhandler.setFormatter(formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(streamhandler)
    return logger

logger = setup_logging()
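# With the formatter above, log output looks like (timestamp illustrative):
#   2015-11-21 19:25:00,123 - root - INFO - Processing: 2015-11-21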

def make_file_path(name, output_dirname="output"):
    """
    Create a new filepath for the output file.
    """
    timestamp = int(time.time())
    current_dirpath = os.path.realpath(".")
    graph_dirpath = os.path.join(current_dirpath, output_dirname)
    # make sure the output directory exists before anything tries to write there
    if not os.path.exists(graph_dirpath):
        os.makedirs(graph_dirpath)
    output_image_path = os.path.join(graph_dirpath, "{name}.{timestamp}.png".format(name=name, timestamp=timestamp))
    return output_image_path
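# For example (timestamp value is illustrative), make_file_path("textmass")
# returns something like: /path/to/cwd/output/textmass.1448135100.png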

def find_log_files():
    """
    Get file path list of all log files.
    """
    current_dirpath = os.path.realpath(".")
    log_dirpath = os.path.join(current_dirpath, "logs")
    log_filenames = os.listdir(log_dirpath)
    log_filepaths = [os.path.join(log_dirpath, log_filename) for log_filename in log_filenames]
    # keep only .log files (this drops .swp files and other strays)
    filtered_log_filepaths = []
    for log_filepath in log_filepaths:
        if log_filepath.endswith(".log"):
            filtered_log_filepaths.append(log_filepath)
    return sorted(filtered_log_filepaths)
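# The date parsing below assumes per-day log files named with an ISO 8601
# date, e.g. ./logs/2015-11-21.log (the example date is illustrative).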

def make_timestamp_list():
    """
    Make a list of HH:MM possibilities for log line timestamps.
    """
    timestamps = []
    for hour in range(0, 24):
        for minute in range(0, 60):
            timestamps.append("{hour:02d}:{minute:02d}".format(hour=hour, minute=minute))
    return timestamps
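# A minimal sanity check of the function above: the list covers every minute
# of a day, in order.
#   >>> ts = make_timestamp_list()
#   >>> len(ts)
#   1440
#   >>> (ts[0], ts[-1])
#   ('00:00', '23:59')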

def generate_textmass_image(output_image_path=None):
    """
    Render an image of "text mass" per minute per day of logs.
    """
    # Starting text mass for each HH:MM slot, before any log lines are counted.
    default_textmass = 0
    # collects the length of every line that is processed
    linelengths = []
    if not output_image_path:
        output_image_path = make_file_path("textmass")
    log_files = find_log_files()
    # TODO: remove this restriction
    log_files = log_files[0:300]
    # Map of (filepath -> ISO 8601 date), useful for populating the timestamp
    # data structure below and other purposes.
    log_filenames_map = {filepath: os.path.basename(filepath).split(".")[0] for filepath in log_files}
    # HH:MM possibilities for the definition of a single day
    timestamps = make_timestamp_list()
    # Set up a map of every date having all HH:MM possibilities, each starting
    # off with the default textmass.
    timestamp_line_lengths = {logdatename: {timestamp: default_textmass for timestamp in timestamps}
                              for logdatename in log_filenames_map.values()}
    # process each file
    for (logfilepath, logdatename) in log_filenames_map.items():
        longest_line_in_log = 0
        logger.info("Processing: {}".format(logdatename))
        with open(logfilepath, "r") as log_fd:
            logcontent = log_fd.read()
        loglines = logcontent.split("\n")
        # remove the (empty) last line
        if len(loglines[-1]) == 0:
            del loglines[-1]
        for logline in loglines:
            # quick sanity check
            if len(logline) < 6:  # 6 == len("HH:MM ")
                logger.warning("Log file {} has line with length less than 6 (this is unexpected but not showstopping)".format(logdatename))
            else:
                # lines that do not start with HH:MM are ignored
                matched = hhmm_regex.match(logline)
                if matched:
                    # skip "HH:MM -!- " lines because they are not relevant log lines
                    if hhmm_status_disqualifier_regex.match(logline):
                        continue
                    # skip "HH:MM [" lines because they are nick lists
                    if hhmm_nick_list_disqualifier_regex.match(logline):
                        continue
                    # To which HH:MM slot should the line length contribute?
                    # Zero-pad single-digit hours ("9:30" -> "09:30") so the
                    # key always matches the slots created above.
                    hhmm = matched.group().zfill(5)
                    # Line length should probably not include the username.
                    # This also conveniently removes the HH:MM timestamp from
                    # the length calculation. Use + 2 to skip past the ">" and
                    # the space after the nickname.
                    partial_logline = logline[logline.find(">") + 2:]
                    linelength = len(partial_logline)
                    # record the line length
                    timestamp_line_lengths[logdatename][hhmm] += linelength
                    #if linelength == 1:
                    #    logger.info("line with length 1 is: {}".format(logline))
                    #linelengths.append((linelength, logline, partial_logline))
                    linelengths.append(linelength)
                    if linelength > longest_line_in_log:
                        longest_line_in_log = linelength
                    #if linelength > 900:
                    #    logger.info("Superlong line of length {} is line: {}".format(linelength, logline))
        logger.info("++ longest line in log: {}".format(longest_line_in_log))
    # one row per day (sorted by date), one column per minute
    data = []
    for logdatename in sorted(timestamp_line_lengths.keys()):
        some_day = timestamp_line_lengths[logdatename]
        # index by the ordered timestamp list so columns are always in minute order
        data.append([some_day[timestamp] for timestamp in timestamps])
    data = numpy.array(data)
    heatmap = plt.pcolor(data)  # cmap=matplotlib.cm.Blues is another option
    plt.savefig(output_image_path)
    logger.info("Saved heatmap to {}".format(output_image_path))
    plt.show()
    return linelengths

if __name__ == "__main__":
    generate_textmass_image()
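# Usage sketch (the script filename here is hypothetical; assumes per-day logs
# in ./logs/ as described above):
#   $ python textmass.py
# The heatmap is written under ./output/ and also displayed interactively.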