Skip to content

Instantly share code, notes, and snippets.

@johanste
Last active July 15, 2020 03:56
Show Gist options
  • Save johanste/5cc62daa23adc3a68ae4cca49d109fd5 to your computer and use it in GitHub Desktop.
Save johanste/5cc62daa23adc3a68ae4cca49d109fd5 to your computer and use it in GitHub Desktop.
Decorator instead of event subscribers/handlers
import collections
import json
import typing
class RecognitionCancelledError(Exception):
...
ConditionalHandler = collections.namedtuple('ConditionalHandler', 'condition handler')
class Recognizer:
def __init__(self, endpoint, **kwargs):
self.handlers = []
def handle(self, condition: typing.Union[str, typing.Callable]):
"""Decorator for registering handlers for specific events
:param condition: Under what conditions should this handler be called.
"""
def wrapped(func):
if isinstance(condition, str):
self.handlers.append(ConditionalHandler(lambda m: m['type'] == condition, func))
else:
self.handlers.append(ConditionalHandler(condition, func))
return func
return wrapped
def recognize(self, data):
for line in data:
message = json.loads(line) # Fake load of lines into messages...
try:
handler = next(ch.handler for ch in self.handlers if ch.condition(message))
handler(message)
except StopIteration:
...
# User code - define what you want your recognizer to do:
import io
recognizer = Recognizer('https://api.contoso.com')
@recognizer.handle('speakerRecognized')
def handle_speaker_recognized(message):
print(f'speaker recognized: {message}')
@recognizer.handle('stop')
def handle_stop_message(message):
print(f'stopping: {message}')
raise RecognitionCancelledError("I'm sooo done with this!")
# Advanced scenario - custom filters of types of events
@recognizer.handle(lambda m: m['type'].startswith('x'))
def handle_x(message):
print(f'Some x-message detected: {message}')
# Advanced scenario - catch-all for all events. If common, this
# would be a handle with no parameters or an 'ALL' sentinel value
@recognizer.handle(lambda m: True)
def handle_anything_else(message):
print(f'Some other {message}')
# Feed the recognizer with a (fake) stream of data...
data = """{ "type": "something" }
{ "type": "xstuff", "yeah": "dude!" }
{ "type": "speakerRecognized" }
{ "type": "stop" }
{ "type": "something else" }
"""
try:
recognizer.recognize(io.StringIO(data))
except RecognitionCancelledError as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment