Skip to content

Instantly share code, notes, and snippets.

@Richard-Mathie
Last active March 29, 2017 10:48
Show Gist options
  • Select an option

  • Save Richard-Mathie/3341c3dca004bbe4eb27 to your computer and use it in GitHub Desktop.

Select an option

Save Richard-Mathie/3341c3dca004bbe4eb27 to your computer and use it in GitHub Desktop.
How To Test Streamparse Bolt?

Streamparse Bolt Test

An attempt at a method for unit-testing bolt logic in streamparse projects.

from __future__ import absolute_import, print_function, unicode_literals
from collections import Counter
from streamparse.bolt import Bolt
class WordCounter(Bolt):
    """Bolt that keeps a running tally of the words it receives."""

    def initialize(self, conf, ctx):
        """Start with an empty tally; ``conf`` and ``ctx`` are not used here."""
        self.counts = Counter()

    def process(self, tup):
        """Count the first value of ``tup``, then emit and log the new total.

        The emitted tuple is ``[word, count]`` where ``count`` is the number
        of times ``word`` has been seen so far, including this occurrence.
        """
        word = tup.values[0]
        self.counts[word] += 1
        # Read the updated total once and reuse it for both the emit and the log.
        seen = self.counts[word]
        self.emit([word, seen])
        self.log('%s: %d' % (word, seen))
# Writing unit tests for streamparse bolt functions
# Inspiration from
#
import logging
import unittest
from io import BytesIO
import simplejson as json
try:
from unittest import mock
from unittest.mock import patch
except ImportError:
import mock
from mock import patch
from streamparse.storm import Bolt, Tuple, Component
from bolts.asset_times import AssetTime
log = logging.getLogger(__name__)
class WordCountTests(unittest.TestCase):
    """Feed one serialized tuple to a bolt and check the message it sends.

    NOTE(review): the bolt under test is ``AssetTime``, while the expected
    emit below (``['test_word', 1]``) looks like word-count output -- confirm
    that this is the intended bolt.
    """

    def setUp(self):
        """Build the input tuple, its serialized form, and an initialized bolt."""
        self.tup_dict = {
            'id': 14,
            'comp': 'word_spout',
            'stream': 'words',
            'task': 'some_bolt',
            'tuple': ['test_word'],
        }

        # The bolt reads its input from a byte stream: the JSON document
        # followed by a line containing "end".
        serialized = json.dumps(self.tup_dict)
        tup_json = "{}\nend\n".format(serialized).encode('utf-8')

        # Same data as a Tuple object, kept so the test can refer to its id.
        fields = self.tup_dict
        self.tup = Tuple(
            fields['id'],
            fields['comp'],
            fields['stream'],
            fields['task'],
            fields['tuple'],
        )

        self.bolt = AssetTime(
            input_stream=BytesIO(tup_json),
            output_stream=BytesIO(),
        )
        self.bolt.initialize({}, {})

    # Mocks are passed in bottom-up: the decorator closest to the function
    # supplies the first mock argument.
    @patch.object(Bolt, 'ack', autospec=True)
    @patch.object(Bolt, 'read_task_ids', autospec=True)
    @patch.object(Bolt, 'send_message', autospec=True)
    def test_process_basic(self, send_message_mock, read_task_ids_mock, ack_mock):
        """A single input tuple yields one emit anchored to that tuple."""
        # A basic emit
        read_task_ids_mock.return_value = None

        self.bolt._run()

        expected_message = {
            'command': 'emit',
            'anchors': [self.tup.id],
            'tuple': ['test_word', 1],
        }
        # autospec=True on a class attribute means the instance is recorded
        # as the first positional argument of the call.
        send_message_mock.assert_called_with(self.bolt, expected_message)
@topiaruss
Copy link

Hi Richard,

I have more or less decided I want to climb aboard with streamparse.

I'm not meaning to be critical of your suggestion, but I'm much too lazy* to write this for perhaps 20 steps. Is this what you have gone with, or have you found a slicker way to test?

Meanwhile I'll be considering how to unit test my bolts.

--r

@topiaruss
Copy link

I'm not entirely happy with this.
https://gist.github.com/topiaruss/264015387b69f867c4bf
It uses pytest, which I prefer for consistency with fixtures from my existing packages. I'll get started on some real code, and see if there are refactorings that make sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment