Skip to content

Instantly share code, notes, and snippets.

@dat-boris
Created January 31, 2017 05:51
Show Gist options
  • Select an option

  • Save dat-boris/3207d428103751a9092e15da400dbd90 to your computer and use it in GitHub Desktop.

Select an option

Save dat-boris/3207d428103751a9092e15da400dbd90 to your computer and use it in GitHub Desktop.
from pipedream import pipeline
from pipedream.store import PipelineStore
def demo_broken_monitoring():
    """Run a monitored pipeline on bad input and print what was recorded.

    Builds a pipeline whose first two steps are wrapped with validators,
    applies it to ``BAD_INPUT[0]``, prints the output, and then prints the
    values held in the ``./error_store1`` store.

    NOTE(review): the listing this was restored from had lost its
    indentation; the nesting below (everything inside this function) is
    reconstructed from how the names are used -- confirm against upstream.
    """
    # Imported locally: no module-level import of pprint is visible, and a
    # function-scope import keeps this demo self-contained.
    from pprint import pprint

    def check_word_maxlen(word):
        """Raise AssertionError unless ``0 < len(word) < 40``."""
        # Explicit raise rather than ``assert`` so the check is not stripped
        # when Python runs with -O; exception type and message are unchanged.
        if not 0 < len(word) < 40:
            raise AssertionError("Expect be less than 40 chars")

    class FrequencyValidator(object):
        """Stateful validator that fails after a run of empty words."""

        def __init__(self, max_empty=100):
            # Number of falsy words seen in a row since the last reset.
            self.empty_count = 0
            # Length of the run of falsy words that triggers a failure.
            self.max_empty = max_empty

        def validate(self, word):
            """Track consecutive falsy words; raise once the run hits the limit.

            Raises:
                AssertionError: when ``max_empty`` falsy words have been
                    seen in a row. The counter is reset before raising.
            """
            if word:
                # Any truthy word breaks the run.
                self.empty_count = 0
                return

            self.empty_count += 1
            if self.empty_count >= self.max_empty:
                # Reset first so the next run is counted from zero.
                self.empty_count = 0
                # Explicit raise (not ``assert False``) so it survives -O.
                raise AssertionError(
                    "Do not expect {} consecutive empty words".format(
                        self.max_empty
                    )
                )

    monitored_pipeline = pipeline.Pipeline()
    # NOTE(review): presumably monitor_step(step, validator) wraps ``step``
    # so that ``validator`` is called on the items it handles -- confirm
    # against the pipedream API.
    # NOTE(review): filter_empty_word appears twice (once monitored, once
    # plain); kept exactly as in the original.
    monitored_pipeline.set_steps([
        monitored_pipeline.monitor_step(
            s02_functional.emit_words,
            FrequencyValidator().validate
        ),
        monitored_pipeline.monitor_step(
            s02_functional.filter_empty_word,
            check_word_maxlen
        ),
        s02_functional.filter_empty_word,
        s02_functional.count_words
    ])

    output = monitored_pipeline.monitor_apply(
        BAD_INPUT[0],
        error_prefix='error_store'
    )
    print(output)

    # Run unexpected input
    # NOTE(review): the store opened below is './error_store1' while the
    # prefix passed above is 'error_store'; presumably monitor_apply appends
    # an index to the prefix -- confirm against the pipedream API.
    print("Inspecting error store: error_store1")
    #print("You have error keys: {}".format(monitored_pipeline.errors[0]))
    error_store = PipelineStore('./error_store1', inspect=True)
    errors = error_store.get_values()
    print("You have following errors:")
    pprint(errors)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment