Created
April 18, 2012 08:50
-
-
Save jrydberg/2412129 to your computer and use it in GitHub Desktop.
clock abstraction for gevent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2012 Johan Rydberg. | |
# Copyright 2001-2008 Twisted Matrix Laboratories. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""Simple abstraction of a clock that allows callers to read out the | |
current time, but also schedule future calls. Scheduled calls can be | |
cancelled and resetted. | |
The MockClock and DelayedCall classes and their functionality is based | |
on the twisted.internet.task.Clock class from Twisted which is | |
released under a MIT license. | |
""" | |
from time import time as _time | |
from gevent.event import Event | |
from gevent import spawn_later, kill as kill_greenlet, sleep as gevent_sleep | |
import gevent | |
class AlreadyCancelled(Exception): | |
pass | |
class AlreadyCalled(Exception): | |
pass | |
class DelayedCall(object): | |
"""Representation of a call in the future.""" | |
def __init__(self, clock, time, func, args, kw, cancel, reset): | |
self.clock = clock | |
self.time, self.func, self.args, self.kw = time, func, args, kw | |
self.resetter = reset | |
self.canceller = cancel | |
self.cancelled = self.called = 0 | |
def cancel(self): | |
"""Unschedule this call.""" | |
if self.cancelled: | |
raise AlreadyCancelled() | |
elif self.called: | |
raise AlreadyCalled() | |
else: | |
self.canceller(self) | |
self.cancelled = 1 | |
if self.debug: | |
self._str = str(self) | |
del self.func, self.args, self.kw | |
def reset(self, seconds_from_now): | |
"""Reschedule this call for a different time.""" | |
if self.cancelled: | |
raise AlreadyCancelled() | |
elif self.called: | |
raise AlreadyCalled() | |
else: | |
new_time = self.clock.time() + seconds_from_now | |
self.time = new_time | |
self.resetter(self) | |
def __le__(self, other): | |
return self.time <= other.time | |
class Clock(object): | |
"""Abstraction of a clock that has functionality for reporting | |
current time and scheduler functions to be called in the future. | |
""" | |
def __init__(self): | |
self._greenlets = {} | |
def _call(self, call): | |
call.called = 1 | |
del self._greenlets[call] | |
call.func(*call.args, **call.kw) | |
def _canceller(self, call): | |
"""Cancel delayed call.""" | |
greenlet = self._greenlets.pop(call, None) | |
if greenlet is not None: | |
kill_greenlet(greenlet) | |
def _resetter(self, call): | |
"""Schedule or reschedule delayed call.""" | |
self._canceller(call) | |
self._greenlets[call] = spawn_later( | |
call.time - self.time(), self._call, call) | |
def sleep(self, seconds=0): | |
"""Sleep for C{seconds}.""" | |
gevent_sleep(seconds) | |
advance = sleep | |
def call_later(self, seconds, fn, *args, **kw): | |
"""Call function C{fn} at a later time.""" | |
dc = DelayedCall(self, self.time() + seconds, | |
fn, args, kw, self._canceller, self._resetter) | |
self._resetter(dc) | |
return dc | |
def time(self): | |
"""Return current time.""" | |
return _time() | |
class MockClock(object): | |
"""Provide a deterministic, easily-controlled version of | |
L{Clock}. | |
This is commonly useful for writing deterministic unit tests for | |
code which schedules events using this API. | |
""" | |
right_now = 0.0 | |
def __init__(self): | |
self.calls = [] | |
def time(self): | |
"""Pretend to be time.time().""" | |
return self.right_now | |
def call_later(self, seconds, fn, *a, **kw): | |
dc = DelayedCall(self, self.time() + seconds, | |
fn, a, kw, self.calls.remove, lambda c: None) | |
self.calls.append(dc) | |
self.calls.sort(lambda a, b: cmp(a.time, b.time)) | |
return dc | |
def sleep(self, amount=0): | |
"""Sleep current greenlet for the specified amount.""" | |
ev = Event() | |
self.call_later(amount, ev.set) | |
ev.wait() | |
def advance(self, amount=0): | |
"""Move time on this clock forward by the given amount and run | |
whatever pending calls should be run. | |
""" | |
# First we yield the control so that other greenlets have a | |
# chance to run. | |
gevent_sleep() | |
future = self.right_now + amount | |
while self.calls and self.calls[0].time <= future: | |
call = self.calls.pop(0) | |
self.right_now = call.time | |
call.called = 1 | |
call.func(*call.args, **call.kw) | |
gevent_sleep() | |
self.right_now = future | |
def patch_gevent(c=None, time=False): | |
"""Patch gevent itself so that gevent.sleep and gevent.spawn_later | |
uses a clock. | |
@param time: If true, also patch C{time.time} to use the clock. | |
""" | |
if c is None: | |
c = Clock() | |
_gevent = __import__('gevent') | |
_gevent.sleep = c.sleep | |
# FIXME: we need to wrap this in something that looks like a | |
# greenlet so that it can be killed with gevent.kill. | |
_gevent.spawn_later = c.call_later | |
if time: | |
_time = __import__('time') | |
_time.time = c.time | |
if __name__ == '__main__': | |
def test(c): | |
def fn(*args, **kw): | |
print c.time(), "fn", args, kw | |
dc = c.call_later(2, fn, 1, 2, a="a", b="b") | |
print "start to sleep" | |
c.sleep(2) | |
print "back again" | |
def test2(): | |
import gevent, time | |
def fn(*args, **kw): | |
print time.time(), "fn", args, kw | |
print "start to sleep" | |
gevent.spawn_later(2, fn, 1, 2, a="a", b="b") | |
gevent.sleep(2) | |
print "back again" | |
c = MockClock() | |
gevent.spawn(test, c) | |
c.advance(4) | |
patch_gevent(c, time=True) | |
gevent.spawn(test2) | |
c.advance(4) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment