Created
September 8, 2018 17:39
-
-
Save breeko/73d5f030d5498fba5302106265a9fa12 to your computer and use it in GitHub Desktop.
Lambda architecture database returning filtered records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from datetime import datetime as dt | |
from dateutil import parser | |
### HELPERS
class NoYearHandlingParserInfo(parser.parserinfo):
    """dateutil ``parserinfo`` variant that leaves parsed years untouched.

    According to the original author's note, ``dateutil.parser.parse`` reads
    a year written as ``1`` as 2001; overriding ``convertyear`` makes it stay
    year 1 (1 AD) instead.
    """

    # NOTE(review): this camel-case attribute differs from the ``yearfirst``
    # keyword this file passes when instantiating the class -- confirm whether
    # anything actually reads it.
    yearFirst = True

    def convertyear(self, year, *_args, **_kwargs):
        """Return ``year`` as a plain ``int``; any extra arguments are ignored."""
        literal_year = int(year)
        return literal_year
class RecordFilter:
    """Integer codes selecting how a record's time is compared to a cutoff.

    ``BEFORE`` / ``BEFORE_OR_EQUAL`` keep records strictly earlier than / no
    later than the cutoff; ``AFTER`` / ``AFTER_OR_EQUAL`` keep records strictly
    later than / no earlier than it.
    """

    BEFORE = 0
    BEFORE_OR_EQUAL = 1
    AFTER = 2
    AFTER_OR_EQUAL = 3
    # Original, misspelled name of AFTER_OR_EQUAL; kept as an alias so that
    # existing callers continue to work.
    AFTER_OF_EQUAL = AFTER_OR_EQUAL

    # Every code accepted by functions that take a RecordFilter.
    VALID = {BEFORE, BEFORE_OR_EQUAL, AFTER, AFTER_OR_EQUAL}
# Module-level parser settings: year-first ordering, with years kept exactly
# as written (see NoYearHandlingParserInfo.convertyear).
parser_info = NoYearHandlingParserInfo(
    yearfirst=True,
)
def get_records_filtered(path: str, user: str, key: str, time: dt, f: RecordFilter):
    """Sum the record counts under ``path/user/key`` on the requested side of ``time``.

    The directory tree is walked one level at a time: the entry named
    ``user``, then the entry named ``key``, then one all-digit directory per
    "/"-separated field of ``time.strftime(DATE_FORMAT)``, then the entries
    whose name ``file_to_time`` maps to a time satisfying ``f``.  Inside each
    surviving entry, every all-digit name is converted to ``int`` and added
    to the total.

    Args:
        path: Root directory of the store.
        user: Name of the first-level directory to descend into.
        key: Name of the second-level directory to descend into.
        time: Cutoff the records are compared against.
        f: One of the ``RecordFilter`` codes.

    Returns:
        The sum of the integer entry names found; 0 when nothing matches.

    Raises:
        ValueError: If ``f`` is not in ``RecordFilter.VALID``.
    """
    if f not in RecordFilter.VALID:
        raise ValueError("Invalid record filter: {}. Valid filters: {}".format(f, RecordFilter.VALID))

    looking_back = f in (RecordFilter.BEFORE, RecordFilter.BEFORE_OR_EQUAL)

    # Exact comparison applied to the timestamp of each leaf entry; this is
    # what distinguishes the strict filters from the "or equal" ones.
    accepts = {
        RecordFilter.BEFORE: lambda stamp: stamp < time,
        RecordFilter.BEFORE_OR_EQUAL: lambda stamp: stamp <= time,
        RecordFilter.AFTER: lambda stamp: stamp > time,
        RecordFilter.AFTER_OF_EQUAL: lambda stamp: stamp >= time,
    }[f]

    # NOTE(review): assumes DATE_FORMAT yields "/"-separated numeric fields
    # ordered most-significant first (e.g. year/month/day) -- confirm.
    # Converting the fields up front also avoids closing over the loop
    # variable, which previously made every level compare against the last
    # field only.
    cutoff_fields = [int(field) for field in time.strftime(DATE_FORMAT).split("/")]

    # Narrow down to <path>/<user>/<key>; a missing entry leaves nothing to scan.
    roots = [path]
    for wanted in (user, key):
        roots = [os.path.join(p, d) for p in roots for d in os.listdir(p) if d == wanted]

    # Each candidate carries a flag that stays True only while every date
    # field seen so far equals the cutoff's.  Once a field differs in the
    # accepted direction, all deeper date directories qualify regardless of
    # their own value (2017/12 is before 2018/09 even though 12 > 9).
    frontier = [(root, True) for root in roots]
    for cutoff in cutoff_fields:
        survivors = []
        for parent, on_cutoff in frontier:
            for name in os.listdir(parent):
                if not name.isdigit():
                    continue
                child = os.path.join(parent, name)
                if not on_cutoff:
                    survivors.append((child, False))
                    continue
                value = int(name)
                if value == cutoff:
                    survivors.append((child, True))
                elif (value < cutoff) if looking_back else (value > cutoff):
                    survivors.append((child, False))
                # Otherwise the directory lies on the wrong side: prune it.
        frontier = survivors

    total = 0
    for parent, _ in frontier:
        for name in os.listdir(parent):
            stamp = file_to_time(name)
            # Entries whose name yields a falsy timestamp are skipped.
            if stamp and accepts(stamp):
                leaf = os.path.join(parent, name)
                total += sum(int(entry) for entry in os.listdir(leaf) if entry.isdigit())
    return total
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment