Created
November 22, 2011 14:10
-
-
Save jamesp/1385745 to your computer and use it in GitHub Desktop.
A simple cache for keeping timeseries data in memory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bisect import bisect_left, bisect_right | |
class TimeseriesCache(object): | |
"""Store a timeseries of data in memory for random access.""" | |
def __init__(self, ttl=None): | |
self.timestamps = [] | |
self.data = [] | |
self.ttl = ttl | |
def add(self, timestamp, data): | |
"""Add a set of time-value data to the cache""" | |
index = bisect_right(self.timestamps, timestamp) | |
self.timestamps.insert(index, timestamp) | |
self.data.insert(index, data) | |
if self.ttl: | |
self.__sweep(timestamp) | |
def __sweep(self, timestamp): | |
"""Remove ticks older than the ttl.""" | |
index = bisect_right(self.timestamps, timestamp - self.ttl) | |
self.timestamps = self.timestamps[index:] | |
self.data = self.data[index:] | |
def get(self, timestamp, default=None): | |
"""Returns the data value stored for the time specified.""" | |
index = bisect_right(self.timestamps, timestamp) | |
if index == 0: | |
return default | |
else: | |
return self.data[index - 1] | |
def __setitem__(self, timestamp, data): | |
self.add(timestamp, data) | |
def __getitem__(self, timestamp): | |
"""Returns the last available data for a given timestamp. | |
e.g. ts[4] #=> 'A' | |
If a slice is used, returns a list of tuples: (timestamp, data) | |
that fall within the slice range. | |
e.g. ts[3:5] #=> [(3.4, 'A'), (3.8, 'B'), (4.4, 'C'), (4.9, 'D')] | |
If a slice with step is used, returns a data item for each interval | |
of step from start to stop. | |
e.g. ts[10:20:3] #=> [(10, 'A'), (13, 'B'), (16, 'C'), (19, 'C')] | |
Returns a data item or a list of (timestamp, data) pairs.""" | |
if isinstance(timestamp, slice): | |
i, j = None, None | |
if timestamp.start: | |
i = bisect_left(self.timestamps, timestamp.start) | |
if timestamp.stop: | |
j = bisect_right(self.timestamps, timestamp.stop) | |
if timestamp.step: | |
t = timestamp.start | |
items = [] | |
while t <= timestamp.stop: | |
items.append((t, self[t])) | |
t += timestamp.step | |
return items | |
return zip(self.timestamps[i:j], self.data[i:j]) | |
else: | |
i = bisect_right(self.timestamps, timestamp) | |
if i == 0: | |
return None | |
else: | |
return self.data[i - 1] | |
def head(self): | |
"""Return the most recent data in the cache.""" | |
return self.data[-1] | |
if __name__ == '__main__': | |
c = TimeseriesCache() | |
c.add(1, 'A') | |
c.add(2, 'B') | |
c.add(3, 'C') | |
c.add(5, 'E') | |
c.add(4, 'D') | |
c.add(6, 'F') | |
assert(c[0] is None) | |
assert(c[1] == 'A') | |
assert(c[3.5] == 'C') | |
assert(c[78] == 'F') | |
assert(c[3.4:6] == [(4, 'D'), (5, 'E'), (6, 'F')]) | |
assert(c[4:7:0.25] == [(4, 'D'), (4.25, 'D'), (4.5, 'D'), | |
(4.75, 'D'), (5.0, 'E'), (5.25, 'E'), (5.5, 'E'), (5.75, 'E'), | |
(6.0, 'F'), (6.25, 'F'), (6.5, 'F'), (6.75, 'F'), (7.0, 'F')]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment