Last active
February 6, 2016 07:54
-
-
Save bencharb/4118772a4a3bbe00c42d to your computer and use it in GitHub Desktop.
date range class
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # coding: utf-8 | |
| # In[258]: | |
| import datetime | |
| import operator | |
| import arrow | |
| # In[259]: | |
| def round_time(dt, units='hours'): | |
| dt_units = dt.day, dt.hour, dt.minute, dt.second | |
| dt_args = [dt.year, dt.month] | |
| td_units = 'days', 'hours', 'minutes', 'seconds', | |
| for td_unit, dt_arg in zip(td_units, dt_units): | |
| dt_args.append(dt_arg) | |
| if td_unit == units: | |
| break | |
| newdt = datetime.datetime(*dt_args) | |
| newdt = newdt.replace(tzinfo=dt.tzinfo) | |
| return newdt | |
| # In[364]: | |
| def distribute_time(dt, count=-1, step=1, units='hours', round_time=True): | |
| """ | |
| Examples: | |
| In [01]: | |
| list(distribute_time(dt, -3)) | |
| Out[01]: | |
| [datetime.datetime(2016, 1, 26, 22, 0), | |
| datetime.datetime(2016, 1, 26, 21, 0), | |
| datetime.datetime(2016, 1, 26, 20, 0)] | |
| In [02]: | |
| list(distribute_time(dt, 3, units='days')) | |
| Out[02]: | |
| [datetime.datetime(2016, 1, 26, 0, 0), | |
| datetime.datetime(2016, 1, 27, 0, 0), | |
| datetime.datetime(2016, 1, 28, 0, 0)] | |
| """ | |
| td_units = 'days', 'hours', 'minutes', 'seconds', | |
| tzinfo = dt.tzinfo | |
| if round_time: | |
| dt_units = dt.day, dt.hour, dt.minute, dt.second | |
| dt_args = [dt.year, dt.month] | |
| for td_unit, dt_arg in zip(td_units, dt_units): | |
| dt_args.append(dt_arg) | |
| if td_unit == units: | |
| break | |
| dt = datetime.datetime(*dt_args) | |
| dt = dt.replace(tzinfo=tzinfo) | |
| op = operator.sub if count < 0 else operator.add | |
| yield dt | |
| for x in xrange(abs(count)-1): | |
| yield op(dt, datetime.timedelta(**{units:x+step})) | |
| # In[365]: | |
| def clean_time(something_like_a_datetime, to_utc=False): | |
| dt = something_like_a_datetime | |
| if isinstance(something_like_a_datetime, datetime.datetime): | |
| dt = something_like_a_datetime | |
| if not dt.tzinfo: | |
| dt = arrow.get(dt).to('UTC').datetime | |
| else: | |
| try: | |
| if isinstance(dt, basestring): | |
| dt = dateutil.parser.parse(dt) | |
| else: | |
| dt = arrow.get(something_like_a_datetime).datetime | |
| except Exception: | |
| try: | |
| dt = arrow.get(something_like_a_datetime).datetime | |
| except Exception: | |
| pass | |
| raise | |
| if to_utc: | |
| return arrow.get(dt).to('UTC') | |
| return dt | |
| # In[366]: | |
| def to_utc(dt): | |
| return arrow.get(dt).to('UTC').datetime | |
| # In[367]: | |
| SECOND = 1 | |
| MINUTE = SECOND*60 | |
| HOUR = MINUTE*60 | |
| DAY = HOUR*24 | |
| # In[368]: | |
| unit_values = { | |
| 'seconds':SECOND, | |
| 'minutes':MINUTE, | |
| 'hours':HOUR, | |
| 'days':DAY, | |
| } | |
| # In[369]: | |
| def convert_time_units(val, from_units='seconds', to_units='days'): | |
| return int((val*unit_values[from_units])/unit_values[to_units]) | |
| # In[370]: | |
| class InvalidDateRange(Exception): | |
| pass | |
| # In[371]: | |
| class DateRange(object): | |
| ''' | |
| Date range start to stop. Iterates values from start to stop in steps. Iterator excludes stop value. | |
| ''' | |
| def __init__(self, start, stop, step=1, units='days', round_time=True, strftime='epoch'): | |
| inst = super(DateRange, self).__init__() | |
| self.units = units | |
| self.round_time = round_time | |
| self.step = step | |
| self.start, self.stop = self.clean_times(start, stop, units, round_time_=self.round_time) | |
| self.validate() | |
| self.strftime = strftime | |
| def validate(self): | |
| if self.start > self.stop: | |
| raise InvalidDateRange('Start time must be before stop time.') | |
| if self.count == 0: | |
| td = self.stop - self.start | |
| secs = td.total_seconds() | |
| if secs < MINUTE: | |
| likely_units = 'seconds' | |
| elif secs < HOUR: | |
| likely_units = 'minutes' | |
| elif secs < DAY: | |
| likely_units = 'hours' | |
| else: | |
| likely_units = 'days' | |
| raise InvalidDateRange('Perhaps the units must be in %s' % likely_units) | |
| if self.step > self.count: | |
| raise InvalidDateRange('Step must be less than count between times.') | |
| @property | |
| def count(self): | |
| ct = self.stop - self.start | |
| ct = convert_time_units(ct.total_seconds(), from_units='seconds', to_units=self.units) | |
| return ct | |
| @staticmethod | |
| def clean_times(start, stop, units, round_time_=False): | |
| start = clean_time(start, to_utc=False) | |
| if isinstance(stop, int): | |
| stop = start + datetime.timedelta(**{units:stop}) | |
| stop = clean_time(stop, to_utc=False) | |
| assert start.tzinfo == stop.tzinfo | |
| if round_time_: | |
| start, stop = round_time(start, units=units), round_time(stop, units=units) | |
| return start, stop | |
| def as_dict(self): | |
| return { | |
| 'start':self.start, | |
| 'stop':self.stop, | |
| 'step':self.step, | |
| 'units':self.units, | |
| 'count':self.count | |
| } | |
| def __repr__(self): | |
| dct = self.as_dict() | |
| dct['cls'] = self.__class__.__name__ | |
| return '<{cls}> {start}-{stop}, {count} {units}'.format(**dct) | |
| def __iter__(self): | |
| for d in distribute_time(self.start, count=self.count, units=self.units, step=self.step, round_time=self.round_time): | |
| yield d | |
| def format(self, strftime=None, sep='-'): | |
| if not strftime: | |
| strftime = self.strftime | |
| if strftime == 'epoch': | |
| start = unicode(arrow.get(self.start).timestamp) | |
| stop = unicode(arrow.get(self.stop).timestamp) | |
| else: | |
| start = self.start.strftime(strftime) | |
| stop = self.stop.strftime(strftime) | |
| return sep.join([start, stop]) | |
| def __unicode__(self): | |
| return self.format() | |
| # In[378]: | |
| def test_date_range(): | |
| now = datetime.datetime(2016,1,10,3) | |
| later = now + datetime.timedelta(days=5) | |
| # Not rounded time is unchanged | |
| dtr_unround = DateRange(now, later, step=1, units='days', round_time=False) | |
| # Compare without tzinfo because I don't know how to generate exactly the same tzinfo as the one in the class | |
| comp_start_unround = dtr_unround.start.replace(tzinfo=None) | |
| assert comp_start_unround == now | |
| assert dtr_unround.count == 5 | |
| # Roundedd time is rounded to the day | |
| dtr = DateRange(now, later, step=1, units='days', round_time=True) | |
| assert dtr.start.tzinfo is not None | |
| assert dtr.stop.tzinfo is not None | |
| comp_start, comp_stop = dtr.start.replace(tzinfo=None), dtr.stop.replace(tzinfo=None) | |
| assert comp_start == datetime.datetime(2016, 1, 10, 0, 0) | |
| assert comp_stop == datetime.datetime(2016, 1, 15, 0, 0) | |
| # Test Hours | |
| # Test second argument as a count, not a stop date | |
| dtr = DateRange(now, 24, step=1, units='hours', round_time=True) | |
| comp_start, comp_stop = dtr.start.replace(tzinfo=None), dtr.stop.replace(tzinfo=None) | |
| assert comp_start == datetime.datetime(2016, 1, 10, 3, 0) | |
| assert comp_stop == datetime.datetime(2016, 1, 11, 3, 0) | |
| # Minutes | |
| dtr = DateRange(now, 3, step=1, units='minutes', round_time=True) | |
| comp_start, comp_stop = dtr.start.replace(tzinfo=None), dtr.stop.replace(tzinfo=None) | |
| assert comp_start == datetime.datetime(2016, 1, 10, 3, 0) | |
| assert comp_stop == datetime.datetime(2016, 1, 10, 3, 3) | |
| # Iteration | |
| dtr_without_timezone = [d.replace(tzinfo=None) for d in dtr] | |
| assert list(dtr_without_timezone) == [datetime.datetime(2016, 1, 10, 3, 0), | |
| datetime.datetime(2016, 1, 10, 3, 1), | |
| datetime.datetime(2016, 1, 10, 3, 2)] | |
| # Formatting | |
| dtr = DateRange(now, 3, step=1, units='minutes', round_time=True, strftime='epoch') | |
| assert unicode(dtr) == u'1452394800-1452394980' | |
| dtr = DateRange(now, 3, step=1, units='minutes', round_time=True, strftime='%Y%m%d') | |
| assert unicode(dtr) == u'20160110-20160110' | |
| dtr = DateRange(now, 3, step=1, units='minutes', round_time=True, strftime='%Y-%m-%dT%H:%M') | |
| assert unicode(dtr) == u'2016-01-10T03:00-2016-01-10T03:03' | |
| test_date_range() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment