Skip to content

Instantly share code, notes, and snippets.

@cpdean
Created May 5, 2015 04:07
Show Gist options
  • Save cpdean/f99bc58c14c0d102cacd to your computer and use it in GitHub Desktop.
Save cpdean/f99bc58c14c0d102cacd to your computer and use it in GitHub Desktop.

Collecting examples that break or do not break.

The next step is to refactor the broken bq example into the HTTPS requests example, using the same HTTP library that bq uses.

We still need to get to the root of why SSLErrors are thrown.

# crashes when you run in parallel
# python broken_bq.py Clean --workers 2
import luigi
import tylertreatwrapper as bigquery
class BQTarget(luigi.Target):
    """Luigi target representing a single table in BigQuery.

    The table is addressed by ``dataset`` and ``table_name``. Every remote
    operation goes through the service object returned by ``_service()``.
    """

    def __init__(self, dataset=None, table_name=None):
        """Remember which table this target refers to.

        Args:
            dataset: name of the dataset holding the table.
            table_name: name of the table inside ``dataset``.

        Raises:
            ValueError: if ``dataset`` or ``table_name`` is missing or empty.
        """
        # ValueError is a subclass of Exception, so callers that caught the
        # previously raised bare Exception keep working.
        if not dataset:
            raise ValueError("dataset must be given to a BQTarget")
        if not table_name:
            raise ValueError("table_name must be given to a BQTarget")
        self.dataset = dataset
        self.table_name = table_name

    def _service(self):
        """Return the service object from ``bigquery.get_service()``."""
        return bigquery.get_service()

    def exists(self):
        """Check whether ``dataset.table_name`` is present in BigQuery.

        Returns:
            Whatever the service's ``check_table`` call returns.
        """
        # Go through _service() like every other method so there is one
        # single place that decides how the service is obtained.
        return self._service().check_table(self.dataset, self.table_name)

    def full_name(self):
        """Return the table reference formatted as ``dataset.table_name``."""
        return "{}.{}".format(self.dataset, self.table_name)

    def write(self, query):
        """Run ``query`` into this table and wait for the job.

        Args:
            query: query text handed to the service's ``write_to_table``.

        Returns:
            Whatever the service's ``wait_for_job`` call returns.
        """
        # Submit and wait on the same service object instead of requesting
        # a second one just to poll the job.
        service = self._service()
        job = service.write_to_table(
            query,
            self.dataset,
            self.table_name,
        )
        return service.wait_for_job(job, timeout=120)

    def delete(self):
        """Delete this table via the service's ``delete_table`` call."""
        self._service().delete_table(self.dataset, self.table_name)
class A(luigi.Task):
    """Task whose output is the BQTarget for dataset ``tmp``, table ``a``."""

    def output(self):
        """Return the BQTarget this task writes to."""
        return BQTarget("tmp", "a")

    def run(self):
        """Write a fixed one-row query result into the output target."""
        query = "select 'a' as user, 10 as moneys"
        destination = self.output()
        destination.write(query)
class B(luigi.Task):
    """Task whose output is the BQTarget for dataset ``tmp``, table ``b``."""

    def output(self):
        """Return the BQTarget this task writes to."""
        return BQTarget("tmp", "b")

    def run(self):
        """Write a fixed one-row query result into the output target."""
        query = "select 'a' as user, 300 as weight"
        destination = self.output()
        destination.write(query)
class Joined(luigi.Task):
    """Task that joins the outputs of A and B into ``tmp.a_b_joined``."""

    def requires(self):
        """Yield the two upstream tasks, A first and then B."""
        for upstream in (A, B):
            yield upstream()

    def output(self):
        """Return the BQTarget for dataset ``tmp``, table ``a_b_joined``."""
        return BQTarget("tmp", "a_b_joined")

    def run(self):
        """Join the two input tables on ``user`` and write the result."""
        # Inputs arrive in the order requires() yields them: A, then B.
        a_name, b_name = (source.full_name() for source in self.input())
        # The query text is kept flush-left so the string sent to the
        # service is unchanged.
        template = """
select
a.user user,
a.moneys moneys,
b.weight weight,
from
[{a_table}] a
join
[{b_table}] b
on
a.user = b.user
"""
        self.output().write(template.format(a_table=a_name, b_table=b_name))
class Clean(luigi.Task):
    """Task that deletes the tables produced by A, B and Joined."""

    def requires(self):
        """Yield A, B and Joined, in that order."""
        for dependency in (A, B, Joined):
            yield dependency()

    def run(self):
        """Call ``delete()`` on every input target."""
        for target in self.input():
            target.delete()
# Script entry point: only runs when the file is executed directly, e.g.
# `python broken_bq.py Clean --workers 2`.
if __name__ == '__main__':
    # Presumably luigi reads the task name and flags from the command line.
    luigi.run()
# can run in parallel and does not crash
# python mysterious_working_pipeline.py Every --local-scheduler --workers 6
import luigi
import requests
import time
class Doer(luigi.Task):
    """Fetch ``site`` and record the size of the response body.

    Subclasses override ``fname`` and ``site``; the result is written to
    ``goat/<fname>.txt``.
    """

    fname = "a"
    site = "https://google.com"
    # Seconds to wait for the HTTP request before giving up. Without a
    # timeout, requests can block forever on an unresponsive server and
    # stall the worker.
    request_timeout = 30

    def output(self):
        """Return the local file target ``goat/<fname>.txt``."""
        return luigi.LocalTarget("goat/{}.txt".format(self.fname))

    def run(self):
        """Sleep, download ``site`` and write one summary line.

        Raises:
            requests.exceptions.RequestException: if the request fails or
                exceeds ``request_timeout``.
        """
        # NOTE(review): the fixed delay presumably simulates slow work so
        # parallel workers overlap; confirm before removing.
        time.sleep(3)
        # Fetch before opening the output so no write handle is held while
        # waiting on the network.
        body = requests.get(self.site, timeout=self.request_timeout).text
        with self.output().open("w") as f:
            f.write("{} was this big {}\n".format(self.site, len(body)))
class A(Doer):
    """Doer variant that fetches https://google.com under the name ``a``."""

    site = "https://google.com"
    fname = "a"
class B(Doer):
    """Doer variant that fetches https://reddit.com under the name ``b``."""

    site = "https://reddit.com"
    fname = "b"
class C(Doer):
    """Doer variant that fetches https://yahoo.com under the name ``c``."""

    site = "https://yahoo.com"
    fname = "c"
class D(Doer):
    """Doer variant that fetches https://gawker.com under the name ``d``."""

    site = "https://gawker.com"
    fname = "d"
class E(Doer):
    """Doer variant that fetches https://www.readthedocs.org as ``e``."""

    site = "https://www.readthedocs.org"
    fname = "e"
class F(Doer):
    """Doer variant that fetches https://facebook.com under the name ``f``."""

    site = "https://facebook.com"
    fname = "f"
class Every(luigi.Task):
    """Concatenate the outputs of the site tasks into ``goat/done.txt``."""

    def requires(self):
        """Return the list of upstream site tasks."""
        # NOTE(review): D is defined in this file but is not listed here;
        # confirm whether the omission is intentional.
        dependencies = [A(), B(), C(), E(), F()]
        return dependencies

    def output(self):
        """Return the local file target ``goat/done.txt``."""
        return luigi.LocalTarget("goat/done.txt")

    def run(self):
        """Write each input's contents to the output, prefixed with ``*``."""
        with self.output().open("w") as combined:
            for source in self.input():
                with source.open("r") as handle:
                    contents = handle.read()
                    combined.write("*" + contents)
# Script entry point: only runs when the file is executed directly, e.g.
# `python mysterious_working_pipeline.py Every --local-scheduler --workers 6`.
if __name__ == '__main__':
    # Presumably luigi reads the task name and flags from the command line.
    luigi.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment