Created February 15, 2017 04:35
-
-
Save SegFaultAX/c25a7315f6f48b594d22d90e2e24743f to your computer and use it in GitHub Desktop.
Demonstration of timeseries data, windowing, and aggregation [python]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import random | |
import itertools | |
import pprint | |
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
# An int, not the float 1e6: random.randint/randrange reject float arguments
# on Python 3.12+ (deprecated since 3.10).
NUM_USERS = 10 ** 6
BROWSERS = ["Chrome", "Firefox", "IE"]


def logstream(start_time, end_time, max_step=10000):
    """Yield synthetic log records with non-decreasing timestamps.

    Records are produced from ``start_time`` up to and including
    ``end_time``. Consecutive records are separated by a random gap of
    0..``max_step`` milliseconds (inclusive), so several records may share
    a timestamp.

    Each record is a dict with:
      * ``"user_id"``   -- random int in ``[0, NUM_USERS)``
      * ``"browser"``   -- random choice from ``BROWSERS``
      * ``"timestamp"`` -- the record time formatted with ``TIME_FORMAT``
        (second precision; sub-second parts are dropped)

    :param start_time: datetime of the first record.
    :param end_time: datetime after which generation stops.
    :param max_step: inclusive upper bound, in milliseconds, of the random
        gap between records; must be at least 1.
    :raises ValueError: (on first iteration) if ``max_step`` < 1, because
        time would then never advance and the stream would never end.
    """
    if max_step < 1:
        raise ValueError("max_step must be at least 1 millisecond")
    current = start_time
    while current <= end_time:
        yield {
            # randrange excludes its bound: exactly NUM_USERS distinct ids.
            "user_id": random.randrange(NUM_USERS),
            "browser": random.choice(BROWSERS),
            "timestamp": current.strftime(TIME_FORMAT),
        }
        current += datetime.timedelta(milliseconds=random.randint(0, max_step))
def aggregate_by_key(keyfn, aggfn, windowfn, logs):
    """Group logs by key and window, then aggregate each group.

    For every log, ``keyfn(log)`` selects the outer key and ``windowfn(log)``
    yields zero or more window identifiers the log belongs to. The result
    maps ``key -> {window: (aggfn(vals), vals)}``, where ``vals`` is the list
    of logs in that (key, window) group in input order. A key whose logs
    fell into no window still appears, mapped to ``{}``.

    ``aggfn`` is called exactly once per (key, window) group, after all logs
    have been grouped, so it should be a pure function of its argument.

    :param keyfn: callable log -> hashable outer key.
    :param aggfn: callable list-of-logs -> aggregate value.
    :param windowfn: callable log -> iterable of hashable window ids.
    :param logs: iterable of log records (consumed once).
    :returns: nested dict as described above.
    """
    groups = {}
    for log in logs:
        key = keyfn(log)
        windows = windowfn(log)
        per_key = groups.setdefault(key, {})
        for win in windows:
            # Append in place and aggregate once at the end. Rebuilding the
            # list and re-running aggfn for every log is quadratic per window.
            per_key.setdefault(win, []).append(log)
    return {
        key: {win: (aggfn(vals), vals) for win, vals in per_key.items()}
        for key, per_key in groups.items()
    }
def count(logs):
    """Aggregate function: return how many log records ``logs`` holds."""
    total = len(logs)
    return total
def truncate(dt, interval):
    """Round ``dt`` down to the nearest multiple of ``interval``.

    Multiples are counted from ``datetime.min`` (0001-01-01 00:00). Bucket
    boundaries are therefore consistent across days for any positive
    interval, including sub-second intervals and intervals longer than a day.

    :param dt: datetime to truncate. If it carries a tzinfo, the arithmetic
        is done on its own wall-clock fields and the tzinfo is preserved.
    :param interval: positive ``datetime.timedelta`` bucket width.
    :returns: the latest datetime ``<= dt`` that lies an exact multiple of
        ``interval`` after ``datetime.min``.
    :raises ValueError: if ``interval`` is not positive.
    """
    def to_us(delta):
        # Exact integer microseconds. This avoids float rounding and works on
        # Python 2, which lacks timedelta % timedelta.
        return (delta.days * 86400 + delta.seconds) * 10 ** 6 + delta.microseconds

    interval_us = to_us(interval)
    if interval_us <= 0:
        raise ValueError("interval must be a positive timedelta")
    # Give datetime.min the same tzinfo as dt, so aware datetimes subtract
    # field-by-field instead of raising TypeError (naive-vs-aware).
    epoch = dt.min.replace(tzinfo=dt.tzinfo)
    offset_us = to_us(dt - epoch) % interval_us
    return dt - datetime.timedelta(microseconds=offset_us)
def browser_key(log):
    """Key function: group a log record by its ``"browser"`` field."""
    browser = log["browser"]
    return browser
def extract_timestamp(log):
    """Parse a log record's ``"timestamp"`` string into a datetime.

    The string must match ``TIME_FORMAT``; otherwise
    ``datetime.datetime.strptime`` raises ``ValueError``.
    """
    raw = log["timestamp"]
    parsed = datetime.datetime.strptime(raw, TIME_FORMAT)
    return parsed
def window(extractorfn, resolution, step=None):
    """Build a window function that assigns each log to sliding time windows.

    Windows are ``resolution`` wide and a new one starts every ``step``.
    When ``step`` is omitted it equals ``resolution``, giving tumbling
    (non-overlapping) windows. Window starts are aligned with
    ``truncate(..., step)``.

    :param extractorfn: callable log -> datetime of the log.
    :param resolution: ``timedelta`` width of each window.
    :param step: ``timedelta`` spacing between window starts; defaults to
        ``resolution``.
    :returns: a callable ``windowfn(log)``. It returns a generator of the
        ``TIME_FORMAT``-formatted start ``s`` of every window
        ``[s, s + resolution)`` that contains the log's time.
    :raises ValueError: if ``step`` is not positive or exceeds ``resolution``.
    """
    if step is None:
        step = resolution
    # A zero or negative step would make windowfn's loop never terminate.
    if step <= datetime.timedelta(0):
        raise ValueError("step must be a positive timedelta")
    if step > resolution:
        raise ValueError("step must not be larger than resolution")

    def windowfn(log):
        dt = extractorfn(log)
        # First step-aligned start strictly after dt - resolution, i.e. the
        # earliest window that still covers dt.
        start = truncate(dt - resolution + step, step)
        while start <= dt:
            yield start.strftime(TIME_FORMAT)
            start += step

    return windowfn
def graph(timeseries, browser, resolution, start, end):
    """Print a text bar chart of one browser's per-window aggregates.

    Prints one line ``"<browser>:<window start> = ###..."`` per window start,
    beginning at ``truncate(start, resolution)`` and stepping by
    ``resolution`` while the start is before ``end``. The bar length is the
    aggregate stored at ``timeseries[browser][window_start][0]``, or 0 when
    that window is absent. The aggregate must therefore be an int, e.g. as
    produced by ``count``.

    :param timeseries: nested dict as returned by ``aggregate_by_key``.
    :param browser: outer key to chart.
    :param resolution: positive ``timedelta`` spacing between chart rows.
    :param start: datetime where the chart begins (truncated to resolution).
    :param end: exclusive datetime where the chart ends.
    :raises ValueError: if ``resolution`` is not positive; the loop would
        otherwise never end.
    """
    if resolution <= datetime.timedelta(0):
        raise ValueError("resolution must be a positive timedelta")
    series = timeseries.get(browser, {})
    current = truncate(start, resolution)
    while current < end:
        ts = current.strftime(TIME_FORMAT)
        value = series.get(ts, (0, None))[0]
        # print(...) with a single argument behaves the same on Python 2 and
        # 3. The bare print statement is a SyntaxError on Python 3.
        print("{}:{} = {}".format(browser, ts, "#" * value))
        current += resolution
# Guarded so importing this module does not run the simulation or print.
if __name__ == "__main__":
    # Demo: 4 hours of synthetic logs ending now, counted per browser in
    # 5-minute windows, then charted for Chrome and dumped in full.
    # utcnow() returns a naive datetime, which truncate/strftime above expect.
    end = datetime.datetime.utcnow()
    start = end - datetime.timedelta(hours=4)
    resolution = datetime.timedelta(minutes=5)
    step = datetime.timedelta(minutes=5)
    logs = logstream(start, end)
    timeseries = aggregate_by_key(
        browser_key, count, window(extract_timestamp, resolution, step), logs
    )
    # Window keys are step-aligned start times, so chart rows advance by step.
    graph(timeseries, "Chrome", step, start, end)
    pprint.pprint(timeseries)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.