Last active
October 11, 2018 17:19
-
-
Save Daenyth/8e822b4e230e8613f7bcb1ef74167c79 to your computer and use it in GitHub Desktop.
Python date range interval storage, merging, compaction
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
Provides logic for tracking known and missing date ranges. | |
It can be used to track data acquisition from an external API. | |
It allows you to say: "I need data from D1-D2, and I have these chunks: [Dn..Dm]; what date ranges am I missing?" | |
Data can be tracked per item, for example: | |
ScheduleKey("GA", "credential_id", "http://google.com") or | |
ScheduleKey("Marketo", "instance_id", "activities") | |
""" | |
from __future__ import absolute_import, division, print_function, unicode_literals | |
import itertools | |
from datetime import datetime | |
import pytz | |
from future.utils import text_type | |
from psycopg2._psycopg import cursor as Cursor # pylint: disable=unused-import,no-name-in-module | |
from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, cast # pylint: disable=unused-import | |
from dbutil import insert_values | |
class DateRange(NamedTuple('DateRange', [('begin', datetime), ('end', datetime)])):
    """A half-open ``[begin, end)`` interval of datetimes.

    Ranges are persisted as a pair of ``DATE_FMT`` strings (see ``to_db`` / ``from_db``).
    """

    # UTC timestamp format with a literal 'Z' suffix; no sub-second precision.
    DATE_FMT = '%Y-%m-%dT%H:%M:%SZ'

    @property
    def to_db(self):
        # type: () -> Tuple[text_type, text_type]
        """The ``(begin, end)`` pair formatted as UTC ``DATE_FMT`` strings."""
        return self._format_utc(self.begin), self._format_utc(self.end)

    @classmethod
    def _format_utc(cls, dt):
        # type: (datetime) -> text_type
        """Format ``dt`` with ``DATE_FMT``, shifting timezone-aware values to UTC first.

        ``DATE_FMT`` hard-codes a 'Z' suffix, so an aware datetime in another zone must be
        converted or the stored timestamp would be off by its UTC offset. Naive datetimes
        are formatted unchanged.
        """
        offset = dt.utcoffset()
        if offset is not None:
            # Wall time minus offset is the UTC wall time; drop tzinfo so strftime sees it as-is.
            dt = (dt - offset).replace(tzinfo=None)
        return dt.strftime(cls.DATE_FMT)

    @classmethod
    def from_db(cls, range_str):
        # type: (text_type) -> 'DateRange'
        """Parse a ``'(begin,end)'`` string into a UTC-aware DateRange.

        :raises ValueError: if the string is not two comma-separated ``DATE_FMT`` timestamps.
        """
        # Split on the first comma only: the string holds exactly two timestamps.
        begin, end = range_str.lstrip('(').rstrip(')').split(',', 1)

        def parse(datestr):
            # type: (text_type) -> datetime
            return datetime.strptime(datestr, cls.DATE_FMT).replace(tzinfo=pytz.UTC)

        return cls(parse(begin), parse(end))
# Identifies one tracked item, e.g. ScheduleKey("Marketo", "instance_id", "activities").
# Its fields are the three key columns of api_schedule_status (data_type, data_source, data_id).
ScheduleKey = NamedTuple('ScheduleKey', [('data_type', text_type), ('data_source', text_type), ('data_id', text_type)])
# Stored tracking state for one ScheduleKey, as built by get_ranges() / get_multiple_ranges().
RangeInfo = NamedTuple(
    'RangeInfo',
    [
        ('known_ranges', List[DateRange]),  # Ranges already recorded for this key
        ('start', datetime),  # The date when we should begin collecting info
        ('finish', Optional[datetime]),  # The date when we should stop collecting info (may be None)
    ]
)
def range_gaps(ranges, hard_start, hard_end):
    # type: (Iterable[DateRange], datetime, datetime) -> Iterator[DateRange]
    """Yield the parts of ``[hard_start, hard_end)`` that are not covered by ``ranges``.

    ``ranges`` may be unsorted, may overlap, may contain empty ranges (``begin == end``;
    these are ignored) and may extend outside the window. Every yielded gap lies entirely
    within ``[hard_start, hard_end)`` and gaps come out in ascending order.
    """
    assert hard_start < hard_end
    # Everything in the window before this point is either covered or already yielded.
    covered_until = hard_start
    for current in sorted(r for r in ranges if r.begin != r.end):
        if current.end <= covered_until:
            continue  # Ends before the uncovered region: contributes nothing new.
        if current.begin >= hard_end:
            break  # Sorted by begin, so this and every later range start past the window.
        if current.begin > covered_until:
            yield DateRange(covered_until, current.begin)
        covered_until = current.end  # current.end > covered_until here
        if covered_until >= hard_end:
            return  # The rest of the window is covered.
    if covered_until < hard_end:
        yield DateRange(covered_until, hard_end)
def merge_ranges(new_ranges, known_ranges):
    # type: (Iterable[DateRange], Iterable[DateRange]) -> Iterable[DateRange]
    """Combine two collections of ranges into a single compacted, sorted sequence."""
    combined = itertools.chain(new_ranges, known_ranges)
    return compact_ranges(combined)
def remove_range(ranges, range_to_remove):
    # type: (Iterable[DateRange], DateRange) -> Iterator[DateRange]
    """Yield ``ranges`` (compacted and sorted) with ``range_to_remove`` cut out.

    Ranges that do not overlap ``range_to_remove`` pass through unchanged. A range that
    partially overlaps it is trimmed. A range that strictly contains it is split into a
    left and a right remainder.
    """
    assert range_to_remove.begin != range_to_remove.end
    cut_begin, cut_end = range_to_remove.begin, range_to_remove.end
    for current in compact_ranges(ranges):
        if current.end <= cut_begin or current.begin >= cut_end:
            yield current  # No overlap with the removed span (ranges are half-open).
            continue
        if current.begin < cut_begin:
            yield DateRange(current.begin, cut_begin)  # Remainder left of the cut.
        if current.end > cut_end:
            yield DateRange(cut_end, current.end)  # Remainder right of the cut.
def compact_ranges(ranges):
    # type: (Iterable[DateRange]) -> Iterator[DateRange]
    """Compact ranges by replacing overlapping ranges with one range covering the whole span.

    Yields non-empty ranges in ascending order:
    - Empty ranges (``begin == end``) are dropped.
    - Duplicates collapse into one.
    - Ranges that overlap, or merely touch (one's ``end`` equals the next's ``begin``),
      are merged.
    An empty input yields nothing.
    """
    current = None  # type: Optional[DateRange]
    for next_ in sorted(r for r in ranges if r.begin != r.end):
        if current is None:
            current = next_
        elif next_.begin > current.end:
            # A gap separates them, so current is final.
            yield current
            current = next_
        elif next_.end > current.end:
            # Overlapping or touching: extend current to cover next_.
            current = DateRange(current.begin, next_.end)
        # Otherwise next_ lies entirely inside current (including exact duplicates).
    if current is not None:
        yield current
def get_ranges(cursor, key):
    # type: (Cursor, ScheduleKey) -> Optional[RangeInfo]
    """Fetch the stored range info for ``key``.

    Returns None when no row exists for ``key``. Callers rely on this to tell "row missing"
    apart from "row present but with no known ranges" (a RangeInfo with an empty list).
    """
    query = """
    SELECT known_ranges, start, finish FROM api_schedule_status
    WHERE data_id = %(data_id)s AND data_source = %(data_source)s AND data_type = %(data_type)s
    """
    params = dict(data_id=key.data_id, data_source=key.data_source, data_type=key.data_type)
    cursor.execute(query, params)
    row = cursor.fetchone()  # type: Optional[Tuple[List[text_type], datetime, Optional[datetime]]]
    if row is None:
        return None
    # Each element of known_ranges is one '(begin,end)' string.
    return RangeInfo(
        [DateRange.from_db(stored) for stored in row[0]],
        row[1],
        row[2],
    )
def get_multiple_ranges(cursor, keys):
    # type: (Cursor, List[ScheduleKey]) -> Dict[ScheduleKey, RangeInfo]
    """Fetch range info for many keys with a single query.

    The keys are loaded into a transaction-scoped temp table and joined against
    api_schedule_status. Keys with no stored row are absent from the result.

    NOTE(review): rows are read by attribute (``r.data_type``), so this assumes the cursor
    yields attribute-accessible rows (e.g. psycopg2's NamedTupleCursor). Confirm at call sites.
    """
    if not keys:
        # insert_values() executes nothing for an empty list. The temp table would then never
        # be created and the SELECT below would fail.
        return {}
    # IF NOT EXISTS + TRUNCATE: the table lives until commit (ON COMMIT DROP), so a second
    # call in the same transaction must reuse it rather than fail on CREATE.
    join_table_sql = """
    CREATE TEMP TABLE IF NOT EXISTS schedule_keys
    (data_type TEXT, data_source TEXT, data_id TEXT)
    ON COMMIT DROP;
    TRUNCATE schedule_keys;
    INSERT INTO schedule_keys VALUES {};
    """
    select_sql = """
    SELECT s.data_type, s.data_source, s.data_id, s.known_ranges, s.start, s.finish
    FROM api_schedule_status s
    JOIN schedule_keys k
    ON k.data_id = s.data_id and k.data_source = s.data_source AND k.data_type = s.data_type
    """
    insert_values(cursor, join_table_sql, cast(List[Tuple[Any, ...]], keys))
    cursor.execute(select_sql)
    return {
        ScheduleKey(r.data_type, r.data_source, r.data_id):
            RangeInfo([DateRange.from_db(kr) for kr in r.known_ranges], r.start, r.finish)
        for r in cursor.fetchall()
    }
def upsert_ranges(cursor, key, new_ranges, start, end):
    # type: (Cursor, ScheduleKey, List[DateRange], datetime, Optional[datetime]) -> Iterable[DateRange]
    """Record ``new_ranges`` for ``key``, creating the row if it does not exist yet.

    It first tries update_ranges(). If that reports no row (LookupError), it inserts one
    using ``start``/``end``. If the insert affects no rows (someone else created the row in
    the meantime), the update is attempted again.

    :return: the merged ranges from the update, or ``new_ranges`` as-is when a row was inserted.
    """
    while True:
        try:
            return update_ranges(cursor, key, new_ranges)
        except LookupError:
            if insert_ranges(cursor, key, new_ranges, start, end):
                return new_ranges
            # Zero rows inserted (the INSERT uses ON CONFLICT DO NOTHING): the row now
            # exists, so loop back and merge into it instead.
def update_ranges(cursor, key, new_ranges):
    # type: (Cursor, ScheduleKey, List[DateRange]) -> Iterable[DateRange]
    """Merge ``new_ranges`` into the ranges stored for ``key`` using compare-and-set.

    It reads the current ranges, merges in ``new_ranges`` and writes the result back. The
    UPDATE only matches while known_ranges still equals the value that was read, so a
    concurrent change makes it affect zero rows. The function then re-reads and retries.

    :raises LookupError: if no row exists for ``key``.
    :return: the merged list of ranges that was written.
    """
    # The row is identified by all three key columns (data_type, data_source, data_id).
    # Omitting data_type would let this touch rows belonging to other data types.
    compare_and_set_update = """
    UPDATE api_schedule_status
    SET known_ranges = %(new_ranges)s
    WHERE data_type = %(data_type)s
    AND data_id = %(data_id)s
    AND data_source = %(data_source)s
    AND known_ranges = %(old_ranges)s
    """
    while True:
        range_info = get_ranges(cursor, key)
        if range_info is None:
            raise LookupError("No known ranges found with key {key}".format(key=key))
        merged_ranges = list(merge_ranges(range_info.known_ranges, new_ranges))
        params = dict(
            data_type=key.data_type,
            data_id=key.data_id,
            data_source=key.data_source,
            new_ranges=to_db(merged_ranges),
            old_ranges=to_db(range_info.known_ranges),
        )
        cursor.execute(compare_and_set_update, params)
        if cursor.rowcount != 0:
            return merged_ranges
def insert_ranges(cursor, key, ranges, start, end):
    # type: (Cursor, ScheduleKey, List[DateRange], datetime, Optional[datetime]) -> int
    """Create the api_schedule_status row for ``key`` unless one already exists.

    :return: the cursor's rowcount for the conditional INSERT. Because of
             ON CONFLICT ... DO NOTHING, it is 0 when a row with the same key already exists.
    """
    conditional_insert = """
    INSERT INTO api_schedule_status (data_type, data_source, data_id, known_ranges, start, finish)
    VALUES (%(data_type)s, %(data_source)s, %(data_id)s, %(new_ranges)s, %(start)s, %(finish)s)
    ON CONFLICT (data_type, data_source, data_id) DO NOTHING
    """
    cursor.execute(
        conditional_insert,
        dict(
            data_type=key.data_type,
            data_source=key.data_source,
            data_id=key.data_id,
            new_ranges=to_db(ranges),
            start=start,
            finish=end,
        ),
    )
    return cast(int, cursor.rowcount)
def to_db(ranges):
    # type: (List[DateRange]) -> List[Tuple[text_type, text_type]]
    """Serialize each range into its ``(begin, end)`` pair of database strings."""
    return [date_range.to_db for date_range in ranges]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, division, print_function, unicode_literals | |
import sqlalchemy as sa | |
from sqlalchemy import Column, Table # pylint: disable=unused-import | |
from sqlalchemy.dialects import postgresql as pg | |
from sqlalchemy.sql.schema import MetaData # pylint: disable=unused-import | |
def ApiScheduleStatus(metadata):
    # type: (MetaData) -> Table
    """Define the ``api_schedule_status`` table on ``metadata``.

    Equivalent DDL:

        CREATE TABLE api_schedule_status (
            data_type TEXT,
            data_source TEXT,
            data_id TEXT,
            start TEXT NOT NULL,
            finish TEXT,
            known_ranges TEXT[],
            PRIMARY KEY (data_type, data_source, data_id)
        )

    NOTE(review): start/finish are declared TEXT here, while the range-tracking code
    annotates and passes them as datetime values. Confirm the intended column type.
    """
    # Composite primary key: one row per (data_type, data_source, data_id).
    key_columns = [
        Column(name, sa.TEXT, primary_key=True)
        for name in ("data_type", "data_source", "data_id")
    ]
    schedule_columns = [
        Column("start", sa.TEXT, nullable=False),
        Column("finish", sa.TEXT, nullable=True),
        # Conceptually a nested array of (begin, end) pairs, but Postgres does not treat
        # multi-dimensional arrays differently from single-dimension ones.
        Column("known_ranges", pg.ARRAY(sa.TEXT)),
    ]
    return Table("api_schedule_status", metadata, *(key_columns + schedule_columns))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, print_function, unicode_literals, division | |
import logging | |
from contextlib import contextmanager | |
from functools import partial | |
from timeit import default_timer | |
from builtins import filter as ifilter, map as imap, range | |
from datetime import datetime, timedelta | |
from dateutil.tz import tzutc | |
from typing import Any, List, Tuple, cast # pylint: disable=unused-import,import-error | |
from future.utils import text_type # pylint: disable=unused-import | |
def insert_values(cursor, sql_template, values):
    # type: (Any, text_type, List[Tuple]) -> int
    """
    Bulk-insert a list of value tuples with one statement.

    Each row is rendered client-side with ``cursor.mogrify`` and the rendered rows are
    substituted into ``sql_template`` with ``str.format``.

    :param cursor: a cursor providing ``mogrify`` and ``execute`` (psycopg2-style).
    :param sql_template: SQL like 'INSERT INTO table (a, b) VALUES {}'. It must contain a
        single ``{}`` placeholder and no other format braces. The column list must match
        the arity of the tuples in ``values``.
    :param values: a list of tuples, all with the same arity as the first one.
    :return: the cursor's rowcount, or 0 without executing anything when ``values`` is empty.
    """
    if not values:
        return 0
    row_template = "({})".format(', '.join(['%s'] * len(values[0])))
    rendered_rows = [safe_unicode(cursor.mogrify(row_template, row)) for row in values]
    statement = sql_template.format(', '.join(rendered_rows))
    cursor.execute(statement)
    return cast(int, cursor.rowcount)
def safe_unicode(obj, strict=False):
    # type: (Any, bool) -> text_type
    """Return ``obj`` as text, decoding it as UTF-8 if it is a byte string.

    :param obj: a byte string or a text string.
    :param strict: if True, assert that a non-bytes ``obj`` is already text.
    :return: the decoded text, or ``obj`` itself when it is not bytes.
    """
    # The builtin ``bytes`` is the native byte-string type on both Python 2 (an alias of
    # ``str``) and Python 3, so it needs no import.
    if isinstance(obj, bytes):
        return obj.decode('utf-8')
    if strict:
        assert isinstance(obj, text_type)
    return obj
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, division, print_function, unicode_literals | |
from datetime import datetime | |
import hypothesis.strategies as st | |
import pytz | |
from hypothesis import assume, given | |
from date_intervals import DateRange, compact_ranges, range_gaps, remove_range | |
# pylint: disable=no-value-for-parameter | |
def basic_datetimes(min_year=None):
    """Strategy for UTC-aware datetimes truncated to whole seconds.

    :param min_year: if given, only datetimes from January 1st of that year onwards are drawn.
    """
    # Pass ``min_value`` only when a bound was requested. Recent Hypothesis versions no
    # longer accept ``min_datetime``, and an explicit None is not a valid bound.
    if min_year is None:
        strategy = st.datetimes(timezones=st.just(pytz.UTC))
    else:
        strategy = st.datetimes(min_value=datetime(min_year, 1, 1), timezones=st.just(pytz.UTC))
    # DateRange.DATE_FMT has no sub-second field, so generate whole-second values only.
    return strategy.map(lambda d: d.replace(microsecond=0))
@st.composite
def date_ranges(draw, min_year=None, allow_empty=True):
    """Strategy drawing a DateRange whose endpoints are two ordered basic_datetimes.

    With ``allow_empty=False``, draws where both endpoints coincide are rejected.
    """
    endpoints = [draw(basic_datetimes(min_year)) for _ in range(2)]
    if not allow_empty:
        assume(endpoints[0] != endpoints[1])
    begin, end = sorted(endpoints)
    return DateRange(begin, end)
@given(date_ranges(allow_empty=False))
def test_range_gaps__no_gaps(dr):
    """A range that exactly spans the window leaves no gaps."""
    assert not list(range_gaps([dr], dr.begin, dr.end))
@given(date_ranges(allow_empty=False))
def test_range_gaps__no_known_range(dr):
    """With nothing known, the single gap is the whole start-to-end window."""
    assert list(range_gaps([], dr.begin, dr.end)) == [dr]
@given(st.lists(date_ranges(allow_empty=False), min_size=1))
def test_compact_ranges(ranges):
    """Compacting never adds ranges and never changes which times are covered."""
    lo = min(r.begin for r in ranges)
    hi = max(r.end for r in ranges)
    compacted = list(compact_ranges(ranges))
    assert len(compacted) <= len(ranges)
    assert list(range_gaps(ranges, lo, hi)) == list(range_gaps(compacted, lo, hi))
@given(st.lists(date_ranges(), min_size=1))
def test_compacting_range_with_missing(ranges):
    """Known ranges plus their gaps compact into one range spanning everything."""
    span_start = min(r.begin for r in ranges)
    span_end = max(r.end for r in ranges)
    assume(span_start != span_end)
    gaps = list(range_gaps(ranges, span_start, span_end))
    assert len(list(compact_ranges(gaps + ranges))) == 1
@given(st.lists(date_ranges()))
def test_compact_ranges__idempotent(ranges):
    """Compacting an already-compacted list changes nothing."""
    once = list(compact_ranges(ranges))
    twice = list(compact_ranges(once))
    assert once == twice
@given(basic_datetimes())
def test_compact_ranges__bad_range(dt):
    """DateRange is inclusive at its beginning and exclusive at its end, so begin == end is empty and gets dropped."""
    empty_range = DateRange(dt, dt)
    assert not list(compact_ranges([empty_range]))
@given(st.lists(date_ranges()))
def test_compact_ranges__sorted(ranges):
    """Compacted output is in non-decreasing order."""
    compacted = list(compact_ranges(ranges))
    assert all(left <= right for left, right in zip(compacted, compacted[1:]))
@given(basic_datetimes(), basic_datetimes(), basic_datetimes(), basic_datetimes())
def test_range_examples(d1, d2, d3, d4):
    """A single known range strictly inside the window leaves one gap on each side."""
    first, second, third, fourth = sorted([d1, d2, d3, d4])
    assume(len({first, second, third, fourth}) == 4)
    gaps = list(range_gaps([DateRange(second, third)], first, fourth))
    assert gaps == [DateRange(first, second), DateRange(third, fourth)]
@given(date_ranges())
def test_remove_range__exact_match(dr):
    """Removing a range from a list holding exactly that range leaves nothing."""
    assume(dr.begin != dr.end)
    assert not list(remove_range([dr], dr))
@given(st.lists(date_ranges()), date_ranges())
def test_remove_range__idempotent(ranges, dr):
    """Removing the same range twice gives the same result as removing it once."""
    assume(dr.begin != dr.end)
    once = list(remove_range(ranges, dr))
    assert once == list(remove_range(once, dr))
def test_remove_ranges_example():
    """Cutting off the tail of a range keeps only its head."""
    utc = pytz.UTC
    d1 = datetime(1999, 1, 1, tzinfo=utc)
    d2 = datetime(2000, 1, 1, tzinfo=utc)
    d3 = datetime(2000, 1, 1, 0, 0, 1, tzinfo=utc)
    remaining = list(remove_range([DateRange(d1, d3)], DateRange(d2, d3)))
    assert remaining == [DateRange(d1, d2)]
def test_remove_ranges__idempotent_example():
    """Regression example: removal is idempotent across two nearly adjacent ranges."""
    def utc(*parts):
        return datetime(*parts, tzinfo=pytz.UTC)

    ranges = [
        DateRange(utc(1999, 1, 1, 0, 0), utc(2000, 1, 1, 0, 0, 1)),
        DateRange(utc(2000, 1, 1, 0, 0, 2), utc(2000, 1, 1, 0, 0, 3)),
    ]
    to_remove = DateRange(utc(2000, 1, 1, 0, 0), utc(2001, 1, 1, 0, 0))
    once = list(remove_range(ranges, to_remove))
    assert once == list(remove_range(once, to_remove))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment