Skip to content

Instantly share code, notes, and snippets.

@alastairparagas
Created July 18, 2016 15:20
Show Gist options
  • Select an option

  • Save alastairparagas/00e3a183ce8e8d6c7bbb4ac638cbc4b2 to your computer and use it in GitHub Desktop.

Select an option

Save alastairparagas/00e3a183ce8e8d6c7bbb4ac638cbc4b2 to your computer and use it in GitHub Desktop.
Testing in Python, with Pytest and Mock
import time
import channels
import django.conf
import pydash as _
import json
import re
import gevent
class Exchange(object):
"""
List of group prefixes that can be prepended to a group
for it to be detectable as a group holding channels that we
would want to send messages to, whenever a producer fires a
relevant event.
"""
group_prefixes = \
[django.conf.settings.CHANNEL_GROUP_PREFIX]
"""
Channel layer holding groups, which hold channels that
we would want to send messages to whenever a producer fires
a relevant event.
"""
channel_layer_list = \
[django.conf.settings.CHANNEL_LAYERS.keys()[0]]
def __init__(self, input_queue="exchange",
preset_bindings=True):
"""
Bootstraps the Exchange
:param input_queue: string
:param preset_bindings: boolean - eagerly load bindings
(groups -> respective channels)?
"""
for group_prefix in Exchange.group_prefixes:
if group_prefix is None:
raise ValueError("All group_prefixes must be strings")
self.queue_name = input_queue
self.__messages__ = []
self.__bindings__ = {}
if preset_bindings is True:
self.bindings_cache_update()
def bindings_cache_update(self):
for group_prefix in Exchange.group_prefixes:
for channel_layer in Exchange.channel_layer_list:
self.set_bindings(group_prefix, channel_layer)
def set_bindings(self, group_prefix, channel_layer):
"""
Mutates locally cached dict of bindings when called so that
there is always a mapping of subscribers to their designated
subscriptions
:return: void
"""
self.__bindings__ = {}
# Get all Redis nodes
redis_nodes = \
channels.channel_layers[channel_layer]._connection_list
# Obtain all the channels along with their group namespace
bindings_map = {}
for redis_node in redis_nodes:
for group in redis_node.keys("*" + group_prefix + "*"):
# Normalize name by removing the prefix
group_normalized = group.split(group_prefix, 1)[1]
bindings_map[group_normalized] = redis_node.zrange(group, 0, -1)
for group, subscribers_list in bindings_map.iteritems():
for subscriber in subscribers_list:
self.__parse_binding__(group, subscriber)
def __parse_binding__(self, bindings_map_group_key, channel_name):
"""
Parses a Django channels group namespace into a correct
self.__bindings__ path and places the subscriber/s
(identified by "channel_name") at the intended level
:param bindings_map_group_key: string
:param channel_name: string
:return: void
"""
def recursive_get(object, dotted_path, default=None):
path_list = dotted_path.split(".")
for path in path_list:
try:
object = object[path]
except KeyError:
if default is not None:
return default
else:
raise
return object
# Unpack a group string that looks like
# "workspace:empire,dataset:test,query:{empire:ghost}" to
# {"workspace": "empire", "dataset": "test"}
binding_path_descriptor = {}
for path in re.compile(",(?![^{]*})").split(bindings_map_group_key):
label, value = path.split(":", 1)
binding_path_descriptor[label] = value
# Place the subscriber into the correct self.__bindings__ path
if "workspace" in binding_path_descriptor:
workspace_name = binding_path_descriptor["workspace"]
if "query" in binding_path_descriptor:
subscribe_obj = {
"subscriber": channel_name,
"query": dict(
_.map_([
qs for qs in binding_path_descriptor["query"]
.replace("{", "", 1)\
.replace("}", "", 1)\
.split(",")
], lambda x: tuple(x.split(":", 1))
)
)
}
dataset_name = binding_path_descriptor.get(
"dataset", "*"
)
dataset_subscriptions = recursive_get(
self.__bindings__,
"%s.%s" % (workspace_name, dataset_name),
[]
)
self.__bindings__ = _.set_(
self.__bindings__,
"%s.%s" % (workspace_name, dataset_name),
_.append(dataset_subscriptions, subscribe_obj)
)
elif "dataset" in binding_path_descriptor:
subscribe_obj = {
"subscriber": channel_name
}
dataset_name = binding_path_descriptor.get("dataset")
dataset_subscriptions = recursive_get(
self.__bindings__,
"%s.%s" % (workspace_name, dataset_name),
[]
)
self.__bindings__ = _.set_(
self.__bindings__,
"%s.%s" % (workspace_name, dataset_name),
_.append(dataset_subscriptions, subscribe_obj)
)
else:
subscribe_obj = {
"subscriber": channel_name
}
workspace_subscriptions = recursive_get(
self.__bindings__,
"%s.*" % workspace_name,
[]
)
self.__bindings__ = _.set_(
self.__bindings__,
"%s.*" % workspace_name,
_.append(workspace_subscriptions, subscribe_obj)
)
def start_eventloop(self):
"""
Grab a message from a queue and have it be handled
:return: void
"""
while True:
self.__receive_message__(self.__messages__.pop(0))
time.sleep(0.001)
def add_message(self, message):
"""
Adds a message to the exchange
:param message: string
:return: void
"""
self.__messages__.append(message)
def __receive_message__(self, message):
"""
Process a received message and send to subscribers
:param message:
:return:
"""
message = _.defaults(message, {
"workspace": None,
"dataset": "*",
"measures": ["*"],
"dimension": "*"
})
if message.get("workspace") is None:
return
subscribers = \
self.__bindings__\
.get(message["workspace"], {})\
.get(message["dataset"], [])
for subscriber in subscribers:
for channel_layer in Exchange.channel_layer_list:
channels.Channel(subscriber["subscriber"], channel_layer).send({
"text": json.dumps(message)
})
from hydra.realtime.exchange import Exchange
from mock import Mock
import json
def test_parse_binding():
exchange = Exchange(preset_bindings=False)
# Test inclusion of workspace-wide subscribers
exchange.__parse_binding__(
"workspace:empire",
"subscriberId1"
)
exchange.__parse_binding__(
"workspace:town",
"subscriberId2"
)
assert {
"empire": {
"*": [{"subscriber": "subscriberId1"}]
},
"town": {
"*": [{"subscriber": "subscriberId2"}]
}
} == exchange.__bindings__
# Test inclusion of dataset-wide subscribers
exchange.__parse_binding__(
"workspace:empire,dataset:ghost",
"subscriberId3"
)
exchange.__parse_binding__(
"workspace:pond,dataset:angel",
"subscriberId4"
)
assert {
"empire": {
"*": [{"subscriber": "subscriberId1"}],
"ghost": [{"subscriber": "subscriberId3"}]
},
"town": {
"*": [{"subscriber": "subscriberId2"}]
},
"pond": {
"angel": [{"subscriber": "subscriberId4"}]
}
} == exchange.__bindings__
# Test inclusion of query-wide subscribers
exchange.__parse_binding__(
"workspace:empire,query:{param1:value1,param2:value2}",
"subscriberId5"
)
exchange.__parse_binding__(
"workspace:moat,dataset:ghost,query:{param1:value1,param2:value2}",
"subscriberId6"
)
assert {
"empire": {
"*": [
{"subscriber": "subscriberId1"},
{"subscriber": "subscriberId5",
"query": {
"param1": "value1",
"param2": "value2"
}}
]
},
"town": {
"*": [{"subscriber": "subscriberId2"}]
},
"pond": {
"angel": [{"subscriber": "subscriberId4"}]
},
"moat": {
"ghost": [{"subscriber": "subscriberId6",
"param1": "value1",
"param2": "value2"
}]
}
}
def test_instantiation(monkeypatch):
# Mock out Django settings
class MockDjangoSettings(object):
CHANNEL_GROUP_PREFIX = "hydrrltm:"
CHANNEL_LAYERS = {
"default": {}
}
monkeypatch.setattr("django.conf.settings", MockDjangoSettings())
# Mock out Redis Client API used in AsgiRedisChannelLayer
redis_mock = Mock()
redis_mock.keys = Mock(return_value=["hydrrltm:workspace:empire"])
redis_mock.smembers = Mock(return_value=["subscriberId1"])
# Mock out AsgiRedisChannelLayer, specifically its list of connections
class MockAsgiChannelLayer(object):
_connection_list = [redis_mock]
monkeypatch.setattr("channels.channel_layers", {
"default": MockAsgiChannelLayer()
})
# Let's call out the exchange
exchange = Exchange()
# Assertions
assert {
"empire": {
"*": [{"subscriber": "subscriberId1"}]
}
} == exchange.__bindings__
redis_mock.keys\
.assert_called_with(MockDjangoSettings.CHANNEL_GROUP_PREFIX + "*")
redis_mock.smembers\
.assert_called_with("hydrrltm:workspace:empire")
def test_sending_messages(monkeypatch):
class MockDjangoSettings(object):
CHANNEL_GROUP_PREFIX = "somePrefix:"
CHANNEL_LAYERS = {
"default": {}
}
monkeypatch.setattr("django.conf.settings", MockDjangoSettings())
# Mock out 2 Redis Client APIs (simulating 2 different Redis servers)
redis_mock1 = Mock()
redis_mock1.keys = Mock(
return_value=["hydrrltm:workspace:empire"]
)
redis_mock1.smembers = Mock(
return_value=["sub1", "sub2"]
)
redis_mock2 = Mock()
redis_mock2.keys = Mock(
return_value=["hydrrltm:workspace:empire,dataset:pond"]
)
redis_mock2.smembers = Mock(
return_value=["sub2"]
)
# Mock out AsgiRedisChannelLayer - 2 redis nodes
class MockAsgiChannelLayer(object):
_connection_list = [redis_mock1, redis_mock2]
monkeypatch.setattr("channels.channel_layers", {
"default": MockAsgiChannelLayer()
})
# Mock out Channels
class ChannelMock(object):
__init__ = Mock(return_value=None)
send = Mock(return_value=None)
monkeypatch.setattr("channels.Channel", ChannelMock)
exchange = Exchange()
exchange.__receive_message__({
"workspace": "empire",
"dataset": "pond",
"content": "someMessage"
})
assert {
"empire": {
"*": [{"subscriber": "sub1"}, {"subscriber": "sub2"}],
"pond": [{"subscriber": "sub2"}]
}
} == exchange.__bindings__
ChannelMock.__init__.assert_called_once_with("sub2", "default")
ChannelMock.send.assert_called_once_with({
"text": json.dumps({
"workspace": "empire",
"dataset": "pond",
"measures": ["*"],
"dimension": "*",
"content": "someMessage"
})
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment