Skip to content

Instantly share code, notes, and snippets.

@phubbard
Created November 18, 2010 20:41
Show Gist options
  • Select an option

  • Save phubbard/705576 to your computer and use it in GitHub Desktop.

Select an option

Save phubbard/705576 to your computer and use it in GitHub Desktop.
Quick test of twisted,scheduling and scaling
#!/usr/bin/env python
"""
@date 11/18/10
@author Paul Hubbard
@brief Code up BrianF's twisted thought experiment
@note Must use 'limit descriptors 512' to run this!
=======================================================================
@inlinecallbacks
def simple(self):
# 100 ms of code here
# this takes 0 ms for the sake of argument
yield somethingspiffy()
#100 ms of code here
Whir up 10,000 of these puppies. Assume some kind of dynamic equilibrium where all the threads are
distributed evenly across the time lifespan of the function. That is, the average thread execution
is right at yield somethingspiffy(). I propose that the time between the yield releasing execution
and regaining execution is given by: 100ms * 10,000 calls = 1000 seconds. That means that any call
will take about 15 minutes to complete. That means if it's a function to test policy enforcement for
example, the system is screwed.
Even worse, this doesn't take into account work done by the yield, twisted overhead, or OS overhead.
"""
import sys
import time
import logging as log
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
from twisted.internet.protocol import Protocol, ClientFactory, Factory
NUM_CLIENTS = 500
def asleep(secs):
"""
@brief Do a reactor-safe sleep call. Call with yield to block until done.
@param secs Time, in seconds
@retval Deferred whose callback will fire after time has expired
"""
d = defer.Deferred()
reactor.callLater(secs, d.callback, None)
return d
class WorkerProtocol(Protocol):
@defer.inlineCallbacks
def dataReceived(self, msg):
yield self.worker_function(msg)
def sleeper(self, delay, index):
time.sleep(delay)
log.debug('thread %d sleep complete' % index)
@defer.inlineCallbacks
def worker_function(self, argument):
index = int(argument.strip())
log.debug('Worker %d running' % index)
t_zero = time.time()
# work for N millseconds - blocks the reactor!!
yield deferToThread(self.sleeper, 0.1, index)
#self.sleeper(0.1, index)
# non-blocking sleep
yield asleep(0.001)
# Blocking sleep
yield deferToThread(self.sleeper, 0.1, index)
#self.sleeper(0.1, index)
t_end = time.time()
log.info('Worker %d done, %f elapsed' % (index, (float(t_end - t_zero))))
class WorkerFactory(Factory):
protocol = WorkerProtocol
class ClientProtocol(Protocol):
def __init__(self, index):
self.index = index
def connectionMade(self):
log.debug('client got a connection')
self.transport.write('%d\n' % self.index)
self.transport.loseConnection()
class ClientFactory(ClientFactory):
protocol = ClientProtocol
def __init__(self, index):
self.index = index
def buildProtocol(self, addr):
return ClientProtocol(self.index)
@defer.inlineCallbacks
def main():
log.basicConfig(level=log.DEBUG, \
format='%(asctime)s %(levelname)s [%(funcName)s] %(message)s')
PORT = 2100
log.info('Starting up listener on port %d...' % PORT)
yield reactor.listenTCP(PORT, WorkerFactory())
log.info('Starting %d clients...' % NUM_CLIENTS)
d = []
t_zero = time.time()
for x in xrange(0, NUM_CLIENTS):
d.append(reactor.connectTCP('localhost', PORT, ClientFactory(x)))
yield defer.DeferredList(d)
t_end = time.time()
log.debug('Connections launched, %f elapsed' % float(t_end - t_zero))
if __name__ == '__main__':
main()
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment