Last active
August 29, 2015 14:05
-
-
Save msukmanowsky/a23ff11b44534ec12c8f to your computer and use it in GitHub Desktop.
A custom code execution bolt, not yet tested.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from streamparse.bolt import Bolt | |
log = logging.getLogger("custom_code_bolt") | |
class CustomCodeBolt(Bolt):
    """Bolt that runs customer-supplied calculation code against tuples.

    The first two values of every tuple are read as ``(customer_id,
    msg_type)``; the remaining values are handed to the customer's code as
    ``data``.  Customer source is fetched through ``self.db``, compiled once
    and cached per ``(customer_id, msg_type)``.
    """

    auto_ack = False  # take care of acking tuples yourself
    auto_fail = False  # take care of failing tuples yourself

    def initialize(self, conf, ctx):
        """Create the per-instance cache of compiled customer code."""
        # TODO: Initialize database connections that hold custom code
        # NOTE(review): load_customer_calculation() reads ``self.db``, which
        # is not assigned anywhere in the visible code -- it must be set up
        # here before any tuple is processed.
        self.custom_calc_cache = {}  # (customer_id, msg_type): code object

    def load_customer_calculation(self, customer_id, msg_type):
        """Return the compiled custom code for a customer/message type.

        Results (including "nothing to run") are cached, so the database is
        queried and the source compiled at most once per key.

        Returns:
            A code object compiled in ``"exec"`` mode, or ``None`` when no
            approved code exists or the stored source could not be compiled.
        """
        cache_key = (customer_id, msg_type)
        if cache_key in self.custom_calc_cache:
            return self.custom_calc_cache[cache_key]

        res = self.db.query("""
            SELECT code FROM custom_code
            WHERE customer_id=? AND msg_type=? AND approved=true;
        """, customer_id, msg_type)
        # Tolerate a query that yields no row object at all as well as a row
        # whose ``code`` column is NULL.
        source = getattr(res, "code", None)
        if source is None:
            # No customizations exist, exit early
            self.custom_calc_cache[cache_key] = None
            return None

        # Compile customer's code into a "fake" Python module
        module_filename = "{}_{}.py".format(customer_id, msg_type)
        try:
            code = compile(source, module_filename, "exec")
        except SyntaxError:
            log.exception("Invalid syntax in customer code for %s %s",
                          customer_id, msg_type)
            code = None
        except (TypeError, ValueError):
            # Null bytes in the source raise TypeError on Python 2 and
            # ValueError on Python 3.
            log.exception("Customer source for %s %s contains null bytes",
                          customer_id, msg_type)
            code = None

        # A failed compile is cached as None too, so the broken source is
        # reported once rather than on every tuple.
        self.custom_calc_cache[cache_key] = code
        return code

    @staticmethod
    def _run_custom_code(custom_code, data):
        """Execute compiled customer code and return its ``result`` value.

        The code runs in a dedicated namespace that initially holds only
        ``result`` (``None``) and ``data``; whatever the code leaves bound to
        ``result`` is returned (``None`` if it removed the name).
        """
        # SECURITY: this executes customer-provided code.  The namespace
        # keeps it away from this module's globals and the bolt instance,
        # but full builtins are still available -- this is NOT a sandbox.
        # TODO: probably want to create a safe "globals" with __builtins__
        # that excludes certain functions, note that this is still not 100%
        # safe.
        namespace = {"result": None, "data": data}
        # eval() accepts code objects compiled in "exec" mode.  One dict is
        # used as both globals and locals so helper functions defined by the
        # customer code can see each other and ``data``.
        eval(custom_code, namespace)
        return namespace.get("result")

    def process(self, tup):
        """Run the customer's custom calculation (if any) for ``tup``.

        Tuples without custom code are acked immediately.  Otherwise the
        code is executed and the tuple is acked on success, or failed (with
        the error logged) if anything in that step raises.
        """
        customer_id, msg_type = tup.values[:2]
        custom_code = self.load_customer_calculation(customer_id, msg_type)
        if custom_code is None:
            # No customizations exist, just ack the tuple and move on
            self.ack(tup)
            return

        # Otherwise, execute the custom calculation.  The custom code
        # depends on the ``result`` and ``data`` variables and overwrites
        # ``result``.
        data = tup.values[2:]
        try:
            result = self._run_custom_code(custom_code, data)  # noqa: F841
            # TODO: check result is still valid then maybe other stuff like emit
            self.ack(tup)
        except Exception:
            log.exception("Error executing customer code for %s %s",
                          customer_id, msg_type)
            self.fail(tup)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment