Last active
December 19, 2015 02:38
-
-
Save exarkun/5884100 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Run `nc -l -p 8181`.
Then run `python chunkypushproducer.py tcp:localhost:port=8181`.
Depending on your platform/hardware, you may need to tweak the
multipliers or the sleep interval in `chunkyWorkload` to trigger
interesting behavior. These values seem to do a pretty good
job on my system, though. The expected output is something like:
Workload paused
Workload resumed
Workload paused
Workload resumed
Workload paused
Workload resumed
Workload paused
Workload resumed
Meanwhile, the nc window fills up with massive spam. This demonstrates
that producers are paused and resumed based on available send buffer
space, even if they are not continuously writing data (as this example
shows, writes may be scheduled on a time-based system; the
producer will still be paused and resumed).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if __name__ == '__main__':
    # This guard sits above the definitions below, so ``main`` is not yet
    # bound in this namespace when it runs.  Importing the module by name
    # executes the whole file first (presumably this file itself, saved as
    # chunkypushproducer.py as the usage notes describe).
    import sys
    from chunkypushproducer import main as runProducer
    # Exit with whatever ``main`` returns as the process status.
    sys.exit(runProducer(sys.argv))
from zope.interface import implementer | |
from twisted.internet.interfaces import IPushProducer | |
from twisted.internet.task import react, deferLater, cooperate | |
from twisted.internet.protocol import Protocol | |
from twisted.internet.endpoints import clientFromString, connectProtocol | |
def chunkyWorkload(reactor, protocol):
    """
    Endless generator that sends one large chunk, then waits two seconds.

    Each iteration passes a block of repeated C{b"Hello world "} to
    C{protocol.send} and then yields a Deferred from L{deferLater} that
    is scheduled two seconds out on C{reactor}.

    @param reactor: the reactor used to schedule the delay between chunks.
    @param protocol: an object with a C{send(data)} method.
    """
    # The multipliers set the chunk size; tune them (or the delay below)
    # if the send buffer never fills on your system.
    payload = b"Hello world " * 1024 * 10

    def doNothing():
        return None

    while True:
        protocol.send(payload)
        yield deferLater(reactor, 2, doNothing)
@implementer(IPushProducer)
class WorkloadProtocol(Protocol):
    """
    Protocol that runs a workload generator as a cooperative task and
    registers itself with its transport as a streaming (push) producer.

    The pause / resume / stop notifications received through the
    L{IPushProducer} methods are forwarded to the task, and each one
    prints a line so the flow-control activity is visible.

    @ivar workload: callable invoked as C{workload(reactor, protocol)}
        when the connection is made; its result is passed to L{cooperate}.
    @ivar task: the cooperative task created in L{connectionMade}.
    """

    def __init__(self, workload):
        self.workload = workload

    def connectionMade(self):
        """
        Register as a streaming producer and start the workload task.
        """
        # True marks this as a streaming (push) producer.
        self.transport.registerProducer(self, True)
        gen = self.workload(self.transport.reactor, self)
        self.task = cooperate(gen)

    def send(self, data):
        """
        Write C{data} to the transport; called by the workload.
        """
        self.transport.write(data)

    def pauseProducing(self):
        """
        Suspend the workload task.
        """
        # Single-argument print(...) produces the same output on Python 2
        # and is valid syntax on Python 3.
        print('Workload paused')
        self.task.pause()

    def resumeProducing(self):
        """
        Let the workload task continue.
        """
        print('Workload resumed')
        self.task.resume()

    def stopProducing(self):
        """
        Stop the workload task for good.
        """
        print('Workload stopped')
        self._stopTask()

    def connectionLost(self, reason):
        """
        Stop the workload task once the connection is gone.
        """
        print('Connection lost')
        self._stopTask()

    def _stopTask(self):
        """
        Stop the workload task, tolerating one that is already finished.

        Both L{stopProducing} and L{connectionLost} stop the task, and
        both may be invoked for the same disconnect.  Stopping a task
        that has already finished raises C{TaskFinished}, so that case is
        deliberately ignored here.
        """
        # Imported locally so this class needs no change to the file's
        # import block.
        from twisted.internet.task import TaskFinished
        try:
            self.task.stop()
        except TaskFinished:
            pass
def connect(reactor, endpointDescription):
    """
    Connect a L{WorkloadProtocol} running L{chunkyWorkload} to an endpoint.

    @param reactor: the reactor used to build the client endpoint.
    @param endpointDescription: a client endpoint description string
        understood by L{clientFromString}.
    @return: the Deferred from L{connectProtocol}, with a callback added
        that returns the connected protocol's C{task.whenDone()}.
    """
    def waitForWorkload(proto):
        # Chain on the task's completion Deferred.
        return proto.task.whenDone()

    client = clientFromString(reactor, endpointDescription)
    connecting = connectProtocol(client, WorkloadProtocol(chunkyWorkload))
    connecting.addCallback(waitForWorkload)
    return connecting
def main(argv):
    """
    Command-line entry point.

    @param argv: the full argument list; the first element (the program
        name) is dropped and the remainder is handed to L{react} as the
        extra arguments for L{connect}.
    @return: whatever L{react} returns.
    """
    endpointArgs = argv[1:]
    return react(connect, endpointArgs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment