Skip to content

Instantly share code, notes, and snippets.

@bencharb
Last active February 6, 2016 07:54
Show Gist options
  • Select an option

  • Save bencharb/4118772a4a3bbe00c42d to your computer and use it in GitHub Desktop.

Select an option

Save bencharb/4118772a4a3bbe00c42d to your computer and use it in GitHub Desktop.
date range class
# coding: utf-8
# In[258]:
import datetime
import operator
import arrow
# In[259]:
def round_time(dt, units='hours'):
    """Truncate ``dt`` down to the start of the given unit.

    Components finer than ``units`` (and microseconds) are dropped; the
    original ``tzinfo`` is carried over to the result.

    :param dt: datetime to truncate.
    :param units: one of ``'days'``, ``'hours'``, ``'minutes'``, ``'seconds'``.
        Any other value keeps every component down to the second.
    :returns: a new ``datetime.datetime``.
    """
    unit_names = ('days', 'hours', 'minutes', 'seconds')
    components = (dt.day, dt.hour, dt.minute, dt.second)
    # Number of day/hour/minute/second components that survive truncation.
    if units in unit_names:
        keep = unit_names.index(units) + 1
    else:
        keep = len(unit_names)
    return datetime.datetime(dt.year, dt.month, *components[:keep], tzinfo=dt.tzinfo)
# In[364]:
def distribute_time(dt, count=-1, step=1, units='hours', round_time=True):
    """
    Yield ``abs(count)`` datetimes spaced ``step`` units apart, starting at ``dt``.

    A negative ``count`` walks backwards in time, a positive one forwards.
    The i-th value is ``dt`` shifted by ``i * step`` units, so the first
    value is always ``dt`` itself (after optional truncation). A ``count``
    of zero yields nothing.

    :param dt: starting datetime.
    :param count: number of values to produce; its sign selects the direction.
    :param step: distance between consecutive values, in ``units``.
    :param units: ``datetime.timedelta`` keyword, e.g. ``'days'`` or ``'hours'``.
    :param round_time: when true, ``dt`` is first truncated to ``units``
        (day/hour/minute/second); tzinfo is preserved.

    Examples (``dt`` is some time between 22:00 and 23:00 on 2016-01-26):

    In [01]:
    list(distribute_time(dt, -3))
    Out[01]:
    [datetime.datetime(2016, 1, 26, 22, 0),
    datetime.datetime(2016, 1, 26, 21, 0),
    datetime.datetime(2016, 1, 26, 20, 0)]

    In [02]:
    list(distribute_time(dt, 3, units='days'))
    Out[02]:
    [datetime.datetime(2016, 1, 26, 0, 0),
    datetime.datetime(2016, 1, 27, 0, 0),
    datetime.datetime(2016, 1, 28, 0, 0)]

    In [03]:
    list(distribute_time(dt, 3, step=2, units='days'))
    Out[03]:
    [datetime.datetime(2016, 1, 26, 0, 0),
    datetime.datetime(2016, 1, 28, 0, 0),
    datetime.datetime(2016, 1, 30, 0, 0)]
    """
    if round_time:
        # The ``round_time`` flag shadows the module-level function of the
        # same name inside this body, so the truncation is done inline.
        unit_names = ('days', 'hours', 'minutes', 'seconds')
        components = (dt.day, dt.hour, dt.minute, dt.second)
        if units in unit_names:
            keep = unit_names.index(units) + 1
        else:
            keep = len(unit_names)
        dt = datetime.datetime(dt.year, dt.month, *components[:keep], tzinfo=dt.tzinfo)

    shift = operator.sub if count < 0 else operator.add
    # A plain counter keeps the generator lazy on both Python 2 and 3
    # (no ``xrange``) and makes every offset an exact multiple of ``step``.
    remaining = abs(count)
    offset = 0
    while remaining > 0:
        yield shift(dt, datetime.timedelta(**{units: offset}))
        offset += step
        remaining -= 1
# In[365]:
def clean_time(something_like_a_datetime, to_utc=False):
    """Coerce a datetime-like value into a timezone-aware datetime.

    * ``datetime.datetime`` instances are used as they are.
    * Strings are parsed with ``dateutil.parser.parse``; if dateutil is not
      importable or rejects the text, ``arrow.get`` is tried instead.
    * Anything else is handed to ``arrow.get``.

    A result without tzinfo is passed through ``arrow.get(...).to('UTC')`` so
    that the returned value always carries a tzinfo.

    :param something_like_a_datetime: datetime, string, or any value
        ``arrow.get`` accepts.
    :param to_utc: when true, the value is converted to UTC.
        NOTE(review): in that case an ``arrow.Arrow`` object is returned rather
        than a ``datetime`` -- kept unchanged for backward compatibility.
    :returns: an aware ``datetime.datetime`` (or ``arrow.Arrow`` if ``to_utc``).
    :raises: whatever ``arrow.get`` raises when the value cannot be interpreted.
    """
    value = something_like_a_datetime

    # ``basestring`` only exists on Python 2.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if isinstance(value, datetime.datetime):
        dt = value
    else:
        dt = None
        if isinstance(value, string_types):
            try:
                # Imported here because the module never imports dateutil
                # at the top level.
                import dateutil.parser
                dt = dateutil.parser.parse(value)
            except (ImportError, ValueError, OverflowError):
                dt = None
        if dt is None:
            # Last resort; a failure here propagates to the caller.
            dt = arrow.get(value).datetime

    if dt.tzinfo is None:
        dt = arrow.get(dt).to('UTC').datetime

    if to_utc:
        return arrow.get(dt).to('UTC')
    return dt
# In[366]:
def to_utc(dt):
    """Return ``dt`` converted to UTC as a ``datetime`` (conversion done by ``arrow``)."""
    as_arrow = arrow.get(dt)
    in_utc = as_arrow.to('UTC')
    return in_utc.datetime
# In[367]:
# Length of each supported time unit, expressed in seconds.
SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE
DAY = 24 * HOUR
# In[368]:
# Unit name -> length in seconds.
unit_values = dict(
    seconds=SECOND,
    minutes=MINUTE,
    hours=HOUR,
    days=DAY,
)
# In[369]:
def convert_time_units(val, from_units='seconds', to_units='days'):
    """Convert ``val`` from one time unit to another.

    :param val: quantity expressed in ``from_units``.
    :param from_units: key of ``unit_values`` describing ``val``.
    :param to_units: key of ``unit_values`` to convert into.
    :returns: the converted quantity passed through ``int()``, so any
        fractional part of the result is discarded.
    :raises KeyError: if either unit name is not in ``unit_values``.
    """
    source_size = unit_values[from_units]
    target_size = unit_values[to_units]
    return int(val * source_size / target_size)
# In[370]:
class InvalidDateRange(Exception):
    """Raised when a date range's start, stop, step and units do not fit together."""
# In[371]:
class DateRange(object):
    '''
    Date range start to stop. Iterates values from start to stop in steps. Iterator excludes stop value.

    ``stop`` may be given as an ``int``, in which case it means "that many
    ``units`` after ``start``". ``strftime`` is the default output format of
    ``format()``; the special value ``'epoch'`` renders integer Unix timestamps.
    '''

    def __init__(self, start, stop, step=1, units='days', round_time=True, strftime='epoch'):
        """Normalise ``start``/``stop`` and validate the resulting range.

        :param start: datetime-like start of the range.
        :param stop: datetime-like end of the range, or an int number of ``units``.
        :param step: distance between iterated values, in ``units``.
        :param units: ``'days'``, ``'hours'``, ``'minutes'`` or ``'seconds'``.
        :param round_time: truncate start and stop to whole ``units``.
        :param strftime: default format for ``format()``.
        :raises InvalidDateRange: see ``validate``.
        """
        super(DateRange, self).__init__()
        self.units = units
        self.round_time = round_time
        self.step = step
        self.start, self.stop = self.clean_times(start, stop, units, round_time_=self.round_time)
        self.validate()
        self.strftime = strftime

    def validate(self):
        """Raise ``InvalidDateRange`` if the range cannot be iterated sensibly."""
        if self.start > self.stop:
            raise InvalidDateRange('Start time must be before stop time.')
        if self.step <= 0:
            # A non-positive step would never advance towards ``stop``.
            raise InvalidDateRange('Step must be greater than zero.')
        if self.count == 0:
            # The span is shorter than one unit: suggest a unit that would fit.
            td = self.stop - self.start
            secs = td.total_seconds()
            if secs < MINUTE:
                likely_units = 'seconds'
            elif secs < HOUR:
                likely_units = 'minutes'
            elif secs < DAY:
                likely_units = 'hours'
            else:
                likely_units = 'days'
            raise InvalidDateRange('Perhaps the units must be in %s' % likely_units)
        if self.step > self.count:
            raise InvalidDateRange('Step must be less than count between times.')

    @property
    def count(self):
        """Number of whole ``units`` between start and stop (fraction discarded)."""
        span = self.stop - self.start
        return convert_time_units(span.total_seconds(), from_units='seconds', to_units=self.units)

    @staticmethod
    def clean_times(start, stop, units, round_time_=False):
        """Return ``(start, stop)`` as cleaned datetimes.

        An int ``stop`` is interpreted as an offset of that many ``units``
        from ``start``. With ``round_time_`` both ends are truncated to ``units``.
        """
        start = clean_time(start, to_utc=False)
        if isinstance(stop, int):
            stop = start + datetime.timedelta(**{units: stop})
        stop = clean_time(stop, to_utc=False)
        assert start.tzinfo == stop.tzinfo, 'start and stop must share the same tzinfo'
        if round_time_:
            start, stop = round_time(start, units=units), round_time(stop, units=units)
        return start, stop

    def as_dict(self):
        """Return the range's defining values (plus the derived count) as a dict."""
        return {
            'start': self.start,
            'stop': self.stop,
            'step': self.step,
            'units': self.units,
            'count': self.count,
        }

    def __repr__(self):
        dct = self.as_dict()
        dct['cls'] = self.__class__.__name__
        return '<{cls}> {start}-{stop}, {count} {units}'.format(**dct)

    def __iter__(self):
        """Yield ``start``, ``start + step``, ``start + 2*step`` ... below ``count`` units.

        Offsets are exact multiples of ``step`` and stay strictly below
        ``count``, so the stop value is never produced.
        """
        total = self.count
        offset = 0
        while offset < total:
            yield self.start + datetime.timedelta(**{self.units: offset})
            offset += self.step

    def format(self, strftime=None, sep='-'):
        """Render the range as ``<start><sep><stop>``.

        :param strftime: ``strftime`` pattern, or ``'epoch'`` for integer Unix
            timestamps; defaults to the instance's ``strftime``.
        :param sep: text placed between the two rendered ends.
        """
        if not strftime:
            strftime = self.strftime
        if strftime == 'epoch':
            # ``calendar.timegm`` gives integer epoch seconds on Python 2 and 3
            # and does not depend on the installed arrow version.
            import calendar
            start = u'%d' % calendar.timegm(self.start.utctimetuple())
            stop = u'%d' % calendar.timegm(self.stop.utctimetuple())
        else:
            start = self.start.strftime(strftime)
            stop = self.stop.strftime(strftime)
        return sep.join([start, stop])

    def __unicode__(self):
        return self.format()

    if str is not bytes:
        # Python 3 never consults ``__unicode__``; expose the same text via
        # ``str()``. Python 2 behaviour is left untouched.
        __str__ = __unicode__
# In[378]:
def test_date_range():
    """Exercise DateRange: rounding, integer stops, iteration and formatting."""
    # ``unicode`` only exists on Python 2; use ``str`` everywhere else.
    try:
        text_type = unicode
    except NameError:
        text_type = str

    now = datetime.datetime(2016, 1, 10, 3)
    later = now + datetime.timedelta(days=5)

    # Unrounded time is unchanged.
    dtr_unround = DateRange(now, later, step=1, units='days', round_time=False)
    # Compare without tzinfo: the exact tzinfo object attached by the class
    # is not reproduced here.
    comp_start_unround = dtr_unround.start.replace(tzinfo=None)
    assert comp_start_unround == now
    assert dtr_unround.count == 5

    # Rounded time is rounded to the day.
    dtr = DateRange(now, later, step=1, units='days', round_time=True)
    assert dtr.start.tzinfo is not None
    assert dtr.stop.tzinfo is not None
    comp_start, comp_stop = dtr.start.replace(tzinfo=None), dtr.stop.replace(tzinfo=None)
    assert comp_start == datetime.datetime(2016, 1, 10, 0, 0)
    assert comp_stop == datetime.datetime(2016, 1, 15, 0, 0)

    # Hours, with the second argument given as a count instead of a stop date.
    dtr = DateRange(now, 24, step=1, units='hours', round_time=True)
    comp_start, comp_stop = dtr.start.replace(tzinfo=None), dtr.stop.replace(tzinfo=None)
    assert comp_start == datetime.datetime(2016, 1, 10, 3, 0)
    assert comp_stop == datetime.datetime(2016, 1, 11, 3, 0)

    # Minutes.
    dtr = DateRange(now, 3, step=1, units='minutes', round_time=True)
    comp_start, comp_stop = dtr.start.replace(tzinfo=None), dtr.stop.replace(tzinfo=None)
    assert comp_start == datetime.datetime(2016, 1, 10, 3, 0)
    assert comp_stop == datetime.datetime(2016, 1, 10, 3, 3)

    # Iteration excludes the stop value.
    dtr_without_timezone = [d.replace(tzinfo=None) for d in dtr]
    assert dtr_without_timezone == [
        datetime.datetime(2016, 1, 10, 3, 0),
        datetime.datetime(2016, 1, 10, 3, 1),
        datetime.datetime(2016, 1, 10, 3, 2),
    ]

    # Formatting.
    dtr = DateRange(now, 3, step=1, units='minutes', round_time=True, strftime='epoch')
    assert text_type(dtr) == u'1452394800-1452394980'
    dtr = DateRange(now, 3, step=1, units='minutes', round_time=True, strftime='%Y%m%d')
    assert text_type(dtr) == u'20160110-20160110'
    dtr = DateRange(now, 3, step=1, units='minutes', round_time=True, strftime='%Y-%m-%dT%H:%M')
    assert text_type(dtr) == u'2016-01-10T03:00-2016-01-10T03:03'
# Run the self-test at module level, i.e. whenever this file is executed or imported.
test_date_range()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment