Skip to content

Instantly share code, notes, and snippets.

@justinvdm
Created November 6, 2013 16:06
Show Gist options
  • Select an option

  • Save justinvdm/7338842 to your computer and use it in GitHub Desktop.

Select an option

Save justinvdm/7338842 to your computer and use it in GitHub Desktop.
diff --git a/vumi/tests/utils.py b/vumi/tests/utils.py
index 460853a..18a60de 100644
--- a/vumi/tests/utils.py
+++ b/vumi/tests/utils.py
@@ -20,6 +20,7 @@ from vumi.utils import vumi_resource_path, flatten_generator, LogFilterSite
from vumi.service import get_spec, Worker, WorkerCreator
from vumi.message import TransportUserMessage, TransportEvent
from vumi.tests.fake_amqp import FakeAMQPBroker, FakeAMQClient
+from vumi.blinklights import MetricManager
def import_filter(exc, *expected):
@@ -639,6 +640,10 @@ class VumiWorkerTestCase(TestCase):
def get_dispatched_failures(self, connector_name=None):
return self._get_dispatched('failures', connector_name)
+ def get_dispatched_metrics(self):  # unlike get_dispatched_failures, takes no connector_name
+ return self._amqp.get_messages(  # asks self._amqp directly instead of going through _get_dispatched
+ MetricManager.exchange_name, MetricManager.routing_key)  # exchange and routing key are MetricManager attributes
+
def wait_for_dispatched_events(self, amount, connector_name=None):
return self._wait_for_dispatched('event', amount, connector_name)
@@ -651,6 +656,10 @@ class VumiWorkerTestCase(TestCase):
def wait_for_dispatched_failures(self, amount, connector_name=None):
return self._wait_for_dispatched('failures', amount, connector_name)
+ def wait_for_dispatched_metrics(self, amount):  # unlike wait_for_dispatched_failures, takes no connector_name
+ return self._amqp.wait_messages(  # returns whatever self._amqp.wait_messages returns (presumably a Deferred - confirm)
+ MetricManager.exchange_name, MetricManager.routing_key, amount)  # amount is passed through unchanged as the third argument
+
def clear_dispatched_events(self, connector_name=None):
return self._clear_dispatched('event', connector_name)
@@ -663,6 +672,10 @@ class VumiWorkerTestCase(TestCase):
def clear_dispatched_failures(self, connector_name=None):
return self._clear_dispatched('failures', connector_name)
+ def clear_dispatched_metrics(self):  # unlike clear_dispatched_failures, takes no connector_name
+ return self._amqp.clear_messages(  # calls self._amqp directly instead of going through _clear_dispatched
+ MetricManager.exchange_name, MetricManager.routing_key)  # same exchange/routing-key pair the get/wait metric helpers use
+
def _dispatch(self, message, rkey, exchange='vumi'):
self._amqp.publish_message(exchange, rkey, message)
return self._amqp.kick_delivery()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment