Skip to content

Instantly share code, notes, and snippets.

@CamDavidsonPilon
Last active August 29, 2015 14:11
Show Gist options
  • Save CamDavidsonPilon/44afe62318832531d6d3 to your computer and use it in GitHub Desktop.
one_bucket_theta.py results
# Demo of theta_join: match each event timestamp (T) to the interval (S)
# that contains it.
#
# Run in a pyspark shell (which provides the SparkContext `sc`) after importing
# one_bucket_theta.py, https://gist.github.com/CamDavidsonPilon/8750e37242c4942c1984,
# which provides `theta_join`.
#
# Runs unchanged on Python 2 and Python 3: integer literals carry no leading
# zeros (octal in Python 2, a SyntaxError in Python 3), and print() is called
# with a single argument.
from datetime import datetime

# Contiguous intervals: each interval's 'end' equals the next one's 'start'.
S = sc.parallelize([
    {'start': datetime(2000, 10, 11), 'end': datetime(2001, 1, 1)},
    {'start': datetime(2001, 1, 1), 'end': datetime(2002, 1, 1)},
    {'start': datetime(2002, 1, 1), 'end': datetime(2003, 1, 1)},
    {'start': datetime(2003, 1, 1), 'end': datetime(2004, 1, 1)},
    {'start': datetime(2004, 1, 1), 'end': datetime(2005, 1, 1)},
])

# Events to bucket; 2000-06-05 and 2010-07-05 fall outside every interval.
T = sc.parallelize([
    {'timestamp': datetime(2000, 6, 5)},
    {'timestamp': datetime(2002, 6, 5)},
    {'timestamp': datetime(2003, 6, 5)},
    {'timestamp': datetime(2002, 7, 5)},
    {'timestamp': datetime(2010, 7, 5)},
])

# Set the keys needed for the join: S rows are keyed by their (start, end)
# tuple, T rows by their timestamp.
S = S.map(lambda r: ((r['start'], r['end']), r))
T = T.map(lambda r: (r['timestamp'], r))


def join_condition(ts, interval):
    """Return True if timestamp `ts` lies in the half-open interval [start, end).

    ts       -- a T key (a datetime).
    interval -- an S key, the (start, end) tuple of datetimes.

    Argument order is (T key, S key), matching the lambda this replaces, which
    indexed its second argument as a (start, end) pair.
    NOTE(review): confirm this is the order in which theta_join calls it.

    The interval is half-open because the intervals above are contiguous.
    A timestamp exactly on a shared boundary (e.g. 2002-01-01) must match
    exactly one interval; a strict comparison on both ends would match none.
    """
    start, end = interval
    return start <= ts < end


results = theta_join(S, T, join_condition).collect()
for r in results:
    print(r[1])  # the original rows, as a (T row, S row) pair

# Expected output (row order may differ -- NOTE(review): presumably it depends
# on how theta_join partitions the data):
# ({'timestamp': datetime.datetime(2002, 6, 5, 0, 0)}, {'start': datetime.datetime(2002, 1, 1, 0, 0), 'end': datetime.datetime(2003, 1, 1, 0, 0)})
# ({'timestamp': datetime.datetime(2003, 6, 5, 0, 0)}, {'start': datetime.datetime(2003, 1, 1, 0, 0), 'end': datetime.datetime(2004, 1, 1, 0, 0)})
# ({'timestamp': datetime.datetime(2002, 7, 5, 0, 0)}, {'start': datetime.datetime(2002, 1, 1, 0, 0), 'end': datetime.datetime(2003, 1, 1, 0, 0)})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment