Last active
December 26, 2015 07:19
-
-
Save kurtbrose/7113990 to your computer and use it in GitHub Desktop.
Getting very close to useful. Will convert into a real github repo soon.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import heapq | |
| import collections | |
| import time | |
| import random | |
| import greenlet | |
| #import faststat | |
| class Simulation(object): | |
| def __init__(self): | |
| self.time = Timeline() | |
| self.event = EventDispatcher(self.time) | |
| self.net = Network(self.time) | |
| def register(self, name, target): | |
| self.net.directory[name] = target | |
| def run(self, duration): | |
| self.time.run(duration) | |
| NEVER = object() # sentinel value for scheduling | |
| class Timeline(object): | |
| 'the simulated timeline / sequence of events' | |
| def __init__(self): | |
| # simulation state | |
| self.heap = [] # actions that should execute in the future | |
| self.now = 0 # current simulation time | |
| self.loop = greenlet.greenlet(self._loop) | |
| self.return_to = None | |
| # meta-data | |
| self.time_steps = 0 | |
| self.realtime = 0 | |
| def wait(self, duration): | |
| 'simulate a pause (e.g. work being done)' | |
| heapq.heappush( self.heap, (self.now + duration, greenlet.getcurrent()) ) | |
| self.next() | |
| def schedule(self, deferred, delay=0): | |
| 'schedule a deferred action (represented as a greenlet) to execute later' | |
| if type(deferred) is not greenlet.greenlet: | |
| deferred = greenlet.greenlet(deferred, self.loop) | |
| if delay is not NEVER: | |
| heapq.heappush( self.heap, (self.now + delay, deferred)) | |
| return deferred | |
| def next(self): | |
| 'advance the timeline to the next event' | |
| self.time_steps += 1 | |
| self.now, next = heapq.heappop(self.heap) | |
| next.switch() | |
| def _loop(self): | |
| while 1: | |
| while self.heap: | |
| self.next() | |
| self.return_to.switch() | |
| def run(self, duration): | |
| self.return_to = greenlet.getcurrent() | |
| self.schedule(greenlet.getcurrent(), duration) | |
| #schedule self, which will bounce back to this function and return | |
| start = time.time() | |
| self.loop.switch() | |
| #alternatively, if there are no events the loop will switch back | |
| self.realtime += time.time() - start | |
| def __repr__(self): | |
| return "<Timeline simtime={0} realtime={1} steps={2} pending={3}>".format( | |
| self.now, self.realtime, self.time_steps, len(self.heap)) | |
| TIMED_OUT = object() | |
| class EventDispatcher(object): | |
| def __init__(self, timeline): | |
| self.listeners = {} | |
| self.timeline = timeline | |
| def wait(self, event, timeout=NEVER): | |
| do_once = self.timeline.schedule(greenlet.getcurrent().switch, timeout) | |
| listeners = self.listeners.setdefault(event, {}) | |
| listeners[do_once] = TIMED_OUT | |
| self.timeline.next() # will resume execution when either event has triggered, or timeout occurred | |
| result = self.listeners[event][do_once] | |
| del self.listeners[event][do_once] | |
| if not self.listeners[event]: | |
| del self.listeners[event] | |
| if result is TIMED_OUT: | |
| raise TimeoutError(timeout) # more info here | |
| return result | |
| def trigger(self, event, data=None): | |
| listeners = self.listeners.get(event, ()) | |
| for glet in listeners: | |
| listeners[glet] = data | |
| self.timeline.schedule(glet) | |
| class Network(object): | |
| 'the simulated network fabric' | |
| def __init__(self, timeline, directory=None, connect_delay=None): | |
| self.timeline = timeline | |
| self.directory = directory or {} | |
| self.connect_delay = connect_delay | |
| self.num_connects = 0 | |
| def connect(self, name): | |
| self.num_connects += 1 | |
| if self.connect_delay: | |
| self.timeline.wait(self.connect_delay) | |
| return self.directory[name] | |
| class Request(object): | |
| ''' | |
| Represents a network request. | |
| To wait on the request, simply call sim.event.wait(request). | |
| ''' | |
| __slots__ = ('response', 'start', 'finish', 'sim', 'target') | |
| def __init__(self, sim, target): | |
| self.sim = sim | |
| self.target = target | |
| self.start = sim.time.now | |
| sim.net.directory[target](self) | |
| def finish(self, response): | |
| self.response = response | |
| self.finish = self.sim.now | |
| self.sim.event.trigger(self) | |
| class ConnectionRefused(Exception): | |
| pass | |
| class TimeoutError(Exception): | |
| pass | |
| class Router(object): | |
| def __init__(self, name, servers): | |
| self.name = name | |
| self.servers = servers | |
| self.num_requests = 0 | |
| def __call__(self, req): | |
| self.num_requests += 1 | |
| return self.servers[self.num_requests % len(self.servers)](req) | |
| def __repr__(self): | |
| return "<Router name={0} num_requests={1}>".format(self.name, self.num_requests) | |
| _QUEUE_HAS_ITEM = object() | |
| class Server(object): | |
| 'a simulated physical server instance' | |
| def __init__(self, simulation, num_workers, queue_size): | |
| self.queue_size = queue_size | |
| self.num_workers = num_workers | |
| self.sim = simulation | |
| self.queue = collections.deque() | |
| self.workers = [self.sim.time.schedule(self._worker_run) for i in range(num_workers)] | |
| def enqueue(self, req): | |
| if len(self.queue) > self.queue_size: | |
| raise ConnectionRefused() | |
| self.queue.append(req) | |
| self.sim.event.trigger((self, _QUEUE_HAS_ITEM)) | |
| def _worker_run(self): | |
| while 1: | |
| while self.queue: | |
| req = self.queue.popleft() | |
| self.do_work(req) | |
| self.sim.event.trigger(req) | |
| self.sim.event.wait((self, _QUEUE_HAS_ITEM)) | |
| def do_work(self, req): | |
| self.sim.time.wait(0.1) | |
| req.response = "fooo" | |
| def __call__(self, req): | |
| self.enqueue(req) | |
| class PoissonProcess(object): | |
| 'represents events occurring independently' | |
| def __init__(self, simulation, event, requests_per_second): | |
| self.rps = requests_per_second | |
| self.sim = simulation | |
| self.event = event | |
| simulation.time.schedule(self._run) | |
| def _run(self): | |
| while 1: | |
| delay = random.expovariate(self.rps) | |
| self.sim.time.wait(delay) | |
| self.sim.time.schedule(self.event) | |
| class TrafficSource(PoissonProcess): | |
| def __init__(self, simulation, target, requests_per_second): | |
| self.sim = simulation | |
| self.target = target | |
| self.success = 0 | |
| self.failure = 0 | |
| super(TrafficSource, self).__init__( | |
| simulation, self.event, requests_per_second) | |
| def event(self): | |
| try: | |
| req = Request(self.sim, self.target) | |
| self.sim.event.wait(req) | |
| self.success += 1 | |
| except: | |
| self.failure += 1 | |
| SWITCHES = 0 | |
| def _trace(event, src_dst): | |
| global SWITCHES | |
| SWITCHES += 1 | |
| src, dst = src_dst | |
| print event, _glet_repr(src), "==>", _glet_repr(dst) | |
| def _glet_repr(glet): | |
| stack = [] | |
| cur = glet.gr_frame | |
| while cur: | |
| stack.append(cur.f_code) | |
| cur = cur.f_back | |
| stack = [e.co_name for e in stack[::-1]] | |
| if stack: | |
| stack = "()->".join(stack) | |
| else: | |
| stack = "(none)" | |
| return "<greenlet id={0} stack={1}>".format(id(glet) % 1024, stack) | |
| def test(): | |
| sim = Simulation() | |
| timeline = sim.time | |
| router = Router("vip1", [Server(sim, 3, 100) for i in range(3)]) | |
| sim.register("vip1", router) | |
| traffic = TrafficSource(sim, "vip1", 100) | |
| #greenlet.settrace(_trace) | |
| print timeline.heap | |
| timeline.run(100) | |
| print timeline | |
| print router | |
| print "successes", traffic.success, "failures", traffic.failure | |
| print "SWITCHES", SWITCHES | |
| if __name__ == "__main__": | |
| test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment