Skip to content

Instantly share code, notes, and snippets.

@emmanuellyautomated
Created November 21, 2017 17:14
Show Gist options
  • Save emmanuellyautomated/a0b0343d77c8b790df61bc2a87fe550a to your computer and use it in GitHub Desktop.
Save emmanuellyautomated/a0b0343d77c8b790df61bc2a87fe550a to your computer and use it in GitHub Desktop.
An example of using Python coroutines to create processing pipelines.
import functools
import json
import os
from copy import copy

import requests
# To try this out, run --> `python coroutines.py`
#-- UTILS ------------------------------------------->>>
def coroutine(func):
    """Decorate a generator function so that calling it returns a primed coroutine.

    A freshly created generator must be advanced to its first ``yield``
    before ``send()`` can deliver a value to it. The wrapper performs that
    first ``next()`` call so callers can ``send()`` immediately.

    Args:
        func: Generator function to wrap.

    Returns:
        A function with the same signature and metadata as ``func`` that
        returns the generator already advanced to its first ``yield``.
    """
    # Preserve the wrapped function's __name__/__doc__ so decorated
    # coroutines stay identifiable in tracebacks and introspection.
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # run up to the first ``yield`` so ``send()`` works right away
        return cr

    return start
def source(generator, target):
    """Push every item of an iterable into a coroutine.

    Args:
        generator: Any iterable producing the items to feed downstream.
        target: Object exposing ``send(item)``, or ``None`` to do nothing.

    Returns:
        None.
    """
    # The target never changes while iterating, so check it once up front.
    # This also avoids consuming an item from ``generator`` when there is
    # nowhere to send it.
    if target is None:
        return
    for item in generator:
        target.send(item)
def pipeline(targets, sink, call=None):
    """Chain named processing stages in front of a sink.

    Stage names are resolved through this module's ``globals()``. Stages are
    wired from the end of ``targets`` backwards, so the first name in
    ``targets`` is the outermost stage, i.e. the one returned and the first
    to receive data.

    Args:
        targets: List of stage names; each must name a module-level callable
            that accepts the downstream target as its only argument.
        sink: Zero-argument callable producing the final receiver. It is
            invoked only when a stage has to be built and ``call`` is None.
        call: Optional already-built downstream target to chain onto instead
            of a fresh ``sink()``.

    Returns:
        The outermost stage, or ``call`` unchanged (``None`` by default) when
        ``targets`` holds no usable names.

    Raises:
        TypeError: If a name in ``targets`` does not refer to a callable
            defined at module level.
    """
    # Work on a copy so the caller's list is never consumed by ``pop()``.
    remaining = copy(targets)
    print('CALL: ', call)
    while any(remaining):
        name = remaining.pop()
        stage = globals().get(name)
        if not callable(stage):
            # Fail with the offending name rather than the opaque
            # "'NoneType' object is not callable".
            raise TypeError(
                f'pipeline stage {name!r} is not a callable defined in this module'
            )
        call = stage(sink() if call is None else call)
        print('CALL: ', call)
    return call
@coroutine
def broadcast(targets):
    """Fan each received item out to every coroutine in ``targets``."""
    while True:
        message = yield
        for receiver in targets:
            receiver.send(message)
#-- UTILS ------------------------------------------->>>
#-- SINKS ------------------------------------------->>>
@coroutine
def printer():
    """Sink coroutine: print every item it receives, one per line."""
    while True:
        received = yield
        print(received)
@coroutine
def dumpstream():
    """Sink coroutine: discard every received item by writing it to the null device.

    Items are passed to a text-mode ``write()``, so they must be strings.
    """
    # Open the null device once for the coroutine's lifetime instead of
    # reopening it for every item. The ``with`` block closes the handle when
    # the generator is closed.
    with open(os.devnull, 'w') as dump:
        while True:
            item = (yield)
            dump.write(item)
#-- SINKS ------------------------------------------->>>
#-- PROCESSORS -------------------------------------->>>
@coroutine
def lowercase(target):
    """Processor: forward each received item to ``target`` in lower case."""
    while True:
        text = yield
        lowered = text.lower()
        target.send(lowered)
@coroutine
def underscore(target):
    """Processor: forward each item to ``target`` with spaces turned into underscores."""
    while True:
        text = yield
        joined = text.replace(' ', '_')
        target.send(joined)
@coroutine
def deunderscore(target):
    """Processor: forward each item to ``target`` with underscores turned into spaces."""
    while True:
        text = yield
        spaced = text.replace('_', ' ')
        target.send(spaced)
@coroutine
def titlecase(target):
    """Processor: forward each received item to ``target`` in title case."""
    while True:
        text = yield
        titled = text.title()
        target.send(titled)
@coroutine
def uppercase(target):
    """Processor: forward each received item to ``target`` in upper case."""
    while True:
        text = yield
        shouted = text.upper()
        target.send(shouted)
#-- PROCESSORS -------------------------------------->>>
#-- FILTERS ----------------------------------------->>>
@coroutine
def grep(pattern, target):
    """Filter: forward to ``target`` only the items that contain ``pattern``.

    Matching is a plain ``in`` containment test, not a regular expression.
    """
    while True:
        candidate = yield
        if pattern not in candidate:
            continue  # no match: drop the item
        target.send(candidate)
#-- FILTERS ----------------------------------------->>>
def get_jobs(url='http://api.dataatwork.org/v1/jobs?limit=', num=100, timeout=30):
    """Yield the non-empty ``title`` of each job record returned by the API.

    The request is issued lazily, on the first iteration of the returned
    generator. The response body is decoded as UTF-8 JSON and iterated;
    each element is expected to offer ``.get('title')``.

    Args:
        url: Endpoint prefix; ``num`` is appended directly to it.
        num: Value appended to ``url`` (the ``limit`` query value for the
            default endpoint).
        timeout: Seconds to wait for the server before giving up.

    Yields:
        Each truthy ``title`` value found in the response.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or an HTTP error status.
    """
    jobs_url = f'{url}{num}'
    # Without a timeout a stalled server would block this call forever.
    response = requests.get(jobs_url, timeout=timeout)
    # Surface HTTP failures here instead of tripping over an error payload
    # while iterating it below.
    response.raise_for_status()
    for d in json.loads(response.content.decode('utf-8')):
        title = d.get('title')
        if title:
            yield title
if __name__ == '__main__':
    # Download all job titles first, then stream them through the pipeline.
    print('***** Fetching Records...')
    records = list(get_jobs(num=500))

    # Stages are applied in list order; the printer sink receives the result.
    targets = ['lowercase', 'titlecase', 'deunderscore', 'underscore', 'uppercase']
    head = pipeline(targets, printer)
    source(records, head)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment