Skip to content

Instantly share code, notes, and snippets.

@SegFaultAX
Created February 15, 2017 04:35
Show Gist options
  • Save SegFaultAX/c25a7315f6f48b594d22d90e2e24743f to your computer and use it in GitHub Desktop.
Save SegFaultAX/c25a7315f6f48b594d22d90e2e24743f to your computer and use it in GitHub Desktop.
Demonstration of timeseries data, windowing, and aggregation [python]
import datetime
import random
import itertools
import pprint
# strftime/strptime pattern shared by every timestamp string in this module
# (second resolution, no timezone designator).
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Inclusive upper bound for generated user ids.  This must be an int:
# random.randint() no longer accepts float bounds on recent Python versions.
NUM_USERS = 1000000
# Browser names that synthetic log records are drawn from.
BROWSERS = ["Chrome", "Firefox", "IE"]
def logstream(start_time, end_time, max_step=10000):
    """Generate synthetic log records with non-decreasing timestamps.

    Starting at ``start_time``, one record is yielded per iteration and the
    clock is then advanced by a random number of milliseconds in
    ``[0, max_step]``.  Generation stops once the clock passes ``end_time``.

    Args:
        start_time: ``datetime.datetime`` of the first record.
        end_time: ``datetime.datetime`` after which no record is produced
            (a record exactly at ``end_time`` is still emitted).
        max_step: Largest gap between consecutive records, in milliseconds.

    Yields:
        dict with ``"user_id"`` (random int in ``[0, NUM_USERS]``),
        ``"browser"`` (random element of ``BROWSERS``) and ``"timestamp"``
        (the current clock formatted with ``TIME_FORMAT``).
    """
    # randint() requires integer bounds; coerce so that a float-valued
    # NUM_USERS (e.g. 1e6) does not raise on Python versions that reject
    # non-integer arguments.
    max_user_id = int(NUM_USERS)
    current = start_time
    while current <= end_time:
        yield {
            "user_id": random.randint(0, max_user_id),
            "browser": random.choice(BROWSERS),
            "timestamp": current.strftime(TIME_FORMAT),
        }
        # A step of 0 ms is possible, so consecutive records may share a
        # timestamp.
        current += datetime.timedelta(milliseconds=random.randint(0, max_step))
def aggregate_by_key(keyfn, aggfn, windowfn, logs):
    """Group log records by key and window, then aggregate each group.

    Args:
        keyfn: Callable mapping a log record to a hashable grouping key.
        aggfn: Callable taking the list of records in one (key, window)
            group and returning the aggregate value for that group.
        windowfn: Callable mapping a log record to an iterable of hashable
            window identifiers the record belongs to.
        logs: Iterable of log records; it is consumed exactly once.

    Returns:
        ``{key: {window: (aggregate, records)}}`` where ``records`` is the
        list of log records in that group, in arrival order.  A key whose
        records produced no windows maps to an empty dict.
    """
    grouped = {}
    for log in logs:
        per_key = grouped.setdefault(keyfn(log), {})
        for window in windowfn(log):
            # Append in place; rebuilding the list for every record would
            # make the cost quadratic in the group size.
            per_key.setdefault(window, []).append(log)

    # Only the final aggregate of each group is returned, so aggfn is
    # evaluated once per group instead of once per incoming record.
    return {
        key: {
            window: (aggfn(records), records)
            for window, records in per_key.items()
        }
        for key, per_key in grouped.items()
    }
def count(logs):
    """Aggregation function: return the number of records in ``logs``."""
    total = len(logs)
    return total
def truncate(dt, interval):
    """Round ``dt`` down to the nearest multiple of ``interval``.

    Multiples are counted from ``dt.min``, so bucket boundaries are the same
    for every input and do not restart at each midnight.  Intervals that do
    not divide a day evenly, intervals of a day or more, and sub-second
    intervals are therefore all handled consistently.

    Args:
        dt: ``datetime.datetime`` to truncate.  It is subtracted from
            ``dt.min``, so it must be comparable with that value.
        interval: Positive ``datetime.timedelta`` giving the bucket width.

    Returns:
        The latest ``datetime.datetime`` ``t`` such that ``t <= dt`` and
        ``t - dt.min`` is a whole multiple of ``interval``.

    Raises:
        ZeroDivisionError: If ``interval`` is zero.
    """
    def _microseconds(delta):
        # Exact integer length of a timedelta; avoids the float rounding of
        # total_seconds() on spans covering thousands of years.
        return (delta.days * 86400 + delta.seconds) * 1000000 + delta.microseconds

    # Use the full elapsed time, not just its seconds-within-day component,
    # so that the remainder is relative to a single fixed origin.
    offset = _microseconds(dt - dt.min) % _microseconds(interval)
    return dt - datetime.timedelta(microseconds=offset)
def browser_key(log):
    """Grouping key: return the value stored under ``"browser"`` in ``log``."""
    browser = log["browser"]
    return browser
def extract_timestamp(log):
    """Parse ``log["timestamp"]`` (a ``TIME_FORMAT`` string) into a datetime."""
    raw = log["timestamp"]
    parsed = datetime.datetime.strptime(raw, TIME_FORMAT)
    return parsed
def window(extractorfn, resolution, step=None):
    """Build a function mapping a log record to the windows that contain it.

    Windows are ``resolution`` wide and start at every multiple of ``step``
    (as computed by ``truncate``).  When ``step`` is smaller than
    ``resolution`` the windows overlap and a record belongs to several.

    Args:
        extractorfn: Callable mapping a log record to its
            ``datetime.datetime``.
        resolution: ``datetime.timedelta`` width of each window.
        step: ``datetime.timedelta`` distance between consecutive window
            starts.  Defaults to ``resolution`` (non-overlapping windows).

    Returns:
        A generator function ``windowfn(log)`` yielding the start of every
        window containing the record, formatted with ``TIME_FORMAT``, in
        ascending order.

    Raises:
        ValueError: If ``step`` is not positive or is larger than
            ``resolution``.
    """
    if step is None:
        step = resolution
    # A zero step would divide by zero inside truncate(), and a negative one
    # would make the loop below run forever, so reject both up front.
    if step <= datetime.timedelta(0):
        raise ValueError("step must be positive")
    if step > resolution:
        raise ValueError("step must not be larger than resolution")

    def windowfn(log):
        dt = extractorfn(log)
        # Earliest step-aligned start whose window still covers dt.
        start = truncate(dt - resolution + step, step)
        while start <= dt:
            yield start.strftime(TIME_FORMAT)
            start += step

    return windowfn
def graph(timeseries, browser, resolution, start, end):
    """Print a text bar chart of one browser's aggregated values.

    One line is printed per window from ``truncate(start, resolution)`` up
    to, but not including, ``end``, in the form ``browser:timestamp = ###``
    where the number of ``#`` characters is the window's aggregate value.
    Windows with no entry in ``timeseries`` are printed with an empty bar.

    Args:
        timeseries: Mapping ``{key: {window: (aggregate, records)}}`` as
            returned by ``aggregate_by_key``; ``aggregate`` must be an int
            because it is used as a repeat count.
        browser: Key of the series to plot.
        resolution: ``datetime.timedelta`` between plotted windows.
        start: ``datetime.datetime`` at which plotting begins (truncated
            down to a ``resolution`` boundary).
        end: ``datetime.datetime`` at which plotting stops (exclusive).
    """
    per_window = timeseries.get(browser, {})
    current = truncate(start, resolution)
    while current < end:
        ts = current.strftime(TIME_FORMAT)
        value = per_window.get(ts, (0, None))[0]
        # Call form of print: the statement form is a SyntaxError on
        # Python 3, while this single-argument call behaves the same on both.
        print("{}:{} = {}".format(browser, ts, "#" * value))
        current += resolution
# Demo driver: synthesize four hours of traffic ending now, count records per
# browser in five-minute windows, chart the Chrome series, then dump the
# complete aggregation result.
resolution = datetime.timedelta(minutes=5)
step = datetime.timedelta(minutes=5)
end = datetime.datetime.utcnow()
start = end - datetime.timedelta(hours=4)

logs = logstream(start, end)
timeseries = aggregate_by_key(
    keyfn=browser_key,
    aggfn=count,
    windowfn=window(extract_timestamp, resolution, step),
    logs=logs,
)
# Window keys are aligned to `step`, so the chart advances by `step` too.
graph(timeseries, "Chrome", step, start, end)
pprint.pprint(timeseries)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment