Created
October 10, 2017 11:33
-
-
Save palin/99a1c4e20eb0348d2c7eabdb4ca0cb5f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi | |
import luigi.contrib | |
import luigi.contrib.mysqldb | |
import MySQLdb | |
""" | |
This simple workflow works like this:
1. Read 12 records from the reviews table in the local revieworld_live DB.
2. Insert those 12 records (only review_id and published date) into the local luigi_test_1 table in the test DB.
3. Read 12 records from the local luigi_test_1 table in the test DB.
4. Insert those 12 records (only review_id) into the luigi_test_2 table.
The main executed task is "SaveSomewhereElse", which depends on "ReadFromReportingDb", so the latter is executed first.
After "ReadFromReportingDb" is done, a checkpoint/output is created in the marker table (specified in config).
The code `self.output().touch()` creates a record which marks the job as done.
When "ReadFromReportingDb" has finished, the main task runs and creates a similar checkpoint in the DB when it is done.
""" | |
class SaveSomewhereElse(luigi.Task):
    """Copy review ids from ``luigi_test_1`` into ``luigi_test_2``.

    Both tables live in the local ``test`` database. The task depends on
    ``ReadFromReportingDb``, which fills ``luigi_test_1`` first.
    """

    x = luigi.Parameter()

    def output(self):
        """Return the MySQL target whose ``touch()`` marks this task as done."""
        return luigi.contrib.mysqldb.MySqlTarget(
            "localhost",
            "test",
            "root",
            "",
            "luigi_test_2",
            "save_final_{}".format(self.x),
        )

    def requires(self):
        """Return the upstream task that must complete before this one runs."""
        # NOTE(review): the upstream parameter is hard-coded to 17 instead of
        # being derived from self.x -- confirm this is intended.
        return ReadFromReportingDb(17)

    def run(self):
        """Read up to 12 rows from ``luigi_test_1``, store them, mark done."""
        connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="", db="test"
        )
        try:
            cursor = connection.cursor()
            cursor.execute(
                """SELECT review_id, published
                FROM luigi_test_1
                LIMIT 12"""
            )
            data = cursor.fetchall()
        finally:
            # Release the connection even when the query fails.
            connection.close()
        self.process(data)
        # Only reached when process() succeeded, so a failed run is retried.
        self.output().touch()

    def process(self, rows):
        """Insert the ``review_id`` (first column) of each row into ``luigi_test_2``.

        Args:
            rows: sequence of row tuples; only index 0 is used.
        """
        connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="", db="test"
        )
        try:
            cursor = connection.cursor()
            # Let the driver bind the values instead of formatting them into
            # the SQL text: correct quoting and no injection risk.
            cursor.executemany(
                "INSERT INTO luigi_test_2(review_id) VALUES (%s)",
                [(row[0],) for row in rows],
            )
            connection.commit()
        finally:
            connection.close()
class ReadFromReportingDb(luigi.Task):
    """Copy reviews from ``revieworld_live.reviews`` into ``test.luigi_test_1``.

    Only the review id and the published date are stored in the target table.
    """

    x = luigi.Parameter()

    def output(self):
        """Return the MySQL target whose ``touch()`` marks this task as done."""
        return luigi.contrib.mysqldb.MySqlTarget(
            "localhost",
            "test",
            "root",
            "",
            "luigi_test_1",
            "read_first_{}".format(self.x),
        )

    def run(self):
        """Read up to 12 rows from ``reviews``, store them, mark done."""
        connection = MySQLdb.connect(
            host="localhost",
            port=3306,
            user="root",
            passwd="",
            db="revieworld_live",
        )
        try:
            cursor = connection.cursor()
            # retailer_id is fetched but not used by process().
            cursor.execute(
                """SELECT id, published, retailer_id
                FROM reviews
                LIMIT 12"""
            )
            data = cursor.fetchall()
        finally:
            # Release the connection even when the query fails.
            connection.close()
        self.process(data)
        # Only reached when process() succeeded, so a failed run is retried.
        self.output().touch()

    def process(self, rows):
        """Insert ``(review_id, published)`` for each row into ``luigi_test_1``.

        Args:
            rows: sequence of row tuples; index 0 is the review id and
                index 1 is the published value.
        """
        connection = MySQLdb.connect(
            host="localhost", port=3306, user="root", passwd="", db="test"
        )
        try:
            cursor = connection.cursor()
            # Bound parameters quote values correctly and store a NULL
            # published value as NULL rather than the literal text 'None'.
            cursor.executemany(
                "INSERT INTO luigi_test_1(review_id,published) VALUES (%s,%s)",
                [(row[0], row[1]) for row in rows],
            )
            connection.commit()
        finally:
            connection.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment