Skip to content

Instantly share code, notes, and snippets.

@progrium
Created July 15, 2010 23:05
Show Gist options
  • Select an option

  • Save progrium/477669 to your computer and use it in GitHub Desktop.

Select an option

Save progrium/477669 to your computer and use it in GitHub Desktop.
import time
import operator
import collections
class SampledRate(object):
    """Track a rate of events that is recomputed at a fixed interval.

    Call ``tick()`` whenever events occur.  Ticks are accumulated into
    buckets keyed by ``int(time.time() / resolution)``.  Once more than
    ``frequency`` whole seconds have passed since the current window
    started, the next ``tick()`` call recomputes the rate, starts a new
    window and notifies ``callback`` (if one was given).

    The rate is only refreshed from inside ``tick()``; if no ticks arrive,
    the last computed value is kept.
    """

    def __init__(self, frequency=1, resolution=1, callback=None):
        """Create a rate tracker.

        frequency: Rate update frequency in seconds.
        resolution: Interval to average data over in seconds.
        callback: Optional callable invoked with the new rate every time
            the rate is recomputed.

        Raises ValueError if ``frequency`` or ``resolution`` is not
        positive (both are used as divisors).
        """
        if frequency <= 0 or resolution <= 0:
            raise ValueError("frequency and resolution must be positive")
        self.frequency = frequency
        self.resolution = resolution
        self.callback = callback
        # Tick counts for the current window, keyed by time bucket.
        self.samples = collections.defaultdict(int)
        # Whole-second timestamp at which the current window began;
        # None until the first tick.
        self.last_start = None
        # Most recently computed rate (0 until the first window closes).
        self.last_value = 0

    def tick(self, ticks=1):
        """Record ``ticks`` events and refresh the rate when it is due.

        Returns ``self`` so calls can be chained.
        """
        # Read the clock once so every decision in this call is based on
        # the same instant.
        now = time.time()
        if self.last_start is None:
            self.last_start = int(now)

        if int(now - self.last_start) > self.frequency:
            # Buckets that received no ticks contribute nothing to the
            # total, so they do not need to be materialised.
            total = sum(self.samples.values())
            # Uses "/", so on Python 3 the result is a float.
            self.last_value = total / self.resolution / self.frequency
            self.last_start = int(now)
            self.samples = collections.defaultdict(int)
            if self.callback is not None:
                self.callback(self.last_value)

        # The current ticks always belong to the (possibly new) window.
        self.samples[int(now / self.resolution)] += ticks
        return self

    def value(self):
        """Return the most recently computed rate."""
        return self.last_value

    def __int__(self):
        """Return the most recently computed rate truncated to an int."""
        # __int__ must return a real int; last_value may be a float.
        return int(self.last_value)

    def __repr__(self):
        return "<SampledRate: %i avg/%is updated/%is>" % (
            self.last_value,
            self.resolution,
            self.frequency,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment