Lambda Architecture Simple Example
import os
import re
from datetime import datetime as dt
from dateutil import parser
"""
logs
  botrank
    [user]
      good
        summary
          [year]
            [month]
              [day]
                [yyyy-mm-dd hh:mm:ss]
                  [count up to that point]
        [year]
          [month]
            [day]
              [yyyy-mm-dd hh:mm:ss]
                [count]
      bad
        ...
"""
BASE_DIR = "/home/branko/reddit/logs/botrank"
USER = "___alexa___"
USER2 = "AddedColor"
BATCH_KEY = "summary"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
DATE_FORMAT = "%Y/%m/%d"
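# Illustration of the layout described above (assumed example values; the same
# paths appear in the demo at the bottom of this file): a "good" vote for USER
# recorded at 2018-09-08 14:16:43 with value 1 is stored as
#   BASE_DIR/___alexa___/good/2018/09/08/2018-09-08 14:16:43/1
# while a batch summary for the same user and key goes under an extra "summary" segment:
#   BASE_DIR/___alexa___/good/summary/2018/09/08/2018-09-08 14:17:04/101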
### HELPERS
class NoYearHandlingParserInfo(parser.parserinfo):
    yearfirst = True
    def convertyear(self, year, *args, **kwargs):
        """ dateutil.parser.parse parses 1 as 2001. This corrects it to parse 1 as the year 1 AD """
        return int(year)
class RecordFilter:
    BEFORE = 0
    BEFORE_OR_EQUAL = 1
    AFTER = 2
    AFTER_OR_EQUAL = 3
    VALID = set([BEFORE, BEFORE_OR_EQUAL, AFTER, AFTER_OR_EQUAL])
parser_info = NoYearHandlingParserInfo(yearfirst=True)
def time_to_date_path(time: dt):
    """ Converts datetime object to a folder path based on DATE_FORMAT """
    return time.strftime(DATE_FORMAT)
def time_to_time_dir(time: dt):
    """ Converts datetime object to a directory name based on TIME_FORMAT """
    return time.strftime(TIME_FORMAT)
def file_to_time(p: str):
    """ Converts a file or path name into a datetime object, or None if it cannot be parsed """
    try:
        f = os.path.basename(p)
        return parser.parse(f, parserinfo=parser_info, ignoretz=True)
    except (TypeError, ValueError):
        return None
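# Example (assumed input, based on the docstring of NoYearHandlingParserInfo above):
# with dateutil's default parserinfo, "1-01-01 00:00:00" would parse as year 2001;
# with the override, file_to_time returns datetime(1, 1, 1, 0, 0), matching the
# dt.min shard used for the base record in the demo below.
# file_to_time("1-01-01 00:00:00")  # -> datetime.datetime(1, 1, 1, 0, 0)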
def get_batch_key(key: str):
    """ Converts a key into a batch key """
    return os.path.join(key, BATCH_KEY)
def get_int_dirs(path: str):
    """ Returns directories in a path whose names are ints """
    if os.path.isdir(path):
        return [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f)) and f.isdigit()]
    return []
def get_latest_in_path(path: str):
    """ Returns max file in a path by its name """
    if os.path.isdir(path):
        files = [f for f in os.listdir(path)]
        if len(files) > 0:
            return max(files)
def get_regex_group(pattern: str, string: str, flags: re.RegexFlag = 0):
    """ Returns the matched text given a regex pattern and string, or None if no match """
    match = re.search(pattern, string, flags)
    if match:
        return match.group()
### RECORDS
def add_record(path: str, user: str, key: str, time: dt, val: int = 1):
    """ Adds record for user and key sharded on time """
    date_path = time_to_date_path(time)
    time_path = time_to_time_dir(time)
    final_dir = os.path.join(path, user, key, date_path, time_path)
    if not os.path.exists(final_dir):
        os.makedirs(final_dir)
    # the value is stored as the name of an empty file inside the timestamp directory
    final_path = os.path.join(final_dir, "{}".format(val))
    with open(final_path, "w") as f:
        f.write("")
    return final_path
def get_records_filtered(path: str, user: str, key: str, time: dt, f: RecordFilter):
    """ Returns the number of records (sum of record values) before or after a given time, per the filter f """
    if f not in RecordFilter.VALID:
        raise ValueError("Invalid record filter: {}. Valid filters: {}".format(f, RecordFilter.VALID))
    base = os.path.join(path, user, key)
    if not os.path.isdir(base):
        return 0
    target_date = [int(t) for t in time.strftime(DATE_FORMAT).split("/")]
    # Walk the year/month/day shards, pruning directories whose accumulated date prefix
    # cannot contain qualifying records (comparing each component on its own would
    # wrongly prune across month and year boundaries)
    paths = [(base, [])]
    for depth in range(len(target_date)):
        new_paths = []
        for p, prefix in paths:
            for d in os.listdir(p):
                if not d.isdigit():
                    continue
                candidate = prefix + [int(d)]
                if f in (RecordFilter.BEFORE, RecordFilter.BEFORE_OR_EQUAL):
                    keep = candidate <= target_date[:depth + 1]
                else:
                    keep = candidate >= target_date[:depth + 1]
                if keep:
                    new_paths.append((os.path.join(p, d), candidate))
        paths = new_paths
    # Filter the individual record directories by their full timestamp
    if f == RecordFilter.BEFORE:
        time_filter = lambda t: t and t < time
    elif f == RecordFilter.BEFORE_OR_EQUAL:
        time_filter = lambda t: t and t <= time
    elif f == RecordFilter.AFTER:
        time_filter = lambda t: t and t > time
    else:  # RecordFilter.AFTER_OR_EQUAL
        time_filter = lambda t: t and t >= time
    num_files = 0
    for day_path, _ in paths:
        for record_dir in os.listdir(day_path):
            if time_filter(file_to_time(record_dir)):
                record_path = os.path.join(day_path, record_dir)
                num_files += sum(int(v) for v in os.listdir(record_path) if v.isdigit())
    return num_files
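# Usage sketch (hypothetical call, not part of the demo below): count every "good"
# record for USER strictly before 2018-09-08 14:17:00; year/month/day shards outside
# that range are pruned before any timestamp directory is parsed.
# get_records_filtered(BASE_DIR, USER, "good", dt(2018, 9, 8, 14, 17), RecordFilter.BEFORE)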
def get_latest_record(path: str, user: str, key: str, val: bool = False):
    """ Returns the latest record's path, or its value if val is True """
    cur_path = os.path.join(path, user, key)
    cur_sub_dirs = get_int_dirs(cur_path)
    # descend through the year/month/day shards, always taking the largest
    while len(cur_sub_dirs) > 0:
        new_dir = max(cur_sub_dirs, key=lambda x: int(x))
        cur_path = os.path.join(cur_path, new_dir)
        cur_sub_dirs = get_int_dirs(cur_path)
    latest_file = get_latest_in_path(cur_path)
    if latest_file:
        latest_path = os.path.join(cur_path, latest_file)
        if val:
            latest_file_val = sum([int(f) for f in os.listdir(latest_path) if f.isdigit()])
            return latest_file_val
        return latest_path
def get_latest_record_time(path: str, user: str, key: str):
    """ Returns time of latest record """
    latest_record = get_latest_record(path, user, key, val=False)
    return file_to_time(latest_record)
### BATCHING
def batch(path: str, user: str, key: str):
    """ Batches all records for a given path, user and key into a new summary """
    records_to_batch = get_records_filtered(path, user, key, dt.min, RecordFilter.AFTER_OR_EQUAL)
    return add_summary(path, user, key, dt.now(), records_to_batch)
def batch_all_keys(path: str, user: str):
    """ Batches all records for given path and user """
    user_path = os.path.join(path, user)
    keys = os.listdir(user_path)
    return [batch(path, user, k) for k in keys]
def batch_all_users(path: str):
    """ Batches all records for given path """
    users = os.listdir(path)
    batch_summaries = []
    for user in users:
        batch_summaries.append(batch_all_keys(path, user))
    return batch_summaries
def get_latest_batch(path: str, user: str, key: str, val: bool = False):
    """ Returns latest batch result """
    new_key = get_batch_key(key)
    return get_latest_record(path, user, new_key, val)
def get_latest_batch_time(path: str, user: str, key: str):
    """ Returns time of last batch """
    latest_batch = get_latest_batch(path, user, key, val=False)
    return file_to_time(latest_batch)
def add_summary(path: str, user: str, key: str, time: dt, val: int):
    """ Adds a summary record for user and key sharded on time """
    new_key = get_batch_key(key)
    val = "{}".format(val)
    return add_record(path, user, new_key, time, val)
### SPEED
def update_batch(path: str, user: str, key: str):
    """ Updates the summary without going through all records (speed layer) """
    latest_batch_val = get_latest_batch(path, user, key, val=True)
    latest_batch_time = get_latest_batch_time(path, user, key)
    records_to_update = get_records_filtered(path, user, key, latest_batch_time, RecordFilter.AFTER_OR_EQUAL)
    update_val = int(latest_batch_val) + records_to_update
    return add_summary(path, user, key, dt.now(), update_val)
# Adding a base record
add_record(BASE_DIR, USER, "good", dt.min, val=100)
# '.../botrank/___alexa___/good/1/01/01/1-01-01 00:00:00/100'
# Retrieve last summary
get_latest_batch(BASE_DIR, USER, "good", val=True)
# None (nothing batched yet)
# Batching all records
batch(BASE_DIR, USER, "good")
# '/home/branko/reddit/logs/botrank/___alexa___/good/summary/2018/09/08/2018-09-08 14:15:52/100'
# Retrieving latest batch value
get_latest_batch_time(BASE_DIR, USER, "good")
# datetime.datetime(2018, 9, 8, 14, 15, 52)
# Retrieve last summary
get_latest_batch(BASE_DIR, USER, "good", val=True)
# 100
# Adding a record
add_record(BASE_DIR, USER, "good", dt.now())
# '/home/branko/reddit/logs/botrank/___alexa___/good/2018/09/08/2018-09-08 14:16:43/1'
# Retrieving latest batch
get_latest_batch(BASE_DIR, USER, "good", val=True)
# 100 (record we just added hasn't been batched yet)
# Batching all records for a given key
batch(BASE_DIR, USER, "good")
# '/home/branko/reddit/logs/botrank/___alexa___/good/summary/2018/09/08/2018-09-08 14:17:04/101'
# Retrieving latest batch
get_latest_batch(BASE_DIR, USER, "good", val=True)
# 101 (record we just added has now been batched)
# Adding a record with multiple values
add_record(BASE_DIR, USER, "good", dt.now(), val=3)
# '/home/branko/reddit/logs/botrank/___alexa___/good/2018/09/08/2018-09-08 14:17:20/3'
# Updating batch based on prior summary and new records added
update_batch(BASE_DIR, USER, "good")
# '/home/branko/reddit/logs/botrank/___alexa___/good/summary/2018/09/08/2018-09-08 14:17:33/104'
# Adding a log for another key
add_record(BASE_DIR, USER, "bad", dt.now())
# '/home/branko/reddit/logs/botrank/___alexa___/bad/2018/09/08/2018-09-08 14:17:48/1'
# Batching all keys
batch_all_keys(BASE_DIR, USER)
# ['/home/branko/reddit/logs/botrank/___alexa___/bad/summary/2018/09/08/2018-09-08 14:18:28/1',
# '/home/branko/reddit/logs/botrank/___alexa___/good/summary/2018/09/08/2018-09-08 14:18:28/104']
# Adding a new user
add_record(BASE_DIR, USER2, "good", dt.now())
# '/home/branko/reddit/logs/botrank/AddedColor/good/2018/09/08/2018-09-08 14:18:41/1'
# Batching all users
batch_all_users(BASE_DIR)
# [['/home/branko/reddit/logs/botrank/___alexa___/bad/summary/2018/09/08/2018-09-08 14:18:52/1',
# '/home/branko/reddit/logs/botrank/___alexa___/good/summary/2018/09/08/2018-09-08 14:18:52/104'],
# ['/home/branko/reddit/logs/botrank/AddedColor/good/summary/2018/09/08/2018-09-08 14:18:52/1']]
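# A possible serving-layer style read (a sketch, not part of the original example):
# combine the latest batch summary with any records added at or after its timestamp,
# without writing a new summary. Relies only on the helpers defined above.
def get_current_count(path: str, user: str, key: str):
    """ Returns the latest batch value plus records added since that batch """
    latest_batch_val = get_latest_batch(path, user, key, val=True) or 0
    latest_batch_time = get_latest_batch_time(path, user, key) or dt.min
    recent = get_records_filtered(path, user, key, latest_batch_time, RecordFilter.AFTER_OR_EQUAL)
    return int(latest_batch_val) + recent
# get_current_count(BASE_DIR, USER, "good")
# # e.g. 104 after the demo sequence above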