Last active
December 15, 2015 03:59
-
-
Save justinvdm/5198534 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class NullFilter(object):
    """Named strategies for handling datapoints whose 'x' or 'y' is None.

    Datapoints are dicts read through their 'x' and 'y' keys.
    """

    # Public strategy name -> name of the static method implementing it.
    FILTERS = dict(skip='skip_nulls', zeroize='zeroize_nulls')

    @classmethod
    def from_name(cls, name):
        """Return the filter registered under `name`.

        Names missing from FILTERS yield an identity function that hands
        the datapoints back untouched.
        """
        method_name = cls.FILTERS.get(name)
        if not method_name:
            return lambda x: x
        return getattr(cls, method_name)

    @staticmethod
    def skip_nulls(datapoints):
        """Return a list of the datapoints whose 'y' and 'x' are both set."""
        kept = []
        for point in datapoints:
            # 'y' is inspected first; 'x' is only read when 'y' is present.
            if point['y'] is None or point['x'] is None:
                continue
            kept.append(point)
        return kept

    @staticmethod
    def zeroize_nulls(datapoints):
        """Return the datapoints with a None 'y' replaced by 0.

        Datapoints whose 'x' is None are dropped. A datapoint with a None
        'y' is replaced by a fresh dict holding only 'x' and 'y'; all other
        datapoints are passed through as the same objects.
        """
        cleaned = []
        for point in datapoints:
            if point['x'] is None:
                continue
            if point['y'] is None:
                point = {'x': point['x'], 'y': 0}
            cleaned.append(point)
        return cleaned
class Aggregator(object):
    """Lookup table of named functions that collapse a list of values."""

    # Aggregator name -> callable taking the list of values to collapse.
    AGGREGATORS = dict(
        max=max,
        min=min,
        sum=sum,
        avg=(lambda vals: sum(vals) / len(vals)),
    )

    @classmethod
    def from_name(cls, filter_name):
        """Return the aggregator registered under `filter_name`.

        Names missing from AGGREGATORS yield an identity function that
        returns its argument unchanged.
        """
        try:
            return cls.AGGREGATORS[filter_name]
        except KeyError:
            return lambda x: x
class LastDatapointSummarizer(object):
    """Summarize datapoints by keeping the last value seen in each bucket.

    Buckets are consecutive intervals of width `bucket_size` starting at the
    `from_time` passed to the call. Datapoints are dicts read through their
    'x' (position) and 'y' (value) keys.
    """

    def __init__(self, bucket_size):
        # Width of one bucket, in the same units as the datapoints' 'x'.
        # NOTE(review): expected to be positive -- confirm against callers.
        self.bucket_size = bucket_size

    def __call__(self, datapoints, from_time):
        """Return one {'x': bucket_start, 'y': last_y} dict per closed bucket.

        A bucket is closed, and its result emitted, only when a later
        datapoint falls at or beyond the bucket's upper boundary. The bucket
        holding the final datapoint is therefore never emitted. Buckets
        that received no datapoints produce no result.

        Assumes `datapoints` is ordered by ascending 'x' -- TODO confirm.
        An empty `datapoints` yields an empty list.
        """
        results = []
        step = from_time
        next_step = step + self.bucket_size
        # Last datapoint seen in the bucket starting at `step`, or None while
        # that bucket is still empty.
        previous = None
        for current in datapoints:
            if current['x'] >= next_step:
                if previous is not None:
                    results.append({'x': step, 'y': previous['y']})
                    previous = None
                # Jump straight to the bucket containing `current`, so that
                # a gap wider than one bucket does not leave `step` lagging
                # behind and mislabel every later result.
                skipped = int((current['x'] - step) // self.bucket_size)
                step += skipped * self.bucket_size
                next_step = step + self.bucket_size
            previous = current
        return results
class AggregatingSummarizer(object):
    """Summarize datapoints by aggregating the values within each bucket.

    Buckets are consecutive intervals of width `bucket_size` starting at the
    `from_time` passed to the call. Datapoints are dicts read through their
    'x' (position) and 'y' (value) keys.
    """

    def __init__(self, bucket_size, aggregator):
        # Width of one bucket, in the same units as the datapoints' 'x'.
        # NOTE(review): expected to be positive -- confirm against callers.
        self.bucket_size = bucket_size
        # Callable that receives the list of 'y' values of one bucket and
        # returns the summarized value.
        self.aggregator = aggregator

    def __call__(self, datapoints, from_time):
        """Return one {'x': bucket_start, 'y': aggregate} dict per closed bucket.

        A bucket is closed, and its result emitted, only when a later
        datapoint falls at or beyond the bucket's upper boundary. The bucket
        holding the final datapoint is therefore never emitted. Buckets
        that received no datapoints produce no result, so the aggregator is
        never called with an empty list.

        Assumes `datapoints` is ordered by ascending 'x' -- TODO confirm.
        An empty `datapoints` yields an empty list.
        """
        bucket = []
        results = []
        step = from_time
        next_step = step + self.bucket_size
        for datapoint in datapoints:
            if datapoint['x'] >= next_step:
                if bucket:
                    results.append({'x': step, 'y': self.aggregator(bucket)})
                    bucket = []
                # Jump straight to the bucket containing this datapoint, so
                # that a gap wider than one bucket does not leave `step`
                # lagging behind and mislabel every later result.
                skipped = int((datapoint['x'] - step) // self.bucket_size)
                step += skipped * self.bucket_size
                next_step = step + self.bucket_size
            bucket.append(datapoint['y'])
        return results
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm sure there's a better way to achieve what I'm trying to do in the summarizers (using
__call__()). If there's a better and more pythonic way, let me know.