Skip to content

Instantly share code, notes, and snippets.

@ryszard
Created September 11, 2009 12:03
Show Gist options
  • Save ryszard/185253 to your computer and use it in GitHub Desktop.
Save ryszard/185253 to your computer and use it in GitHub Desktop.
class TestFlow(TestCase):
    """Tests for the broadcaster / classifier message flow over django_sqs queues.

    ``setUp`` saves a ``Subscription``, registers ``test_agent`` with a
    ``Register`` and wraps a ``SubscriptionClassifier`` and an
    ``EpisodeClassifier`` as django_sqs receivers.  ``tearDown`` clears the
    three queues involved so that messages do not leak between tests.
    """

    def setUp(self):
        """Build the subscription, the agent register and the three queues."""
        # Kept as a function-level import, as in the original code.
        from django_sqs import queues, receiver

        self.subscription = Subscription(title="Subscription")
        self.subscription.save()

        self.register = Register(runner_class=OnlySubscriptionRunner)
        self.register(test_agent)
        self.broadcast = Broadcaster(self.register.agents)
        # Presumably the runner created for ``test_agent`` (the only agent
        # registered here) -- verify against Register.  ``list(...)`` keeps
        # the indexing valid when ``.values()`` returns a non-subscriptable
        # view (Python 3 dicts) as well as a plain list (Python 2).
        self.runner = list(self.register.agents.values())[0]

        # ``receiver(...)`` returns a decorator; applying it to the classifier
        # instances yields objects exposing ``registered_queue`` (read below).
        self.subscription_classifier = receiver(
            "subscription_classifier", message_class=JSONMessage
        )(SubscriptionClassifier(agents=self.register.agents))
        self.episode_classifier = receiver(
            "episode_classifier", message_class=JSONMessage
        )(EpisodeClassifier(agents=self.register.agents))

        self.agent_queue = queues[self.runner.queue_name]
        self.subscription_queue = self.subscription_classifier.registered_queue
        self.episode_queue = self.episode_classifier.registered_queue

    def test_broadcaster(self):
        """Broadcasting a subscription must leave a message in the agent queue."""
        self.broadcast(self.subscription)
        if self.agent_queue.receive_single() is None:
            self.fail("The queue is empty")

    def test_subscriptions(self):
        """Placeholder: no assertions implemented yet."""
        pass
        # there's one item in the agent queue

    def tearDown(self):
        """Clear every queue touched by the tests.

        The original method was named ``tearUp``, which unittest never calls,
        so the queues were never emptied between tests.
        """
        self.agent_queue.get_queue().clear()
        self.subscription_queue.get_queue().clear()
        self.episode_queue.get_queue().clear()

    # Backward-compatible alias for the original (misspelled) hook name.
    tearUp = tearDown
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment