Skip to content

Instantly share code, notes, and snippets.

@palin
Created October 10, 2017 11:33
Show Gist options
  • Save palin/99a1c4e20eb0348d2c7eabdb4ca0cb5f to your computer and use it in GitHub Desktop.
Save palin/99a1c4e20eb0348d2c7eabdb4ca0cb5f to your computer and use it in GitHub Desktop.
import luigi
import luigi.contrib
import luigi.contrib.mysqldb
import MySQLdb
"""
This simple workflow works like this:
1. Read 12 records from reviews table in local revieworld_live DB
2. Insert those 12 records (only review_id and published date) into local luigi_test_1 table in test DB
3. Read 12 records from local luigi_test_1 table in test DB
4. Insert those 12 records (only review_id) into luigi_test_2 table.
Main executed task is "SaveSomewhereElse", which depends on "ReadFromReportingDb", so the latter is executed first.
After "ReadFromReportingDb" is done, a checkpoint/output is created in the marker table (specified in config).
The code `self.output().touch()` creates a record which marks the job as done.
When "ReadFromReportingDb" has finished, the main task runs and creates a similar checkpoint in the DB when it's done.
"""
class SaveSomewhereElse(luigi.Task):
    """Copy review ids from ``luigi_test_1`` into ``luigi_test_2``.

    Depends on ``ReadFromReportingDb``. Completion is recorded through a
    ``MySqlTarget`` whose update id is ``save_final_<x>``.
    """

    # Value embedded in the update id of this task's marker target.
    x = luigi.Parameter()

    def output(self):
        """Return the MySQL marker target used to flag this task as done."""
        return luigi.contrib.mysqldb.MySqlTarget(
            "localhost", "test", "root", "", "luigi_test_2",
            "save_final_{}".format(self.x))

    def requires(self):
        """Return the upstream task that must complete before this one."""
        # NOTE(review): the upstream parameter is hard-coded to 17 rather
        # than derived from self.x -- confirm this is intentional.
        return ReadFromReportingDb(17)

    def run(self):
        """Read up to 12 rows from ``luigi_test_1``, store them, mark done."""
        mysql_connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="", db="test")
        try:
            cursor = mysql_connection.cursor()
            cursor.execute(
                "SELECT review_id, published FROM luigi_test_1 LIMIT 12")
            data = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            mysql_connection.close()
        self.process(data)
        self.output().touch()

    def process(self, rows):
        """Insert the first column of every row into ``luigi_test_2``.

        Args:
            rows: Iterable of row sequences; ``row[0]`` is used as the
                review id.
        """
        mysql_connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="", db="test")
        try:
            cursor = mysql_connection.cursor()
            # Bind values as parameters instead of formatting them into the
            # SQL text, so the driver handles quoting and escaping.
            cursor.executemany(
                "INSERT INTO luigi_test_2(review_id) VALUES (%s)",
                [(row[0],) for row in rows])
            mysql_connection.commit()
        finally:
            mysql_connection.close()
class ReadFromReportingDb(luigi.Task):
    """Copy review ids and published dates into ``luigi_test_1``.

    Rows are read from the ``reviews`` table of the ``revieworld_live``
    database and written to the ``test`` database. Completion is recorded
    through a ``MySqlTarget`` whose update id is ``read_first_<x>``.
    """

    # Value embedded in the update id of this task's marker target.
    x = luigi.Parameter()

    def output(self):
        """Return the MySQL marker target used to flag this task as done."""
        return luigi.contrib.mysqldb.MySqlTarget(
            "localhost", "test", "root", "", "luigi_test_1",
            "read_first_{}".format(self.x))

    def run(self):
        """Read up to 12 rows from ``reviews``, store them, mark done."""
        mysql_connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="",
            db="revieworld_live")
        try:
            cursor = mysql_connection.cursor()
            cursor.execute(
                "SELECT id, published, retailer_id FROM reviews LIMIT 12")
            data = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            mysql_connection.close()
        self.process(data)
        self.output().touch()

    def process(self, rows):
        """Insert ``(row[0], row[1])`` of every row into ``luigi_test_1``.

        Args:
            rows: Iterable of row sequences; ``row[0]`` is stored as
                ``review_id`` and ``row[1]`` as ``published``.
        """
        mysql_connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="", db="test")
        try:
            cursor = mysql_connection.cursor()
            # Bind values as parameters so the driver quotes them; a NULL
            # published value is sent as NULL instead of the text 'None'.
            cursor.executemany(
                "INSERT INTO luigi_test_1(review_id,published) "
                "VALUES (%s,%s)",
                [(row[0], row[1]) for row in rows])
            mysql_connection.commit()
        finally:
            mysql_connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment