Testing in Python, with Pytest and Mock
The first file defines an Exchange that fans incoming messages out to Django Channels groups backed by Redis; the second file exercises it with pytest, using Mock and monkeypatch to stub out the Django settings, the Redis nodes, and the Channel class.
```python
import json
import re
import time

import channels
import django.conf
import gevent
import pydash as _


class Exchange(object):
    """
    Fans messages out to the channels subscribed to them through
    Django Channels groups.
    """

    # Group prefixes that can be prepended to a group for it to be
    # detectable as a group holding channels that we would want to
    # send messages to, whenever a producer fires a relevant event.
    group_prefixes = \
        [django.conf.settings.CHANNEL_GROUP_PREFIX]

    # Channel layer holding groups, which hold channels that we would
    # want to send messages to whenever a producer fires a relevant
    # event.
    channel_layer_list = \
        [list(django.conf.settings.CHANNEL_LAYERS.keys())[0]]
    def __init__(self, input_queue="exchange",
                 preset_bindings=True):
        """
        Bootstraps the Exchange
        :param input_queue: string
        :param preset_bindings: boolean - eagerly load bindings
            (groups -> respective channels)?
        """
        for group_prefix in Exchange.group_prefixes:
            if not isinstance(group_prefix, str):
                raise ValueError("All group_prefixes must be strings")

        self.queue_name = input_queue
        self.__messages__ = []
        self.__bindings__ = {}

        if preset_bindings is True:
            self.bindings_cache_update()
    def bindings_cache_update(self):
        """
        Rebuilds the bindings cache for every known group prefix and
        channel layer.
        :return: void
        """
        # Reset the cache once here, then let set_bindings accumulate
        # the bindings found under each prefix/layer combination
        self.__bindings__ = {}
        for group_prefix in Exchange.group_prefixes:
            for channel_layer in Exchange.channel_layer_list:
                self.set_bindings(group_prefix, channel_layer)
    def set_bindings(self, group_prefix, channel_layer):
        """
        Mutates the locally cached dict of bindings when called so that
        there is always a mapping of subscribers to their designated
        subscriptions
        :return: void
        """
        # Get all Redis nodes backing this channel layer
        redis_nodes = \
            channels.channel_layers[channel_layer]._connection_list

        # Obtain all the channels along with their group namespace
        bindings_map = {}
        for redis_node in redis_nodes:
            for group in redis_node.keys("*" + group_prefix + "*"):
                # Normalize the name by removing the prefix
                group_normalized = group.split(group_prefix, 1)[1]
                bindings_map[group_normalized] = \
                    redis_node.zrange(group, 0, -1)

        for group, subscribers_list in bindings_map.items():
            for subscriber in subscribers_list:
                self.__parse_binding__(group, subscriber)
    def __parse_binding__(self, bindings_map_group_key, channel_name):
        """
        Parses a Django Channels group namespace into the correct
        self.__bindings__ path and places the subscriber
        (identified by "channel_name") at the intended level
        :param bindings_map_group_key: string
        :param channel_name: string
        :return: void
        """
        def recursive_get(obj, dotted_path, default=None):
            # Walk a dotted path ("workspace.dataset") through nested
            # dicts, falling back to `default` when a key is missing
            for path in dotted_path.split("."):
                try:
                    obj = obj[path]
                except KeyError:
                    if default is not None:
                        return default
                    raise
            return obj

        # Unpack a group string that looks like
        # "workspace:empire,dataset:test,query:{empire:ghost}" into
        # {"workspace": "empire", "dataset": "test",
        #  "query": "{empire:ghost}"}
        binding_path_descriptor = {}
        for path in re.compile(",(?![^{]*})").split(bindings_map_group_key):
            label, value = path.split(":", 1)
            binding_path_descriptor[label] = value

        # Place the subscriber into the correct self.__bindings__ path
        if "workspace" in binding_path_descriptor:
            workspace_name = binding_path_descriptor["workspace"]

            if "query" in binding_path_descriptor:
                # Strip the surrounding braces and turn the
                # "key:value" pairs into a plain dict
                query_string = binding_path_descriptor["query"]\
                    .replace("{", "", 1)\
                    .replace("}", "", 1)
                subscribe_obj = {
                    "subscriber": channel_name,
                    "query": dict(
                        _.map_(
                            query_string.split(","),
                            lambda x: tuple(x.split(":", 1))
                        )
                    )
                }
                dataset_name = binding_path_descriptor.get(
                    "dataset", "*"
                )
                dataset_subscriptions = recursive_get(
                    self.__bindings__,
                    "%s.%s" % (workspace_name, dataset_name),
                    []
                )
                self.__bindings__ = _.set_(
                    self.__bindings__,
                    "%s.%s" % (workspace_name, dataset_name),
                    _.append(dataset_subscriptions, subscribe_obj)
                )
            elif "dataset" in binding_path_descriptor:
                subscribe_obj = {
                    "subscriber": channel_name
                }
                dataset_name = binding_path_descriptor.get("dataset")
                dataset_subscriptions = recursive_get(
                    self.__bindings__,
                    "%s.%s" % (workspace_name, dataset_name),
                    []
                )
                self.__bindings__ = _.set_(
                    self.__bindings__,
                    "%s.%s" % (workspace_name, dataset_name),
                    _.append(dataset_subscriptions, subscribe_obj)
                )
            else:
                subscribe_obj = {
                    "subscriber": channel_name
                }
                workspace_subscriptions = recursive_get(
                    self.__bindings__,
                    "%s.*" % workspace_name,
                    []
                )
                self.__bindings__ = _.set_(
                    self.__bindings__,
                    "%s.*" % workspace_name,
                    _.append(workspace_subscriptions, subscribe_obj)
                )
    def start_eventloop(self):
        """
        Grab messages from the queue and have them handled
        :return: void
        """
        while True:
            # Only pop when something is queued - pop(0) on an empty
            # list would raise IndexError
            if self.__messages__:
                self.__receive_message__(self.__messages__.pop(0))
            time.sleep(0.001)
    def add_message(self, message):
        """
        Adds a message to the exchange
        :param message: dict
        :return: void
        """
        self.__messages__.append(message)
    def __receive_message__(self, message):
        """
        Process a received message and send it to subscribers
        :param message: dict
        :return: void
        """
        message = _.defaults(message, {
            "workspace": None,
            "dataset": "*",
            "measures": ["*"],
            "dimension": "*"
        })
        if message.get("workspace") is None:
            return

        subscribers = \
            self.__bindings__\
                .get(message["workspace"], {})\
                .get(message["dataset"], [])
        for subscriber in subscribers:
            for channel_layer in Exchange.channel_layer_list:
                channels.Channel(
                    subscriber["subscriber"], channel_layer
                ).send({
                    "text": json.dumps(message)
                })
```
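To make the group-name convention concrete, here is a small standalone sketch (not part of the gist; the group string and parameter names are made up for illustration) of the parsing that `__parse_binding__` performs: commas separate path segments, except inside the `{...}` query block, and the query block is then unpacked into its own dict.

```python
import re

# Same pattern __parse_binding__ compiles: split on commas that are
# not inside a {...} block
group_splitter = re.compile(r",(?![^{]*})")

group = "workspace:empire,dataset:test,query:{param1:value1,param2:value2}"

descriptor = {}
for part in group_splitter.split(group):
    label, value = part.split(":", 1)
    descriptor[label] = value
print(descriptor)
# {'workspace': 'empire', 'dataset': 'test',
#  'query': '{param1:value1,param2:value2}'}

# The query block is then flattened into a plain dict
query = descriptor["query"].replace("{", "", 1).replace("}", "", 1)
print(dict(pair.split(":", 1) for pair in query.split(",")))
# {'param1': 'value1', 'param2': 'value2'}
```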
The accompanying pytest suite:
```python
import json

from mock import Mock

from hydra.realtime.exchange import Exchange


def test_parse_binding():
    exchange = Exchange(preset_bindings=False)

    # Test inclusion of workspace-wide subscribers
    exchange.__parse_binding__(
        "workspace:empire",
        "subscriberId1"
    )
    exchange.__parse_binding__(
        "workspace:town",
        "subscriberId2"
    )
    assert {
        "empire": {
            "*": [{"subscriber": "subscriberId1"}]
        },
        "town": {
            "*": [{"subscriber": "subscriberId2"}]
        }
    } == exchange.__bindings__

    # Test inclusion of dataset-wide subscribers
    exchange.__parse_binding__(
        "workspace:empire,dataset:ghost",
        "subscriberId3"
    )
    exchange.__parse_binding__(
        "workspace:pond,dataset:angel",
        "subscriberId4"
    )
    assert {
        "empire": {
            "*": [{"subscriber": "subscriberId1"}],
            "ghost": [{"subscriber": "subscriberId3"}]
        },
        "town": {
            "*": [{"subscriber": "subscriberId2"}]
        },
        "pond": {
            "angel": [{"subscriber": "subscriberId4"}]
        }
    } == exchange.__bindings__
    # Test inclusion of query-scoped subscribers
    exchange.__parse_binding__(
        "workspace:empire,query:{param1:value1,param2:value2}",
        "subscriberId5"
    )
    exchange.__parse_binding__(
        "workspace:moat,dataset:ghost,query:{param1:value1,param2:value2}",
        "subscriberId6"
    )
    assert {
        "empire": {
            "*": [
                {"subscriber": "subscriberId1"},
                {"subscriber": "subscriberId5",
                 "query": {
                     "param1": "value1",
                     "param2": "value2"
                 }}
            ],
            "ghost": [{"subscriber": "subscriberId3"}]
        },
        "town": {
            "*": [{"subscriber": "subscriberId2"}]
        },
        "pond": {
            "angel": [{"subscriber": "subscriberId4"}]
        },
        "moat": {
            "ghost": [{"subscriber": "subscriberId6",
                       "query": {
                           "param1": "value1",
                           "param2": "value2"
                       }}]
        }
    } == exchange.__bindings__


def test_instantiation(monkeypatch):
    # Mock out Django settings
    class MockDjangoSettings(object):
        CHANNEL_GROUP_PREFIX = "hydrrltm:"
        CHANNEL_LAYERS = {
            "default": {}
        }
    monkeypatch.setattr("django.conf.settings", MockDjangoSettings())
    # Exchange read the settings at import time, so point its class
    # attributes at the mocked values as well
    monkeypatch.setattr(
        Exchange, "group_prefixes",
        [MockDjangoSettings.CHANNEL_GROUP_PREFIX]
    )
    monkeypatch.setattr(Exchange, "channel_layer_list", ["default"])

    # Mock out the Redis client API used in AsgiRedisChannelLayer
    redis_mock = Mock()
    redis_mock.keys = Mock(return_value=["hydrrltm:workspace:empire"])
    redis_mock.zrange = Mock(return_value=["subscriberId1"])

    # Mock out AsgiRedisChannelLayer, specifically its list of connections
    class MockAsgiChannelLayer(object):
        _connection_list = [redis_mock]
    monkeypatch.setattr("channels.channel_layers", {
        "default": MockAsgiChannelLayer()
    })

    # Instantiating the Exchange eagerly loads the bindings
    exchange = Exchange()

    # Assertions
    assert {
        "empire": {
            "*": [{"subscriber": "subscriberId1"}]
        }
    } == exchange.__bindings__
    redis_mock.keys.assert_called_with(
        "*" + MockDjangoSettings.CHANNEL_GROUP_PREFIX + "*"
    )
    redis_mock.zrange.assert_called_with(
        "hydrrltm:workspace:empire", 0, -1
    )


def test_sending_messages(monkeypatch):
    # Mock out Django settings
    class MockDjangoSettings(object):
        CHANNEL_GROUP_PREFIX = "hydrrltm:"
        CHANNEL_LAYERS = {
            "default": {}
        }
    monkeypatch.setattr("django.conf.settings", MockDjangoSettings())
    monkeypatch.setattr(
        Exchange, "group_prefixes",
        [MockDjangoSettings.CHANNEL_GROUP_PREFIX]
    )
    monkeypatch.setattr(Exchange, "channel_layer_list", ["default"])

    # Mock out 2 Redis client APIs (simulating 2 different Redis servers)
    redis_mock1 = Mock()
    redis_mock1.keys = Mock(
        return_value=["hydrrltm:workspace:empire"]
    )
    redis_mock1.zrange = Mock(
        return_value=["sub1", "sub2"]
    )
    redis_mock2 = Mock()
    redis_mock2.keys = Mock(
        return_value=["hydrrltm:workspace:empire,dataset:pond"]
    )
    redis_mock2.zrange = Mock(
        return_value=["sub2"]
    )

    # Mock out AsgiRedisChannelLayer - 2 Redis nodes
    class MockAsgiChannelLayer(object):
        _connection_list = [redis_mock1, redis_mock2]
    monkeypatch.setattr("channels.channel_layers", {
        "default": MockAsgiChannelLayer()
    })

    # Mock out Channel so nothing actually gets sent
    class ChannelMock(object):
        __init__ = Mock(return_value=None)
        send = Mock(return_value=None)
    monkeypatch.setattr("channels.Channel", ChannelMock)

    exchange = Exchange()
    exchange.__receive_message__({
        "workspace": "empire",
        "dataset": "pond",
        "content": "someMessage"
    })

    assert {
        "empire": {
            "*": [{"subscriber": "sub1"}, {"subscriber": "sub2"}],
            "pond": [{"subscriber": "sub2"}]
        }
    } == exchange.__bindings__
    ChannelMock.__init__.assert_called_once_with("sub2", "default")

    # Compare the parsed payload rather than the raw JSON string, so
    # the assertion does not depend on dict key order
    assert ChannelMock.send.call_count == 1
    (sent_payload,), _kwargs = ChannelMock.send.call_args
    assert json.loads(sent_payload["text"]) == {
        "workspace": "empire",
        "dataset": "pond",
        "measures": ["*"],
        "dimension": "*",
        "content": "someMessage"
    }
```
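Both tests repeat the same settings and channel-layer monkeypatching, so a pytest fixture could factor that out. Below is a hedged sketch of one way to do it (the fixture and test names are illustrative and not part of the gist); it patches the Exchange class attributes directly, since the Django settings were already read when the module was imported.

```python
import pytest
from mock import Mock

from hydra.realtime.exchange import Exchange


@pytest.fixture
def exchange_env(monkeypatch):
    """Factory fixture: patch Exchange's settings-derived class
    attributes and install a fake channel layer around the given
    Redis mocks."""
    def _build(redis_mocks):
        monkeypatch.setattr(Exchange, "group_prefixes", ["hydrrltm:"])
        monkeypatch.setattr(Exchange, "channel_layer_list", ["default"])

        class MockAsgiChannelLayer(object):
            _connection_list = redis_mocks

        monkeypatch.setattr("channels.channel_layers", {
            "default": MockAsgiChannelLayer()
        })
    return _build


def test_instantiation_with_fixture(exchange_env):
    redis_mock = Mock()
    redis_mock.keys = Mock(return_value=["hydrrltm:workspace:empire"])
    redis_mock.zrange = Mock(return_value=["subscriberId1"])
    exchange_env([redis_mock])

    exchange = Exchange()
    assert exchange.__bindings__ == {
        "empire": {"*": [{"subscriber": "subscriberId1"}]}
    }
```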