# Source: GitHub gist hellais/4496929 by @hellais, created January 9, 2013 21:03.
# (GitHub page chrome from the scrape -- navigation links and the
# "Save to your computer and use it in GitHub Desktop" prompts -- is not part
# of the program.)
import itertools
import time
class BaseTask(object):
    """
    Base class for a unit of asynchronous work.

    Subclasses override run(), which must return a deferred, and may override
    success() and failed() to react to the outcome. The number of failed
    executions is kept in self.failures.
    """

    def __init__(self):
        # Set to False on construction; nothing in this class updates it.
        self.running = False
        # Number of times the deferred returned by run() has failed.
        self.failures = 0

    def _failed(self, failure):
        """
        Internal errback: count the failure and notify the failed() hook.

        The failure is returned so that the deferred stays in the error
        state. Returning None here would turn the failure into a successful
        result, hiding it from whoever called _run().
        """
        self.failures += 1
        self.failed(failure)
        return failure

    def _run(self):
        """
        Start the task and wire the outcome hooks onto its deferred.

        Returns the deferred produced by run(). Exactly one of success() or
        _failed() is invoked for the outcome of run(): they are attached as a
        callback/errback pair, so success() is never called after a failure.
        """
        d = self.run()
        d.addCallbacks(self.success, self._failed)
        return d

    def success(self, result):
        """
        Place here the logic to handle a successful execution of the task.
        """
        pass

    def failed(self, failure):
        """
        Place in here logic to handle failure.
        """
        pass

    def run(self):
        """
        Override this with the logic of your task.
        Must return a deferred.
        """
        pass
class TaskTimedOut(Exception):
    """
    Error used to fail a task's deferred when its timeout expires.
    """
class TaskWithTimeout(BaseTask):
    """
    A task whose deferred is failed with TaskTimedOut if it has not fired
    within `timeout` (the delay handed to reactor.callLater).
    """

    timeout = 5

    def _timedOut(self, d):
        """Internal method for handling timeout failure"""
        # The task may already have completed by the time the timer fires;
        # calling errback() on a fired deferred would raise, so do nothing.
        if d.called:
            return
        self.timedOut()
        d.errback(TaskTimedOut())

    def _run(self):
        """
        Start the task as BaseTask._run() does and arm the timeout timer.

        Returns the task's deferred. The timer is cancelled as soon as the
        deferred fires, so a completed task is never reported as timed out.
        """
        d = BaseTask._run(self)
        timeout_call = reactor.callLater(self.timeout, self._timedOut, d)

        def cancel_timeout(result):
            # active() is False once the timer has run or been cancelled.
            if timeout_call.active():
                timeout_call.cancel()
            # Pass the result (or failure) through untouched.
            return result

        d.addBoth(cancel_timeout)
        return d

    def timedOut(self):
        """
        Override this with the operations to happen when the task has timed
        out.
        """
        pass
class Measurement(TaskWithTimeout):
    """
    A single run of one test method of a net test against one input.
    """

    def __init__(self, test_class, test_method, test_input):
        """
        test_class:
            is the class, subclass of NetTestCase, of the test to be run

        test_method:
            is a string representing the test method to be called to perform
            this measurement

        test_input:
            is the input to the test
        """
        # Initialise the task bookkeeping (running, failures) that the base
        # class relies on when counting failures.
        TaskWithTimeout.__init__(self)

        self.test_instance = test_class()
        self.test_instance.input = test_input
        self.test_instance.report = {}
        self.test_instance._start_time = time.time()
        self.test_instance._setUp()
        self.test_instance.setUp()
        # Bound method that run() invokes to perform the measurement.
        self.test = getattr(self.test_instance, test_method)

    def run(self):
        """
        Invoke the test method and return a deferred for its outcome.

        The method is called through defer.maybeDeferred, so it may return
        either a plain value or a deferred.
        """
        d = defer.maybeDeferred(self.test)
        d.addCallback(self.test_done)
        d.addErrback(self.test_error)
        return d

    def test_done(self, result):
        """
        Hook called with the result of a successful test method.

        run() references this hook but no definition is visible in this
        file, so the default just passes the result through.
        """
        return result

    def test_error(self, failure):
        """
        Hook called with the failure of a test method.

        run() references this hook but no definition is visible in this
        file, so the default returns the failure to keep the deferred in the
        error state.
        """
        return failure
class NetTest(object):
    """
    A network test: a test file plus the inputs and options to run it with.
    """

    def __init__(self, net_test_file, inputs, options):
        """
        net_test_file:
            is a file object containing the test to be run.

        inputs:
            is a generator containing the inputs to the net test.

        options:
            is a dict containing the options to be passed to the net test.
        """
        self.net_test_file = net_test_file
        self.inputs = inputs
        self.options = options
        # (test_class, test_method) pairs. Starts empty so that
        # generateMeasurements() is usable before _findTestCases() has run.
        self.test_cases = []

    def _findTestCases(self):
        """
        Populate self.test_cases (currently with a placeholder value).
        """
        # XXX move in here the functionality from
        # findTestClassesFromFile, processTest, etc.
        self.test_cases = [(0, 1)]

    def generateMeasurements(self):
        """
        Yield one Measurement for every combination of input and test case.

        Inputs form the outer loop, so all test cases are generated for one
        input before moving on to the next.
        """
        for test_input in self.inputs:
            for test_class, test_method in self.test_cases:
                yield Measurement(test_class, test_method, test_input)
class Measurements(object):
    """
    Scheduler that runs measurements with bounded concurrency and re-queues
    the ones that fail.
    """

    # A failed measurement is re-queued while its failure count is below this.
    retries = 2
    # Maximum number of measurements in flight at the same time.
    concurrency = 10

    def __init__(self):
        # All mutable state is per instance; as class attributes these lists
        # would be shared by every Measurements object.
        self.netTests = []
        # (failure, measurement) pairs, one per failed execution.
        self.failures = []
        # Lazy queue of measurements still waiting to be started.
        self._measurements = iter(())
        self._active_measurements = []
        # True while runMoreMeasurements() is filling slots; see there.
        self._filling = False

    def populateMeasurements(self):
        """
        Append the measurements generated by every net test in self.netTests
        to the queue of pending measurements.
        """
        for net_test in self.netTests:
            self._measurements = itertools.chain(
                self._measurements, net_test.generateMeasurements())

    def start(self):
        """
        Start running the measurements.
        """
        self.populateMeasurements()
        self.runMoreMeasurements()

    def availableSlots(self):
        """
        Returns the number of available slots for running tests.
        """
        return self.concurrency - len(self._active_measurements)

    def runMeasurement(self, measurement):
        """
        Start one measurement and route its outcome to done() or failed().
        """
        # _run() rather than run(): it is the entry point that maintains
        # measurement.failures, which failed() reads to decide on a retry.
        d = measurement._run()
        # Register as active before attaching the handlers: an already-fired
        # deferred calls them immediately, and they remove the measurement.
        self._active_measurements.append(measurement)
        # Attached as a pair so an exception raised by done() is not routed
        # to failed(), which would try to remove the measurement twice.
        d.addCallbacks(self.done, self.failed,
                       callbackArgs=(measurement,),
                       errbackArgs=(measurement,))

    def runMoreMeasurements(self):
        """
        Start pending measurements until no slot is free or the queue is
        empty.
        """
        # done()/failed() call back into this method. When that happens
        # while the loop below is running, the loop itself picks up the
        # freed slot, so the nested call returns instead of recursing.
        if self._filling:
            return
        self._filling = True
        try:
            # Check for a free slot before pulling from the queue so that no
            # measurement is consumed and then dropped.
            while self.availableSlots() > 0:
                try:
                    measurement = next(self._measurements)
                except StopIteration:
                    break
                self.runMeasurement(measurement)
        finally:
            self._filling = False

    def done(self, result, measurement):
        """
        We have successfully completed a measurement.
        """
        self._active_measurements.remove(measurement)
        # A slot has been freed: start the next pending measurement, if any.
        self.runMoreMeasurements()

    def failed(self, failure, measurement):
        """
        The measurement has failed to complete.
        """
        self._active_measurements.remove(measurement)
        self.failures.append((failure, measurement))
        if measurement.failures < self.retries:
            # Re-queue at the end of the pending measurements.
            self._measurements = itertools.chain(self._measurements,
                                                 (measurement,))
        self.runMoreMeasurements()
class MeasurementManager(object):
    """
    Holds the registered net tests and the Measurements object that draws
    its work from them.
    """

    def __init__(self):
        # Per-instance list; as a class attribute it would be shared by
        # every manager.
        self.reporters = []
        self.netTests = []
        self.measurements = Measurements()
        # Share the same list object, so net tests added later through
        # addNetTest() are visible to the Measurements object.
        self.measurements.netTests = self.netTests

    def addNetTest(self, net_test):
        """
        This is called to add a NetTest to the list of running network tests.
        """
        self.netTests.append(net_test)
# (GitHub page footer from the scrape: sign-up / sign-in prompt for commenting.)