Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Last active October 11, 2018 17:19
Show Gist options
  • Save Daenyth/8e822b4e230e8613f7bcb1ef74167c79 to your computer and use it in GitHub Desktop.
Save Daenyth/8e822b4e230e8613f7bcb1ef74167c79 to your computer and use it in GitHub Desktop.
Python date range interval storage, merging, compaction
# coding: utf-8
"""
Provides logic for tracking known and missing date ranges.
It can be used to track data acquisition from an external API.
It allows you to say: "I need data from D1-D2, and I have these chunks: [Dn..Dm]; what date ranges am I missing?"
Data can be tracked per item, for example:
ScheduleKey("GA", "credential_id", "http://google.com") or
ScheduleKey("Marketo", "instance_id", "activities")
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import itertools
from datetime import datetime
import pytz
from future.utils import text_type
from psycopg2._psycopg import cursor as Cursor # pylint: disable=unused-import,no-name-in-module
from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, cast # pylint: disable=unused-import
from dbutil import insert_values
class DateRange(NamedTuple('DateRange', [('begin', datetime), ('end', datetime)])):
    """A span of time described by a ``begin`` and an ``end`` datetime.

    Instances are plain named tuples, so they compare and sort by
    ``(begin, end)``.
    """

    # Timestamp layout shared by ``to_db`` and ``from_db``.
    DATE_FMT = '%Y-%m-%dT%H:%M:%SZ'

    @property
    def to_db(self):
        # type: () -> Tuple[text_type, text_type]
        """Return ``(begin, end)`` rendered as ``DATE_FMT`` strings."""
        fmt = self.DATE_FMT
        return self.begin.strftime(fmt), self.end.strftime(fmt)

    @classmethod
    def from_db(cls, range_str):
        # type: (text_type) -> 'DateRange'
        """Build a range from a ``"(begin,end)"`` style string.

        Leading ``(`` and trailing ``)`` characters are stripped, the rest is
        split on commas, and both halves are parsed with ``DATE_FMT`` and
        tagged as UTC.

        :raises ValueError: if the string does not contain exactly two
            comma-separated parts, or a part does not match ``DATE_FMT``.
        """
        def parse(datestr):
            return datetime.strptime(datestr, cls.DATE_FMT).replace(tzinfo=pytz.UTC)

        pieces = range_str.lstrip('(').rstrip(')').split(',', 2)
        begin, end = pieces
        return cls(parse(begin), parse(end))
# Identifies one tracked item; the three fields together select a single
# api_schedule_status row in the queries of this module.
ScheduleKey = NamedTuple(
    'ScheduleKey',
    [
        ('data_type', text_type),
        ('data_source', text_type),
        ('data_id', text_type),
    ]
)

# Everything stored for one ScheduleKey: the ranges already collected plus the
# overall collection window.
RangeInfo = NamedTuple(
    'RangeInfo',
    [
        ('known_ranges', List[DateRange]),
        ('start', datetime),  # The date when we should begin collecting info
        ('finish', Optional[datetime]),  # The date when we should stop collecting info
    ]
)
def range_gaps(ranges, hard_start, hard_end):
    # type: (Iterable[DateRange], datetime, datetime) -> Iterator[DateRange]
    """Yield the parts of ``[hard_start, hard_end)`` not covered by ``ranges``.

    Empty ranges (``begin == end``) are ignored. Gaps are yielded in ascending
    order and are always clamped to the requested window: a known range lying
    partly or wholly outside the window never produces a gap that starts
    before ``hard_start`` or ends after ``hard_end``.

    :param ranges: known ranges, in any order, possibly overlapping
    :param hard_start: inclusive lower bound of the window of interest
    :param hard_end: exclusive upper bound of the window of interest
    :return: iterator over the missing ranges
    :raises AssertionError: if ``hard_start`` is not before ``hard_end``
        (raised when iteration starts, since this is a generator)
    """
    assert hard_start < hard_end
    # ``covered_until`` is the point up to which the window is known to be
    # covered (or already reported as a gap).
    covered_until = hard_start
    for known in sorted(r for r in ranges if r.begin != r.end):
        if known.begin >= hard_end:
            # Sorted input: every later range also starts past the window.
            break
        if known.begin > covered_until:
            yield DateRange(covered_until, known.begin)
        covered_until = max(covered_until, known.end)
    if covered_until < hard_end:
        yield DateRange(covered_until, hard_end)
def merge_ranges(new_ranges, known_ranges):
    # type: (Iterable[DateRange], Iterable[DateRange]) -> Iterable[DateRange]
    """Combine both collections of ranges and compact the result."""
    combined = itertools.chain(new_ranges, known_ranges)
    return compact_ranges(combined)
def remove_range(ranges, range_to_remove):
    # type: (Iterable[DateRange], DateRange) -> Iterator[DateRange]
    """Yield ``ranges`` (compacted) with ``range_to_remove`` cut out of them.

    Each compacted range is handled independently:

    * no overlap with ``range_to_remove``: yielded unchanged;
    * overlap: the part before ``range_to_remove.begin`` (if any) and the part
      after ``range_to_remove.end`` (if any) are yielded, so a range that
      strictly contains ``range_to_remove`` is split in two.

    :param ranges: known ranges, in any order, possibly overlapping
    :param range_to_remove: the non-empty span to subtract
    :return: iterator over the remaining ranges in ascending order
    :raises AssertionError: if ``range_to_remove`` is empty (raised when
        iteration starts, since this is a generator)
    """
    assert range_to_remove.begin != range_to_remove.end
    cut_begin, cut_end = range_to_remove.begin, range_to_remove.end
    for known in compact_ranges(ranges):
        if known.end <= cut_begin or known.begin >= cut_end:
            # Entirely before or entirely after the removed span.
            yield known
            continue
        if known.begin < cut_begin:
            yield DateRange(known.begin, cut_begin)
        if known.end > cut_end:
            yield DateRange(cut_end, known.end)
def compact_ranges(ranges):
    # type: (Iterable[DateRange]) -> Iterator[DateRange]
    """Compact ranges by replacing overlapping ranges with one range covering the whole span.

    Empty ranges (``begin == end``) are dropped. Ranges that overlap or touch
    (``current.end >= next.begin``) are merged, and exact duplicates collapse
    as a special case of that. The result is sorted and yields nothing when
    there is no non-empty input range.

    :param ranges: ranges in any order
    :return: iterator over the compacted ranges in ascending order
    """
    ordered = sorted(r for r in ranges if r.begin != r.end)
    if not ordered:
        # Return explicitly: letting next() raise StopIteration inside a
        # generator becomes a RuntimeError on Python 3.7+ (PEP 479).
        return
    current = ordered[0]
    for candidate in ordered[1:]:
        if current.end < candidate.begin:
            # Strictly disjoint - emit the finished range and start a new one.
            yield current
            current = candidate
        else:
            # candidate.begin <= current.end: extend the current range.
            current = current._replace(end=max(current.end, candidate.end))
    yield current
def get_ranges(cursor, key):
    # type: (Cursor, ScheduleKey) -> Optional[RangeInfo]
    """Load the stored range information for ``key``.

    Returns ``None`` when no row exists so that callers can distinguish a
    missing row from a row that is present but has no known ranges.

    :param cursor: database cursor used to run the query
    :param key: identifies the row to read
    :return: the stored ``RangeInfo``, or ``None`` if there is no row
    """
    query = """
        SELECT known_ranges, start, finish FROM api_schedule_status
        WHERE data_id = %(data_id)s AND data_source = %(data_source)s AND data_type = %(data_type)s
    """
    query_params = dict(data_id=key.data_id, data_source=key.data_source, data_type=key.data_type)
    cursor.execute(query, query_params)
    row = cursor.fetchone()  # type: Optional[Tuple[text_type, datetime, Optional[datetime]]]
    if row is None:
        return None
    known = [DateRange.from_db(stored) for stored in row[0]]
    return RangeInfo(known, row[1], row[2])
def get_multiple_ranges(cursor, keys):
    # type: (Cursor, List[ScheduleKey]) -> Dict[ScheduleKey, RangeInfo]
    """Load the stored range information for several keys in one query.

    The keys are bulk-inserted into a temporary ``schedule_keys`` table which
    is then joined against ``api_schedule_status``. Keys without a stored row
    are simply absent from the result.

    :param cursor: database cursor; rows are read by attribute name
        (``r.data_type`` ...), so this presumably needs a named-tuple style
        cursor - TODO confirm against callers
    :param keys: the keys to look up
    :return: mapping from each key that has a row to its ``RangeInfo``
    """
    if not keys:
        # insert_values() runs nothing for an empty list, so the temp table
        # would never be created and the SELECT below would fail.
        return {}
    # NOTE(review): the temp table is only dropped on commit, so a second call
    # inside the same transaction looks like it would hit "already exists" -
    # confirm how callers manage transactions.
    join_table_sql = """
        CREATE TEMP TABLE schedule_keys
        (data_type TEXT, data_source TEXT, data_id TEXT)
        ON COMMIT DROP;
        INSERT INTO schedule_keys VALUES {};
    """
    select_sql = """
        SELECT s.data_type, s.data_source, s.data_id, s.known_ranges, s.start, s.finish
        FROM api_schedule_status s
        JOIN schedule_keys k
        ON k.data_id = s.data_id and k.data_source = s.data_source AND k.data_type = s.data_type
    """
    insert_values(cursor, join_table_sql, cast(List[Tuple[Any, ...]], keys))
    cursor.execute(select_sql)
    return {
        ScheduleKey(r.data_type, r.data_source, r.data_id):
            RangeInfo([DateRange.from_db(kr) for kr in r.known_ranges], r.start, r.finish)
        for r in cursor.fetchall()
    }
def upsert_ranges(cursor, key, new_ranges, start, end):
    # type: (Cursor, ScheduleKey, List[DateRange], datetime, Optional[datetime]) -> Iterable[DateRange]
    """Merge ``new_ranges`` into the row for ``key``, creating the row if needed.

    An update is attempted first. If no row exists an insert is attempted; if
    that insert affects no rows, the whole sequence is retried.

    :return: the merged ranges when an existing row was updated, or
        ``new_ranges`` as given when a new row was inserted
    """
    inserted = False
    while not inserted:
        try:
            return update_ranges(cursor, key, new_ranges)
        except LookupError:
            # No row yet - try to create it; a falsy rowcount means retry.
            inserted = insert_ranges(cursor, key, new_ranges, start, end)
    return new_ranges
def update_ranges(cursor, key, new_ranges):
    # type: (Cursor, ScheduleKey, List[DateRange]) -> Iterable[DateRange]
    """Merge ``new_ranges`` into the ranges already stored for ``key``.

    Implemented as a read / merge / conditional-write loop: the UPDATE only
    matches while ``known_ranges`` still equals what was read, and the loop
    repeats whenever the UPDATE reports zero affected rows.

    :param cursor: database cursor used for the read and the write
    :param key: identifies the row to update
    :param new_ranges: ranges to merge into the stored ones
    :return: the merged ranges that were written
    :raises LookupError: if there is no row for ``key``
    """
    # The WHERE clause filters on all three key columns; without data_type a
    # row belonging to a different data type could be overwritten.
    compare_and_set_update = """
        UPDATE api_schedule_status
        SET known_ranges = %(new_ranges)s
        WHERE data_type = %(data_type)s
        AND data_id = %(data_id)s
        AND data_source = %(data_source)s
        AND known_ranges = %(old_ranges)s
    """
    while True:
        range_info = get_ranges(cursor, key)
        if range_info is None:
            raise LookupError("No known ranges found with key {key}".format(key=key))
        merged_ranges = list(merge_ranges(range_info.known_ranges, new_ranges))
        params = dict(
            data_type=key.data_type,
            data_id=key.data_id,
            data_source=key.data_source,
            new_ranges=to_db(merged_ranges),
            old_ranges=to_db(range_info.known_ranges)
        )
        cursor.execute(compare_and_set_update, params)
        if cursor.rowcount != 0:
            return merged_ranges
def insert_ranges(cursor, key, ranges, start, end):
    # type: (Cursor, ScheduleKey, List[DateRange], datetime, Optional[datetime]) -> int
    """Insert a new row for ``key`` unless one already exists.

    :param cursor: database cursor used to run the insert
    :param key: identifies the row to create
    :param ranges: initial known ranges, stored as given
    :param start: stored in the ``start`` column
    :param end: stored in the ``finish`` column
    :return: the cursor's rowcount after the statement; the statement does
        nothing when a row with the same key already exists
    """
    conditional_insert = """
        INSERT INTO api_schedule_status (data_type, data_source, data_id, known_ranges, start, finish)
        VALUES (%(data_type)s, %(data_source)s, %(data_id)s, %(new_ranges)s, %(start)s, %(finish)s)
        ON CONFLICT (data_type, data_source, data_id) DO NOTHING
    """
    row_values = dict(
        data_type=key.data_type,
        data_source=key.data_source,
        data_id=key.data_id,
        new_ranges=to_db(ranges),
        start=start,
        finish=end
    )
    cursor.execute(conditional_insert, row_values)
    affected = cursor.rowcount
    return cast(int, affected)
def to_db(ranges):
    # type: (List[DateRange]) -> List[Tuple[text_type, text_type]]
    """Return the ``to_db`` value of every range, preserving order."""
    return [date_range.to_db for date_range in ranges]
from __future__ import absolute_import, division, print_function, unicode_literals
import sqlalchemy as sa
from sqlalchemy import Column, Table # pylint: disable=unused-import
from sqlalchemy.dialects import postgresql as pg
from sqlalchemy.sql.schema import MetaData # pylint: disable=unused-import
def ApiScheduleStatus(metadata):
    # type: (MetaData) -> Table
    """Define the ``api_schedule_status`` table on ``metadata``.

    Columns, in order:

    * ``data_type``, ``data_source``, ``data_id`` - TEXT, together the
      primary key
    * ``start`` - TEXT, required
    * ``finish`` - TEXT, optional
    * ``known_ranges`` - array of TEXT

    NOTE(review): ``start`` and ``finish`` are declared as TEXT here while
    the range-tracking code annotates them as datetimes - confirm which is
    intended.
    """
    key_columns = [
        Column(column_name, sa.TEXT, primary_key=True)
        for column_name in ("data_type", "data_source", "data_id")
    ]
    window_columns = [
        Column("start", sa.TEXT, nullable=False),
        Column("finish", sa.TEXT, nullable=True),
    ]
    # This is conceptually a nested array, but postgres doesn't treat
    # multi-dimensional arrays differently from single-dimension arrays.
    ranges_column = Column("known_ranges", pg.ARRAY(sa.TEXT))
    all_columns = key_columns + window_columns + [ranges_column]
    return Table("api_schedule_status", metadata, *all_columns)
from __future__ import absolute_import, print_function, unicode_literals, division
import logging
from contextlib import contextmanager
from functools import partial
from timeit import default_timer
from builtins import filter as ifilter, map as imap, range
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from typing import Any, List, Tuple, cast # pylint: disable=unused-import,import-error
from future.utils import text_type # pylint: disable=unused-import
def insert_values(cursor, sql_template, values):
    # type: (Any, text_type, List[Tuple]) -> int
    """
    Bulk insert a list of value tuples with a single statement.

    Each row is rendered with ``cursor.mogrify`` and the rendered rows are
    substituted into the ``{}`` placeholder of ``sql_template``.

    :param cursor: cursor providing ``mogrify``, ``execute`` and ``rowcount``
    :param sql_template: sql like 'INSERT INTO table (a, b) VALUES {}'.
        The column list must be the same length as arity of the tuples in values.
    :param values: A list of tuples; the first one determines the arity
    :return: Rowcount inserted, or 0 without touching the cursor when
        ``values`` is empty
    """
    if not values:
        return 0
    arity = len(values[0])
    row_template = "({})".format(', '.join(['%s'] * arity))
    rendered_rows = [safe_unicode(cursor.mogrify(row_template, row)) for row in values]
    statement = sql_template.format(', '.join(rendered_rows))
    cursor.execute(statement)
    return cast(int, cursor.rowcount)
def safe_unicode(obj, strict=False):
    # type: (Union[text_type, bytes], bool) -> text_type
    """Return ``obj`` as text, decoding byte strings as UTF-8.

    :param obj: a byte string or a text string
    :param strict: when true, assert that a non-bytes ``obj`` is text
    :return: the decoded text for byte strings, otherwise ``obj`` unchanged
    :raises UnicodeDecodeError: if a byte string is not valid UTF-8
    :raises AssertionError: if ``strict`` is set and ``obj`` is neither bytes
        nor text
    """
    # Builtin ``bytes`` is ``str`` on Python 2 and ``bytes`` on Python 3, i.e.
    # the same type as future's ``binary_type``, which this module never
    # imports.
    if isinstance(obj, bytes):
        return obj.decode('utf-8')
    if strict:
        assert isinstance(obj, text_type)
    return obj
from __future__ import absolute_import, division, print_function, unicode_literals
from datetime import datetime
import hypothesis.strategies as st
import pytz
from hypothesis import assume, given
from date_intervals import DateRange, compact_ranges, range_gaps, remove_range
# pylint: disable=no-value-for-parameter
def basic_datetimes(min_year=None):
    """Strategy producing UTC datetimes with the microsecond field zeroed.

    :param min_year: when given, January 1st of that year is passed as the
        lower bound; otherwise no lower bound is passed
    """
    if min_year is None:
        min_datetime = None
    else:
        min_datetime = datetime(min_year, 1, 1)
    utc_datetimes = st.datetimes(timezones=st.just(pytz.UTC), min_datetime=min_datetime)
    return utc_datetimes.map(lambda d: d.replace(microsecond=0))
@st.composite
def date_ranges(draw, min_year=None, allow_empty=True):
    """Strategy producing a ``DateRange`` whose begin is not after its end.

    :param min_year: forwarded to ``basic_datetimes`` for both endpoints
    :param allow_empty: when false, examples with equal endpoints are rejected
    """
    first = draw(basic_datetimes(min_year))
    second = draw(basic_datetimes(min_year))
    if not allow_empty:
        assume(first != second)
    return DateRange(min(first, second), max(first, second))
@given(date_ranges(allow_empty=False))
def test_range_gaps__no_gaps(dr):
    """A window exactly covered by one known range has nothing missing."""
    missing = list(range_gaps([dr], dr.begin, dr.end))
    assert missing == []
@given(date_ranges(allow_empty=False))
def test_range_gaps__no_known_range(dr):
    """With nothing known, the only gap is the whole start-to-end window."""
    assert list(range_gaps([], dr.begin, dr.end)) == [dr]
@given(st.lists(date_ranges(allow_empty=False), min_size=1))
def test_compact_ranges(ranges):
    """Compaction never adds ranges and leaves the set of gaps unchanged."""
    window_start = min(r.begin for r in ranges)
    window_end = max(r.end for r in ranges)

    def gaps(candidates):
        return list(range_gaps(candidates, window_start, window_end))

    compacted = list(compact_ranges(ranges))
    assert len(compacted) <= len(ranges)
    assert gaps(ranges) == gaps(compacted)
@given(st.lists(date_ranges(), min_size=1))
def test_compacting_range_with_missing(ranges):
    """Known ranges plus their gaps compact into one contiguous range."""
    window_start = min(r.begin for r in ranges)
    window_end = max(r.end for r in ranges)
    assume(window_start != window_end)
    gaps = list(range_gaps(ranges, window_start, window_end))
    compacted = list(compact_ranges(gaps + ranges))
    assert len(compacted) == 1
@given(st.lists(date_ranges()))
def test_compact_ranges__idempotent(ranges):
    """Compacting an already compacted list changes nothing."""
    once = list(compact_ranges(ranges))
    twice = list(compact_ranges(once))
    assert once == twice
@given(basic_datetimes())
def test_compact_ranges__bad_range(dt):
    """DateRange is inclusive at beginning and exclusive at end, so begin==end doesn't make sense"""
    empty_range = DateRange(dt, dt)
    compacted = list(compact_ranges([empty_range]))
    assert compacted == []
@given(st.lists(date_ranges()))
def test_compact_ranges__sorted(ranges):
    """Compacted output comes back in ascending order."""
    result = list(compact_ranges(ranges))
    assert result == sorted(result)
@given(basic_datetimes(), basic_datetimes(), basic_datetimes(), basic_datetimes())
def test_range_examples(d1, d2, d3, d4):
    """A known range strictly inside the window leaves a gap on each side."""
    ordered = sorted([d1, d2, d3, d4])
    assume(len(set(ordered)) == 4)
    window_start, known_begin, known_end, window_end = ordered
    known = DateRange(known_begin, known_end)
    gaps = list(range_gaps([known], window_start, window_end))
    expected = [DateRange(window_start, known_begin), DateRange(known_end, window_end)]
    assert gaps == expected
@given(date_ranges())
def test_remove_range__exact_match(dr):
    """Removing a range from itself leaves nothing."""
    assume(dr.begin != dr.end)
    remaining = list(remove_range([dr], dr))
    assert remaining == []
@given(st.lists(date_ranges()), date_ranges())
def test_remove_range__idempotent(ranges, dr):
    """Removing the same range a second time changes nothing."""
    assume(dr.begin != dr.end)
    once = list(remove_range(ranges, dr))
    twice = list(remove_range(once, dr))
    assert once == twice
def test_remove_ranges_example():
    """Removing the final second of a range leaves everything before it."""
    range_begin = datetime(1999, 1, 1, 0, 0, tzinfo=pytz.UTC)
    cut_begin = datetime(2000, 1, 1, 0, 0, tzinfo=pytz.UTC)
    shared_end = datetime(2000, 1, 1, 0, 0, 1, tzinfo=pytz.UTC)
    known = [DateRange(range_begin, shared_end)]
    to_remove = DateRange(cut_begin, shared_end)
    assert list(remove_range(known, to_remove)) == [DateRange(range_begin, cut_begin)]
def test_remove_ranges__idempotent_example():
    """Fixed example: removing a span twice gives the same result as once."""
    first_known = DateRange(
        begin=datetime(1999, 1, 1, 0, 0, tzinfo=pytz.UTC),
        end=datetime(2000, 1, 1, 0, 0, 1, tzinfo=pytz.UTC),
    )
    second_known = DateRange(
        begin=datetime(2000, 1, 1, 0, 0, 2, tzinfo=pytz.UTC),
        end=datetime(2000, 1, 1, 0, 0, 3, tzinfo=pytz.UTC),
    )
    range_to_remove = DateRange(
        begin=datetime(2000, 1, 1, 0, 0, tzinfo=pytz.UTC),
        end=datetime(2001, 1, 1, 0, 0, tzinfo=pytz.UTC),
    )
    once = list(remove_range([first_known, second_known], range_to_remove))
    twice = list(remove_range(once, range_to_remove))
    assert once == twice
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment