Last active
April 24, 2016 07:20
-
-
Save beenje/6150400 to your computer and use it in GitHub Desktop.
Twisted gateway
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
from twisted.internet import defer | |
from twisted.internet.protocol import ServerFactory | |
from twisted.protocols.basic import LineReceiver | |
from twisted.python import log | |
class BasicProtocol(LineReceiver):
    """Line-oriented protocol that hands every received line to its factory."""

    def lineReceived(self, line):
        """Forward one complete line to ``messageReceived``.

        The Deferred produced by the handler is deliberately NOT returned:
        ``LineReceiver.dataReceived`` treats a truthy return value from
        ``lineReceived`` as a request to stop draining its buffer, so
        returning the Deferred would leave the remaining lines of the same
        chunk unprocessed until more data arrived.  Code that needs to wait
        for the outcome can call ``messageReceived`` directly.

        @param line: the received line, without its delimiter
        """
        d = self.messageReceived(line)
        # Nothing else observes this Deferred, so report failures in the log
        # rather than leaving them as unhandled errors.
        d.addErrback(log.err, "Error while handling received line")

    @defer.inlineCallbacks
    def messageReceived(self, message):
        """Pass ``message`` to the factory's ``messageReceived`` handler.

        A protocol without a factory, or whose factory defines no
        ``messageReceived``, ignores the message (best effort).  Errors
        raised by the handler itself -- including ``AttributeError`` --
        propagate through the returned Deferred instead of being swallowed.

        @param message: the message to forward
        @return: a Deferred that fires once the factory handler is done
        """
        factory = getattr(self, "factory", None)
        handler = getattr(factory, "messageReceived", None)
        if handler is None:
            # Only the "no handler available" case is ignored on purpose.
            return
        yield handler(message)
class BasicGatewayFactory(ServerFactory):
    """Server factory that relays incoming messages to a publish service.

    Protocol instances built by this factory are ``BasicProtocol`` objects;
    every message they report is published on the configured channel.
    """

    protocol = BasicProtocol

    def __init__(self, service, channel):
        """
        @param service: object providing ``publish(channel, message)``
        @param channel: channel every received message is published on
        """
        self.service = service
        self.channel = channel

    @defer.inlineCallbacks
    def messageReceived(self, message):
        """Publish ``message`` on this factory's channel.

        @param message: the message received by one of the protocols
        @return: a Deferred that fires once the service's publish completes
        """
        publish = self.service.publish
        yield publish(self.channel, message)
class RedisPublishService(object):
    """Publish messages through a redis client and keep a copy of each one.

    Every message is first published on the channel, then stored in a sorted
    set named after the channel, scored by a millisecond timestamp.
    """

    def __init__(self, factory):
        """
        @param factory: redis client factory; its ``client`` attribute is
            used to talk to redis
        """
        self.factory = factory

    @defer.inlineCallbacks
    def publish(self, channel, message):
        """Publish ``message`` on ``channel`` and record it in a sorted set.

        @param channel: channel name, also used as the sorted set key
        @param message: the message to publish and store
        @return: a Deferred that fires once both redis calls have completed
        """
        log.msg("Publish message {} on {}".format(message, channel))
        yield self.factory.client.publish(channel, message)

        # Milliseconds since the epoch, taken after the publish completed.
        now_ms = int(time.time() * 1000)

        # The timestamp is embedded in the stored value so that the same
        # message can be stored more than once (sorted set members are
        # unique).
        entry = json.dumps({"timestamp": now_ms, "message": message})

        store_note = "Store message in {} sorted set with score {}".format(
            channel, now_ms)
        log.msg(store_note)

        # Using the timestamp as score makes it easy to fetch the values of
        # a given time period with zrangebyscore.
        # NOTE(review): arguments are passed as (key, timestamp, value) --
        # confirm this matches the zadd signature of the txredis in use.
        yield self.factory.client.zadd(channel, now_ms, entry)
if __name__ == '__main__':
    # Script entry point: accept lines on TCP port 8000 and publish each of
    # them on the "test" channel of the redis server at localhost:6379.
    import sys

    from twisted.internet import reactor
    from txredis.client import RedisClientFactory

    log.startLogging(sys.stdout)

    # Outgoing side: connection to redis.
    redis_factory = RedisClientFactory()
    reactor.connectTCP('localhost', 6379, redis_factory)

    # Incoming side: the gateway hands its messages to the publish service.
    gw_factory = BasicGatewayFactory(
        RedisPublishService(redis_factory), "test")
    reactor.listenTCP(8000, gw_factory)

    reactor.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import reactor, defer, protocol | |
from twisted.python import log | |
from twisted.test import proto_helpers | |
from twisted.trial.unittest import TestCase | |
from txredis.client import RedisSubscriber, RedisClientFactory | |
from txredis.testing import REDIS_HOST, REDIS_PORT | |
from gateway.example import BasicGatewayFactory, RedisPublishService | |
class GatewayServiceTestCase(TestCase):
    """Check that a line fed to the gateway reaches a redis subscriber.

    The test connects to redis at REDIS_HOST:REDIS_PORT, both for the
    publishing client and for the subscriber that observes the result.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """Connect to redis, build the gateway protocol and subscribe."""

        class MySubscriber(RedisSubscriber):
            """Subscriber remembering the last channel/message it received."""

            def __init__(self, *args, **kwargs):
                RedisSubscriber.__init__(self, *args, **kwargs)
                self.msg_channel = None
                self.msg_message = None
                # Fired on every received message, then replaced.
                self.msg_received = defer.Deferred()

            def messageReceived(self, channel, message):
                log.msg("Message received!")
                self.msg_channel, self.msg_message = channel, message
                self.msg_received.callback(None)
                # Fresh Deferred so the next message can be awaited too.
                self.msg_received = defer.Deferred()

        # Publishing side: wait until the redis client is connected.
        self.redis_factory = RedisClientFactory()
        reactor.connectTCP(REDIS_HOST, REDIS_PORT, self.redis_factory)
        yield self.redis_factory.deferred

        # Gateway under test, wired to an in-memory transport.
        self.redis_pub_service = RedisPublishService(self.redis_factory)
        self.factory = BasicGatewayFactory(self.redis_pub_service, "test")
        self.server = server = self.factory.buildProtocol(None)
        self.transport = transport = (
            proto_helpers.StringTransportWithDisconnection())
        transport.protocol = server
        server.makeConnection(transport)

        # Observing side: a subscriber listening on the "test" channel.
        creator = protocol.ClientCreator(reactor, MySubscriber)
        self.subscriber = yield creator.connectTCP(REDIS_HOST, REDIS_PORT)
        yield self.subscriber.subscribe("test")

    def tearDown(self):
        """Close the subscriber, the redis client and the fake transport."""
        self.subscriber.transport.loseConnection()

        redis_factory = self.redis_factory
        redis_factory.continueTrying = 0
        redis_factory.stopTrying()
        redis_client = redis_factory.client
        if redis_client:
            redis_client.setTimeout(None)
            redis_client.transport.loseConnection()

        self.transport.loseConnection()

    @defer.inlineCallbacks
    def test_messageReceived(self):
        """A received line is published unchanged on the "test" channel."""
        # Grab the Deferred first: the subscriber replaces it on delivery.
        delivered = self.subscriber.msg_received
        yield self.server.dataReceived('HELLO1\r\n')
        yield delivered

        subscriber = self.subscriber
        self.assertEqual(subscriber.msg_channel, "test")
        self.assertEqual(subscriber.msg_message, "HELLO1")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment