Created
February 22, 2020 12:52
-
-
Save rainbowbird/c68fad83fe00c8db3b43ab79aa963388 to your computer and use it in GitHub Desktop.
Pipelining Chaining Commands - Coroutines with generators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Use coroutines to sum log file data with different log levels. | |
log file content: | |
ERROR: 78 | |
DEBUG: 72 | |
WARN: 99 | |
CRITICAL: 97 | |
FATAL: 40 | |
FATAL: 33 | |
CRITICAL: 34 | |
ERROR: 10 | |
ERROR: 89 | |
ERROR: 46 | |
""" | |
import functools | |
import sys | |
import time | |
def init_coroutine(func):
    """Decorator that returns coroutines already advanced to their first yield.

    The wrapped callable is invoked with the caller's arguments, the
    resulting generator is stepped once with ``next()`` so that it is
    paused at its first ``yield`` and can accept ``send()`` immediately,
    and that generator is handed back to the caller.
    """
    def start_primed(*args, **kwargs):
        coroutine = func(*args, **kwargs)
        # A fresh generator cannot receive a non-None value via send();
        # stepping it once moves it to the first yield.
        next(coroutine)
        return coroutine

    # Copy name, docstring, etc. from the decorated function.
    return functools.wraps(func)(start_primed)
def read_forever(fobj, target, poll_interval=0.1):
    """Read lines from a file indefinitely and push each one to ``target``.

    Whenever ``fobj.readline()`` returns an empty string (nothing more to
    read at the moment), sleep for ``poll_interval`` seconds and try again,
    so lines appended later by another process are picked up.

    Args:
        fobj: Object with a ``readline()`` method.
        target: Object with a ``send()`` method; receives every
            non-empty line exactly as read.
        poll_interval: Seconds to wait before retrying after an empty read.

    This function never returns normally; it ends only when an exception
    propagates out of ``readline()``, ``send()`` or ``time.sleep()``.
    """
    while True:
        line = fobj.readline()
        if line:
            target.send(line)
        else:
            # Nothing new yet: back off briefly instead of busy-looping.
            time.sleep(poll_interval)
@init_coroutine
def filter_comments(target):
    """Forward every received line to ``target`` unless it is a comment.

    A line counts as a comment when its first non-whitespace character
    is ``#``; such lines are dropped.
    """
    while True:
        candidate = yield
        if candidate.lstrip().startswith('#'):
            continue
        target.send(candidate)
@init_coroutine
def get_number(targets):
    """Parse ``LEVEL: NUMBER`` lines and route the number by level.

    Each received line is split at ``':'``. The right-hand part is
    converted to an integer and sent to ``targets[LEVEL]``, where LEVEL is
    the left-hand part with surrounding whitespace removed.

    Blank (whitespace-only) lines are skipped so that an empty line in the
    input does not terminate the pipeline.

    Args:
        targets: Mapping from level name to an object with a ``send()``
            method.

    Raises (out of ``send()``):
        ValueError: The line does not contain exactly one ``':'`` or the
            number part is not a valid integer.
        KeyError: The level is not a key of ``targets``.
    """
    while True:
        line = yield
        if not line.strip():
            # Nothing to parse on an empty line.
            continue
        level, number = line.split(':')
        targets[level.strip()].send(int(number))
@init_coroutine
def _running_sum(label):
    """Accumulate every value sent in and report the running total.

    After each received value, a line of the form ``'<label> sum: <total>'``
    (total right-aligned in 7 columns) is written to ``sys.stdout`` and
    flushed immediately.

    Shared implementation for the per-level handlers below, which differ
    only in the label they print.
    """
    total = 0
    while True:
        value = yield
        total += value
        sys.stdout.write('%s sum: %7d\n' % (label, total))
        # Flush right away so each total appears as soon as it is computed.
        sys.stdout.flush()


def fatal():
    """Handle fatal errors."""
    return _running_sum('FATAL')


def critical():
    """Handle critical errors."""
    return _running_sum('CRITICAL')


def error():
    """Handle normal errors."""
    return _running_sum('ERROR')


def warn():
    """Handle warnings."""
    return _running_sum('WARN')


def debug():
    """Handle debug messages."""
    return _running_sum('DEBUG')
# One ready-to-use summing coroutine per log level, keyed by the level name
# as it appears in the log file.
TARGETS = {
    level: make_handler()
    for level, make_handler in (
        ('CRITICAL', critical),
        ('DEBUG', debug),
        ('ERROR', error),
        ('FATAL', fatal),
        ('WARN', warn),
    )
}
def show_sum(file_name='out.txt'):
    """Start the pipeline.

    Opens ``file_name`` and feeds its lines through ``filter_comments`` and
    ``get_number`` into the handlers in ``TARGETS``.

    ``read_forever`` does not return normally, so this call blocks until an
    exception (for example ``KeyboardInterrupt``) propagates; the ``with``
    block ensures the file is closed when that happens.
    """
    with open(file_name) as fobj:
        read_forever(fobj, filter_comments(get_number(TARGETS)))
if __name__ == '__main__':
    # Use the file given on the command line; without an argument fall back
    # to show_sum()'s default file name instead of failing with IndexError.
    if len(sys.argv) > 1:
        show_sum(sys.argv[1])
    else:
        show_sum()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment