Collecting examples that break and examples that do not break.
Next step: refactor the broken BigQuery (bq) example into the HTTPS requests example, using the same HTTP library that bq uses.
We need to get to the root of why SSLErrors are thrown.
# Crashes when run in parallel (more than one worker), e.g.:
#   python broken_bq.py Clean --workers 2
import luigi | |
import tylertreatwrapper as bigquery | |
class BQTarget(luigi.Target):
    """A luigi target that represents one table in BigQuery.

    The table is identified by a dataset name plus a table name. Every
    BigQuery operation goes through the object returned by ``_service()``,
    which calls ``bigquery.get_service()`` each time; nothing is cached on
    the target itself.
    """

    def __init__(self, dataset=None, table_name=None):
        """Remember which ``dataset``/``table_name`` this target points at.

        Raises:
            ValueError: if ``dataset`` or ``table_name`` is missing or empty.
        """
        # ValueError is a subclass of Exception, so callers that caught the
        # generic Exception raised previously keep working.
        if not dataset:
            raise ValueError("dataset must be given to a BQTarget")
        if not table_name:
            raise ValueError("table_name must be given to a BQTarget")
        self.dataset = dataset
        self.table_name = table_name

    def _service(self):
        """Return the object produced by ``bigquery.get_service()``."""
        return bigquery.get_service()

    def exists(self):
        """Check whether ``dataset.table_name`` is in BigQuery.

        Returns whatever the service's ``check_table`` call returns.
        """
        # Go through _service() like every other method of this class.
        return self._service().check_table(self.dataset, self.table_name)

    def full_name(self):
        """Return the table reference as the string ``"<dataset>.<table_name>"``."""
        return "{}.{}".format(self.dataset, self.table_name)

    def write(self, query, timeout=120):
        """Write the result of ``query`` into this table and wait for the job.

        Args:
            query: query text handed to the service's ``write_to_table``.
            timeout: value forwarded to ``wait_for_job`` as ``timeout``;
                defaults to 120, the value that used to be hard-coded.

        Returns:
            Whatever the service's ``wait_for_job`` call returns.
        """
        # Two separate _service() calls are kept on purpose: this file exists
        # to reproduce a crash, so the call pattern is left untouched.
        job = self._service().write_to_table(
            query,
            self.dataset,
            self.table_name,
        )
        return self._service().wait_for_job(job, timeout=timeout)

    def delete(self):
        """Ask the service to delete this table."""
        self._service().delete_table(self.dataset, self.table_name)
class A(luigi.Task):
    """Write a one-row ``user``/``moneys`` result into the table ``tmp.a``."""

    def output(self):
        """Return the BigQuery target for ``tmp.a``."""
        return BQTarget("tmp", "a")

    def run(self):
        """Run the query and store its result in the output table."""
        target = self.output()
        target.write("select 'a' as user, 10 as moneys")
class B(luigi.Task):
    """Write a one-row ``user``/``weight`` result into the table ``tmp.b``."""

    def output(self):
        """Return the BigQuery target for ``tmp.b``."""
        return BQTarget("tmp", "b")

    def run(self):
        """Run the query and store its result in the output table."""
        target = self.output()
        target.write("select 'a' as user, 300 as weight")
class Joined(luigi.Task):
    """Join the tables written by A and B on ``user`` into ``tmp.a_b_joined``."""

    def requires(self):
        """Yield the upstream tasks: A first, then B."""
        yield A()
        yield B()

    def output(self):
        """Return the BigQuery target for ``tmp.a_b_joined``."""
        return BQTarget("tmp", "a_b_joined")

    def run(self):
        """Build the join query from the input table names and run it."""
        # Unpacking relies on self.input() following the order in which
        # requires() yields its tasks (A, then B).
        table_names = [target.full_name() for target in self.input()]
        a, b = table_names
        query = """
            select
                a.user user,
                a.moneys moneys,
                b.weight weight,
            from
                [{a_table}] a
            join
                [{b_table}] b
            on
                a.user = b.user
            """.format(
            a_table=a,
            b_table=b,
        )
        self.output().write(query)
class Clean(luigi.Task):
    """Delete the tables produced by A, B and Joined.

    The task defines no ``output()``; it only removes its inputs.
    """

    def requires(self):
        """Yield A, B and Joined, in that order."""
        for task_cls in (A, B, Joined):
            yield task_cls()

    def run(self):
        """Call ``delete()`` on every input target."""
        for table in self.input():
            table.delete()
if __name__ == "__main__":
    # Only when executed as a script: hand control over to luigi.
    luigi.run()
# Can run in parallel and does not crash, e.g.:
#   python mysterious_working_pipeline.py Every --local-scheduler --workers 6
import luigi | |
import requests | |
import time | |
class Doer(luigi.Task):
    """Fetch ``site`` with ``requests`` and record how long the body is.

    Subclasses are expected to override the two class attributes below.
    """

    # Base name of the report file; output() turns it into goat/<fname>.txt.
    fname = "a"
    # URL that run() downloads.
    site = "https://google.com"

    def output(self):
        """Return the local file target ``goat/<fname>.txt``."""
        path = "goat/{}.txt".format(self.fname)
        return luigi.LocalTarget(path)

    def run(self):
        """Sleep, fetch ``site`` and write one line with the body length."""
        # Presumably the pause makes tasks overlap when several workers are
        # used -- TODO confirm.
        time.sleep(3)
        with self.output().open("w") as report:
            # The request is issued while the output file is already open.
            body = requests.get(self.site).text
            line = "{} was this big {}\n".format(self.site, len(body))
            report.write(line)
class A(Doer):
    """Doer that fetches https://google.com and reports into goat/a.txt."""

    site = "https://google.com"
    fname = "a"
class B(Doer):
    """Doer that fetches https://reddit.com and reports into goat/b.txt."""

    site = "https://reddit.com"
    fname = "b"
class C(Doer):
    """Doer that fetches https://yahoo.com and reports into goat/c.txt."""

    site = "https://yahoo.com"
    fname = "c"
class D(Doer):
    """Doer that fetches https://gawker.com and reports into goat/d.txt."""

    site = "https://gawker.com"
    fname = "d"
class E(Doer):
    """Doer that fetches https://www.readthedocs.org and reports into goat/e.txt."""

    site = "https://www.readthedocs.org"
    fname = "e"
class F(Doer):
    """Doer that fetches https://facebook.com and reports into goat/f.txt."""

    site = "https://facebook.com"
    fname = "f"
class Every(luigi.Task):
    """Concatenate the reports of the required Doer tasks into goat/done.txt."""

    def requires(self):
        """Return the list of Doer tasks whose reports are combined."""
        # NOTE(review): D is defined in this file but is not required here --
        # confirm whether leaving it out is intentional.
        upstream = [A(), B(), C(), E(), F()]
        return upstream

    def output(self):
        """Return the local file target ``goat/done.txt``."""
        return luigi.LocalTarget("goat/done.txt")

    def run(self):
        """Copy every input report into the output, each prefixed with ``*``."""
        with self.output().open("w") as summary:
            for report in self.input():
                with report.open("r") as src:
                    contents = src.read()
                    summary.write("*" + contents)
if __name__ == "__main__":
    # Only when executed as a script: hand control over to luigi.
    luigi.run()