Created
January 31, 2017 05:51
-
-
Save dat-boris/3207d428103751a9092e15da400dbd90 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from pipedream import pipeline | |
| from pipedream.store import PipelineStore | |
def demo_broken_monitoring():
    """Run the word pipeline on bad input with monitors attached.

    Builds a ``pipeline.Pipeline`` whose first two steps are wrapped by
    ``monitor_step`` together with a validator callable, applies it to
    ``BAD_INPUT[0]`` through ``monitor_apply``, prints the output, and then
    prints the values held in the ``./error_store1`` store.

    The validators raise ``AssertionError`` explicitly instead of using
    ``assert`` statements, so they still fire when Python runs with ``-O``
    (which strips ``assert``).

    NOTE(review): ``s02_functional``, ``BAD_INPUT`` and ``pprint`` are used
    here but are not imported in the visible part of this file -- confirm
    they are in module scope.
    """

    def check_word_maxlen(word):
        """Raise ``AssertionError`` unless ``0 < len(word) < 40``."""
        if not 0 < len(word) < 40:
            raise AssertionError("Expect be less than 40 chars")

    class FrequencyValidator(object):
        """Stateful validator that trips on a run of consecutive empty words."""

        def __init__(self, max_empty=100):
            # Length of the current run of falsy words seen by validate().
            self.empty_count = 0
            # Run length at which validate() raises.
            self.max_empty = max_empty

        def validate(self, word):
            """Record ``word`` and raise once ``max_empty`` empties occur in a row.

            Raises:
                AssertionError: when this call brings the current run of
                    consecutive falsy words up to ``max_empty``.
            """
            if word:
                # Any non-empty word breaks the run.
                self.empty_count = 0
                return

            self.empty_count += 1
            if self.empty_count >= self.max_empty:
                # Reset before raising so the next run is counted from zero.
                self.empty_count = 0
                raise AssertionError(
                    "Do not expect {} consecutive empty words".format(
                        self.max_empty
                    )
                )

    monitored_pipeline = pipeline.Pipeline()
    monitored_pipeline.set_steps([
        monitored_pipeline.monitor_step(
            s02_functional.emit_words,
            FrequencyValidator().validate,
        ),
        monitored_pipeline.monitor_step(
            s02_functional.filter_empty_word,
            check_word_maxlen,
        ),
        s02_functional.filter_empty_word,
        s02_functional.count_words,
    ])

    # Feed the known-bad input through the monitored pipeline.
    output = monitored_pipeline.monitor_apply(
        BAD_INPUT[0],
        error_prefix='error_store',
    )
    print(output)

    # Inspect what was recorded for the unexpected input.
    # NOTE(review): presumably monitor_apply(error_prefix='error_store')
    # wrote its errors to ./error_store1 -- confirm against pipedream.
    print("Inspecting error store: error_store1")
    error_store = PipelineStore('./error_store1', inspect=True)
    errors = error_store.get_values()
    print("You have following errors:")
    pprint(errors)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment