Skip to content

Instantly share code, notes, and snippets.

@breeko
Created September 8, 2018 17:39
Show Gist options
  • Save breeko/73d5f030d5498fba5302106265a9fa12 to your computer and use it in GitHub Desktop.
Save breeko/73d5f030d5498fba5302106265a9fa12 to your computer and use it in GitHub Desktop.
Lambda architecture database returning filtered records
import os
from datetime import datetime as dt
from dateutil import parser
### HELPERS
class NoYearHandlingParserInfo(parser.parserinfo):
    """Parser configuration that keeps a parsed year exactly as written.

    ``dateutil.parser.parse`` would otherwise read a short year such as ``1``
    as ``2001``; with this configuration ``1`` stays year 1 AD.
    """

    # NOTE(review): dateutil's parserinfo takes a lowercase ``yearfirst``
    # option; this camelCase attribute looks unused -- confirm before removing.
    yearFirst = True

    def convertyear(self, year, *args, **kwargs):
        """Return ``year`` as an int; any extra arguments are ignored."""
        literal_year = int(year)
        return literal_year
class RecordFilter:
    """Integer codes selecting how a record's time is compared to a cut-off.

    The codes are plain ints so callers can test membership with
    ``code in RecordFilter.VALID``.
    """

    BEFORE = 0           # strictly earlier than the cut-off
    BEFORE_OR_EQUAL = 1  # earlier than or equal to the cut-off
    AFTER = 2            # strictly later than the cut-off
    # Misspelled name kept so existing callers keep working.
    AFTER_OF_EQUAL = 3   # later than or equal to the cut-off
    # Correctly spelled alias for the same code; prefer this in new code.
    AFTER_OR_EQUAL = AFTER_OF_EQUAL

    # Every accepted filter code. Kept a plain set because its repr is
    # embedded in the error message raised for an invalid code.
    VALID = {BEFORE, BEFORE_OR_EQUAL, AFTER, AFTER_OF_EQUAL}
# Module-level parser configuration, built once with yearfirst=True.
# Nothing in the visible code reads it; presumably it is handed to
# dateutil's parser by a helper defined elsewhere (e.g. file_to_time)
# -- TODO confirm.
parser_info = NoYearHandlingParserInfo(yearfirst=True)
def get_records_filtered(path: str, user: str, key: str, time: dt, f: int) -> int:
    """Return the record count under ``path/user/key`` that matches a time filter.

    The directory tree is walked one level at a time:

    1. the directory named exactly ``user``,
    2. the directory named exactly ``key``,
    3. one all-digit directory per component of
       ``time.strftime(DATE_FORMAT).split("/")``,
    4. entries whose name ``file_to_time`` converts to a time that satisfies
       the filter (a falsy result from ``file_to_time`` skips the entry).

    Inside every entry that survives step 4, the all-digit names are summed
    as integers and that total is returned.

    Step 3 prunes lexicographically: a component is only bounded by the
    cut-off while every parent component equals the cut-off's. With a cut-off
    of 2018/09 and an "after" filter, 2019/01 is therefore kept, because the
    year already decides the comparison. Step 4 still applies the exact
    comparison, so the pruning only avoids needless directory listings.

    Args:
        path: Root directory of the store.
        user: Exact name of the first-level directory.
        key: Exact name of the second-level directory.
        time: Cut-off the records are compared against.
        f: One of the ``RecordFilter`` integer codes.

    Returns:
        Sum of the all-digit entry names found under every matching entry.

    Raises:
        ValueError: If ``f`` is not in ``RecordFilter.VALID``.
    """
    if f not in RecordFilter.VALID:
        raise ValueError("Invalid record filter: {}. Valid filters: {}".format(f, RecordFilter.VALID))

    mode = f
    looking_back = mode in (RecordFilter.BEFORE, RecordFilter.BEFORE_OR_EQUAL)
    # Converted once, up front: each level gets its own bound instead of a
    # closure that would see only the loop variable's final value.
    bounds = [int(part) for part in time.strftime(DATE_FORMAT).split("/")]

    def satisfies(stamp):
        """Apply the comparison selected by ``mode`` against the cut-off."""
        if mode == RecordFilter.BEFORE:
            return stamp < time
        if mode == RecordFilter.BEFORE_OR_EQUAL:
            return stamp <= time
        if mode == RecordFilter.AFTER:
            return stamp > time
        return stamp >= time

    # Levels 1-2: exact-name match on the user and key directories.
    directories = [path]
    for wanted in (user, key):
        directories = [
            os.path.join(parent, name)
            for parent in directories
            for name in os.listdir(parent)
            if name == wanted
        ]

    # Level 3: numeric time-component directories. ``on_edge`` is True while
    # every component so far equals the cut-off's, i.e. the comparison is
    # still undecided and the next component must be bounded.
    frontier = [(directory, True) for directory in directories]
    for bound in bounds:
        next_frontier = []
        for parent, on_edge in frontier:
            for name in os.listdir(parent):
                if not name.isdigit():
                    continue
                value = int(name)
                if on_edge:
                    outside = value > bound if looking_back else value < bound
                    if outside:
                        continue
                next_frontier.append(
                    (os.path.join(parent, name), on_edge and value == bound)
                )
        frontier = next_frontier

    # Level 4: exact time comparison on each entry name.
    matched = []
    for parent, _ in frontier:
        for name in os.listdir(parent):
            stamp = file_to_time(name)
            if stamp and satisfies(stamp):
                matched.append(os.path.join(parent, name))

    # The count is encoded in the all-digit names inside each matched entry.
    return sum(
        int(name)
        for entry in matched
        for name in os.listdir(entry)
        if name.isdigit()
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment