Skip to content

Instantly share code, notes, and snippets.

@Richard-Mathie
Last active March 29, 2017 10:48
Show Gist options
  • Select an option

  • Save Richard-Mathie/3341c3dca004bbe4eb27 to your computer and use it in GitHub Desktop.

Select an option

Save Richard-Mathie/3341c3dca004bbe4eb27 to your computer and use it in GitHub Desktop.
How To Test Streamparse Bolt?

Streamparse Bolt Test

An attempt at a method for unit-testing bolt logic in streamparse projects.

from __future__ import absolute_import, print_function, unicode_literals
from collections import Counter
from streamparse.bolt import Bolt
class WordCounter(Bolt):
    """Bolt that keeps a running tally of each word it is given.

    For every processed tuple it emits ``[word, count]`` and logs the
    same pair as ``"word: count"``.
    """

    def initialize(self, conf, ctx):
        """Set up the empty per-word tally; ``conf`` and ``ctx`` are unused."""
        self.counts = Counter()

    def process(self, tup):
        """Count the word in ``tup.values[0]`` and report the new total."""
        word = tup.values[0]
        self.counts[word] += 1
        # Read the updated total once and reuse it for the emit and the log.
        total = self.counts[word]
        self.emit([word, total])
        self.log('%s: %d' % (word, total))
# Writing unit tests for streamparse bolt functions
# Inspiration from
#
import logging
import unittest
from io import BytesIO
import simplejson as json
try:
from unittest import mock
from unittest.mock import patch
except ImportError:
import mock
from mock import patch
from streamparse.storm import Bolt, Tuple, Component
from bolts.asset_times import AssetTime
log = logging.getLogger(__name__)
class WordCountTests(unittest.TestCase):
    """Drive a bolt with one serialized tuple and check what it emits.

    NOTE(review): the bolt under test is ``AssetTime`` from
    ``bolts.asset_times``, although the class name and the expected
    output suggest a word-counting bolt -- confirm this is intended.
    """

    def setUp(self):
        """Build the input tuple and a bolt wired to in-memory streams."""
        self.tup_dict = {
            'id': 14,
            'comp': 'word_spout',
            'stream': 'words',
            'task': 'some_bolt',
            'tuple': ['test_word'],
        }
        fields = self.tup_dict

        # Message placed on the bolt's input stream: the JSON document
        # followed by an "end" line.
        serialized = json.dumps(fields)
        raw_message = "{}\nend\n".format(serialized).encode('utf-8')

        # Tuple built from the same fields; the test reads its id for the
        # expected anchors.
        self.tup = Tuple(
            fields['id'],
            fields['comp'],
            fields['stream'],
            fields['task'],
            fields['tuple'],
        )

        source = BytesIO(raw_message)
        sink = BytesIO()
        self.bolt = AssetTime(input_stream=source, output_stream=sink)
        self.bolt.initialize({}, {})

    @patch.object(Bolt, 'ack', autospec=True)
    @patch.object(Bolt, 'read_task_ids', autospec=True)
    @patch.object(Bolt, 'send_message', autospec=True)
    def test_process_basic(self, send_message_mock, read_task_ids_mock, ack_mock):
        """One pass over the input should emit the word with a count of 1."""
        # A basic emit
        read_task_ids_mock.return_value = None
        expected_message = {
            'command': 'emit',
            'anchors': [self.tup.id],
            'tuple': ['test_word', 1],
        }

        self.bolt._run()

        send_message_mock.assert_called_with(self.bolt, expected_message)
@topiaruss
Copy link

I'm not entirely happy with this.
https://gist.github.com/topiaruss/264015387b69f867c4bf
It uses pytest, which I prefer for consistency with fixtures from my existing packages. I'll get started on some real code, and see if there are refactorings that make sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment