Created
July 14, 2015 18:44
-
-
Save SamPenrose/8316485d0b7d7ba881fc to your computer and use it in GitHub Desktop.
v2_v4_overlap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
def get_overlap(pair, v2_extractor=None, v4_extractor=None):
    """Collect the v2 and v4 records that fall inside their shared date window.

    ``pair['v2']`` is a dict whose ``['data']['days']`` entry maps
    ``'YYYY-MM-DD'`` strings to one blob per day.  ``pair['v4']`` is a list of
    dicts, each with a ``'creationDate'`` string that starts with
    ``'YYYY-MM-DD'``; several v4 blobs may share a date.

    The window runs from the first v4 date that is not before the earliest v2
    date to the last v4 date that is not after the latest v2 date.  Dates are
    compared as strings.

    Args:
        pair: dict with the ``'v2'`` and ``'v4'`` entries described above.
        v2_extractor: optional callable applied to each selected v2 blob;
            when omitted the blob itself is stored.
        v4_extractor: optional callable applied to each selected v4 blob;
            when omitted the blob itself is stored.

    Returns:
        ``{'v2': {date: value}, 'v4': defaultdict(list) of {date: [values]}}``.
        Both containers are empty when either input is empty or the date
        ranges do not overlap.  The caller's ``pair`` is not modified.
    """
    v2_blobs = pair['v2'].get('data', {}).get('days', {})  # {'YYYY-MM-DD': dict}
    v4_blobs = pair['v4']  # [{'creationDate': 'YYYY-MM-DD:...', 'k': val, ...}, ...]
    # One blob per date in v2, multiple per date in v4
    results = {'v2': {}, 'v4': defaultdict(list)}
    if not (v2_blobs and v4_blobs):
        return results

    # sorted() works on Python 3 dict views and leaves the caller's v4 list
    # in its original order.
    v2_dates = sorted(v2_blobs)
    v4_sorted = sorted(v4_blobs, key=lambda d: d['creationDate'])
    v2_start, v2_end = v2_dates[0], v2_dates[-1]  # possibly same

    start = end = None
    # Find overlap and walk v4 at same time.
    for v4 in v4_sorted:
        v4_date = v4['creationDate'][:10]
        # Walk start up as far as we must.
        if v4_date < v2_start:
            # If v2 is entirely after v4, we never get past here.
            continue
        if start is None:
            start = v4_date
        if v4_date > v2_end:
            # v4 is sorted, so nothing later can be inside the window.
            break
        # We have at least one overlapping date; walk end up as far as we can.
        end = v4_date
        value = v4_extractor(v4) if v4_extractor else v4
        results['v4'][v4_date].append(value)

    if end is None:  # No v4 blob landed inside v2's date range.
        return results

    for v2_date in v2_dates:
        if v2_date < start:
            continue
        if v2_date > end:
            break
        blob = v2_blobs[v2_date]
        results['v2'][v2_date] = v2_extractor(blob) if v2_extractor else blob
    return results
def test_get_overlap():
    """Check get_overlap() on full, partial, and empty date overlaps.

    Prints "all passed" when every assertion holds.
    """
    from copy import deepcopy

    d1 = '1999-12-31'
    dt1 = d1 + 'T23:59:59.999Z'
    d2 = '2000-01-01'
    dt2 = d2 + 'T00:00:00.001Z'
    # Without an extractor the v2 day values are passed through untouched, so
    # arbitrary objects (None, a function) serve as placeholders.
    simple = {'v2': {'data': {'days': {d1: None, d2: test_get_overlap}}},
              'v4': [{'creationDate': dt1},
                     {'creationDate': dt2}]}
    assert get_overlap(simple) == {'v2': simple['v2']['data']['days'],
                                   'v4': {d1: [{'creationDate': dt1}],
                                          d2: [{'creationDate': dt2}]}}

    # If either or both schema versions are empty, so is the result.
    empty_in = {'v2': {}, 'v4': []}
    empty = {'v2': {}, 'v4': {}}
    assert get_overlap(empty_in) == empty
    assert get_overlap({'v2': simple['v2'], 'v4': []}) == empty
    assert get_overlap({'v2': {'data': {'neighs': None}}, 'v4': simple['v4']}) == empty

    d3 = '2001-02-03'
    dt3 = d3 + 'T03:59:59.001Z'
    dt35 = d3 + 'T03:59:59.999Z'
    d4 = '2002-03-04'
    dt4 = d4 + 'T13:07:57.222Z'
    dt45 = d4 + 'T13:09:59.333Z'

    def v2_extractor(d):
        return d['value']

    all_v2 = {day: {'value': i} for i, day in enumerate([d1, d2, d3, d4])}
    all_v2_out = {day: blob['value'] for day, blob in all_v2.items()}
    all_v4 = [{'creationDate': dt, 'last': dt[-4:]}
              for dt in [dt1, dt2, dt3, dt35, dt4, dt45]]

    def v4_extractor(d):
        return d['last']

    all_v4_out = {d1: ['999Z'], d2: ['001Z'], d3: ['001Z', '999Z'], d4: ['222Z', '333Z']}

    # Perfect overlap, multiple blobs per day in v4, extractor functions
    pair_in = {'v2': {'data': {'days': all_v2}},
               'v4': all_v4}
    expected = {'v2': all_v2_out,
                'v4': all_v4_out}
    result = get_overlap(pair_in, v2_extractor, v4_extractor)
    assert result == expected

    # Remove v2's head and v4's tail
    pair_in2 = deepcopy(pair_in)
    del pair_in2['v2']['data']['days'][d1]
    pair_in2['v4'].pop()
    pair_in2['v4'].pop()
    expected2 = deepcopy(expected)
    del expected2['v2'][d1]
    del expected2['v2'][d4]
    del expected2['v4'][d1]
    del expected2['v4'][d4]
    assert get_overlap(pair_in2, v2_extractor, v4_extractor) == expected2

    # And again
    del pair_in2['v2']['data']['days'][d2]
    del expected2['v2'][d2]
    del expected2['v4'][d2]
    assert get_overlap(pair_in2, v2_extractor, v4_extractor) == expected2

    # And again; now we're empty
    pair_in2['v4'].pop()
    pair_in2['v4'].pop()
    del expected2['v2'][d3]
    del expected2['v4'][d3]
    assert expected2 == empty
    assert get_overlap(pair_in2, v2_extractor, v4_extractor) == expected2

    # Since the algorithm treats v2 and v4 asymmetrically, also test opposite
    pair_in3 = deepcopy(pair_in)
    expected3 = deepcopy(expected)
    del pair_in3['v2']['data']['days'][d4]
    del expected3['v2'][d4]
    del expected3['v4'][d4]
    assert get_overlap(pair_in3, v2_extractor, v4_extractor) == expected3

    pair_in3['v4'].pop(0)
    del expected3['v2'][d1]
    del expected3['v4'][d1]
    assert get_overlap(pair_in3, v2_extractor, v4_extractor) == expected3

    del pair_in3['v2']['data']['days'][d3]
    del expected3['v2'][d3]
    del expected3['v4'][d3]
    assert get_overlap(pair_in3, v2_extractor, v4_extractor) == expected3

    v4_head = pair_in3['v4'].pop(0)
    del expected3['v2'][d2]
    del expected3['v4'][d2]
    assert empty == expected3
    assert get_overlap(pair_in3, v2_extractor, v4_extractor) == expected3

    # Put the earliest remaining v4 blob back but drop its v2 day: the only
    # v2 date left now precedes every v4 date, so the result stays empty.
    pair_in3['v4'] = [v4_head] + pair_in3['v4']
    del pair_in3['v2']['data']['days'][d2]
    assert get_overlap(pair_in3, v2_extractor, v4_extractor) == expected3

    # Parenthesised form is valid on Python 3 and prints the same on Python 2.
    print("all passed")
# Run the self-test only when executed as a script, not when imported.
if __name__ == "__main__":
    test_get_overlap()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment