Created
July 15, 2011 10:42
-
-
Save fmarani/1084464 to your computer and use it in GitHub Desktop.
Transformation pipeline in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import itertools | |
import random | |
import string | |
import functools | |
# Support functions
# Sample implementation of ireduce(): a lazy reduce() that yields each intermediate result.
def ireduce(func, iterable, init=None):
    """Lazily reduce *iterable* with *func*, yielding every intermediate result.

    Generator counterpart of ``reduce()``: each remaining item ``x`` is folded
    in with ``curr = func(curr, x)`` and the new ``curr`` is yielded.

    Args:
        func: two-argument callable ``func(accumulator, item) -> accumulator``.
        iterable: the items to fold.
        init: starting accumulator. When ``None`` (the default) the first item
            of *iterable* is used as the starting accumulator instead, and that
            seed value itself is not yielded.

    Yields:
        The accumulator after each remaining item has been folded in. An empty
        *iterable* (or a single item when *init* is ``None``) yields nothing.
    """
    iterator = iter(iterable)
    if init is None:
        try:
            curr = next(iterator)
        except StopIteration:
            # Nothing to reduce. Returning explicitly (rather than letting
            # StopIteration escape) matters on Python 3.7+, where PEP 479
            # turns a StopIteration leaking out of a generator into RuntimeError.
            return
    else:
        curr = init
    for x in iterator:
        curr = func(curr, x)
        yield curr
# End of support functions
# PROTOTYPE FOR A PIPELINE SYSTEM
# in which every transformation is applied lazily, so each item flows
# through all stages as the consumer pulls it and every stage is active
# at the same time (inspired by Hadoop and dataflow languages).
# Module-level product stores: filled by import_partner_products() and read
# by Acorrelated(). Each partner gets its own distinct list.
partner_a_products, partner_b_products = [], []
def _random_product(isbn_stop):
    """Return a random ``(isbn, name, price)`` product tuple.

    isbn is drawn from ``range(9780000, isbn_stop)``, name is 7 random
    uppercase letters/digits, and price is an int from ``range(1, 5)``.
    """
    isbn = random.randrange(9780000, isbn_stop)
    alphabet = string.ascii_uppercase + string.digits
    name = ''.join(random.choice(alphabet) for _ in range(7))
    price = random.randrange(1, 5)
    return (isbn, name, price)


def import_partner_products():
    """Fill the module-level partner product lists with random sample data.

    Appends 1000 products (ISBNs 9780000-9781999) to ``partner_a_products``,
    then 2000 products (ISBNs 9780000-9783999) to ``partner_b_products``,
    printing each one with a 1-based per-partner counter. The lists are only
    mutated in place, never rebound, so no ``global`` declaration is needed.
    """
    partners = (
        ("A", partner_a_products, 1000, 9782000),
        ("B", partner_b_products, 2000, 9784000),
    )
    for label, products, how_many, isbn_stop in partners:
        for load_count in range(1, how_many + 1):
            new_product = _random_product(isbn_stop)
            # Same text the original `print n, " PARTNER X NEW -> ", p` emitted.
            print("%d  PARTNER %s NEW ->  %s" % (load_count, label, new_product))
            products.append(new_product)
def Acorrelated():
    """Pair every partner-A product with the first partner-B product sharing its ISBN.

    Prints ``correlated..`` when called, then returns a lazy iterator over
    ``(a_product, b_product_or_None)`` tuples, one per entry of
    ``partner_a_products`` in order. Products are ``(isbn, name, price)``
    tuples, so the ISBN is element 0.
    """
    print("correlated..")

    def grouper(product):
        isbn = product[0]
        # Scan partner B lazily and stop at the first ISBN match; None when
        # partner B has no product with this ISBN. Unlike accumulating every
        # match into a growing list, this copies nothing.
        b_correlate = next((b for b in partner_b_products if b[0] == isbn), None)
        return (product, b_correlate)

    return (grouper(product) for product in partner_a_products)
def partner_selected():
    """Reduce each correlated pair to the product with the better price.

    Prints ``partner_selected..`` when called, then returns a lazy iterator
    that maps every ``(a_product, b_product_or_None)`` pair produced by
    ``Acorrelated()`` to one product. Partner B's product is chosen when it
    exists and its price (element 2) is strictly lower than partner A's;
    otherwise partner A's product is chosen. Each decision is printed as it
    is made.
    """
    print("partner_selected..")

    def mapper(x):
        a_product, b_product = x
        # Ties and missing B matches keep partner A's product.
        if b_product is not None and a_product[2] > b_product[2]:
            w = b_product
        else:
            w = a_product
        print("best_price_on %s and %s -> %s" % (a_product, b_product, w))
        return w

    return (mapper(pair) for pair in Acorrelated())
def filtered():
    """Keep only the selected products whose price is greater than 2.

    Prints ``filtered..`` when called, then returns a lazy iterator over the
    ``partner_selected()`` products whose price (element 2) is ``> 2``,
    printing each product as it is tested.
    """
    print("filtered..")

    # Named so it no longer shadows the builtin filter().
    def price_above_two(x):
        print("filtering > 2 on %s" % (x,))
        return x[2] > 2

    return (x for x in partner_selected() if price_above_two(x))
def with_markup():
    """Add a fixed markup of 1.2 to the price of every filtered product.

    Prints ``with_markup..`` when called, then returns a lazy iterator over the
    ``filtered()`` products with 1.2 added to the price (element 2). Each
    product is printed before it is marked up.
    """
    print("with_markup..")

    def add_markup(product):
        print("mapping markup addition on %s" % (product,))
        isbn, name, price = product[0], product[1], product[2]
        return (isbn, name, price + 1.2)

    return (add_markup(product) for product in filtered())
def normalized():
    """Lower-case the name of every marked-up product.

    Prints ``normalized..`` when called, then returns a lazy iterator over the
    ``with_markup()`` products with the name (element 1) lower-cased. Each
    product is printed before it is normalized.
    """
    print("normalized..")

    def lowercase_name(item):
        print("mapping lowercase on %s" % (item,))
        isbn, name, price = item[0], item[1], item[2]
        return (isbn, name.lower(), price)

    return (lowercase_name(item) for item in with_markup())
# Driver: load the sample data, then pull at most 1000 products through the
# lazily chained stages, printing each one with its 1-based position.
import_partner_products()
count = 0  # stays 0 if the pipeline yields nothing
for count, product in enumerate(normalized(), 1):
    print("%d  ->  %s" % (count, product))
    if count == 1000:
        break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment