Created
October 10, 2017 11:33
-
-
Save palin/01ba1feedff6e4d321e86c9c09f47833 to your computer and use it in GitHub Desktop.
Connect Luigi with a MySQL database
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi | |
import luigi.contrib | |
import luigi.contrib.mysqldb | |
import MySQLdb | |
"""
This simple workflow works like this:
1. Read 12 records from the `reviews` table in the local `revieworld_live` DB.
2. Insert those 12 records (only review_id and published date) into the local `luigi_test_1` table in the `test` DB.
3. Read 12 records from the local `luigi_test_1` table in the `test` DB.
4. Insert those 12 records (only review_id) into the `luigi_test_2` table.

The main task is "SaveSomewhereElse", which depends on "ReadFromReportingDb", so the latter is executed first.
When "ReadFromReportingDb" is done, a checkpoint (its output) is created in the marker table (specified in config).
The call `self.output().touch()` inserts the record that marks the job as done.
Once "ReadFromReportingDb" has finished, the main task runs and creates a similar checkpoint in the DB when it is done.
"""
class SaveSomewhereElse(luigi.Task):
    """Copy review ids from ``test.luigi_test_1`` into ``test.luigi_test_2``.

    Requires ``ReadFromReportingDb(17)``. Completion is recorded through a
    ``MySqlTarget`` marker row whose update id is ``save_final_<x>``.
    """

    # Used only to build the marker update id in ``output``.
    x = luigi.Parameter()

    def _connect(self):
        """Open a new MySQLdb connection to the local ``test`` database."""
        return MySQLdb.connect(host="localhost", port=3306, user="root", passwd="", db="test")

    def output(self):
        """Return the MySQL marker target that records this task as done."""
        return luigi.contrib.mysqldb.MySqlTarget(
            "localhost", "test", "root", "", "luigi_test_2", "save_final_{}".format(self.x)
        )

    def requires(self):
        """Return the upstream task that fills ``luigi_test_1``."""
        # NOTE(review): the upstream task is always ReadFromReportingDb(17),
        # independent of self.x -- confirm this is intended.
        return ReadFromReportingDb(17)

    def run(self):
        """Read up to 12 review ids from luigi_test_1, copy them, then mark done."""
        connection = self._connect()
        try:
            cursor = connection.cursor()
            # ORDER BY makes LIMIT pick a deterministic set of rows; without it
            # MySQL is free to return any 12 rows.
            cursor.execute(
                "SELECT review_id FROM luigi_test_1 ORDER BY review_id LIMIT 12"
            )
            data = cursor.fetchall()
        finally:
            # Release the connection even if the query fails.
            connection.close()
        self.process(data)
        # Mark completion only after the copy has been committed.
        self.output().touch()

    def process(self, rows):
        """Insert the first column of each row into ``luigi_test_2.review_id``.

        :param rows: sequence of row tuples whose first element is a review id.
        """
        connection = self._connect()
        try:
            cursor = connection.cursor()
            # Parameterized insert: the driver escapes each value instead of
            # splicing it into the SQL text.
            cursor.executemany(
                "INSERT INTO luigi_test_2 (review_id) VALUES (%s)",
                [(row[0],) for row in rows],
            )
            connection.commit()
        finally:
            # Uncommitted work is discarded when the connection closes on error.
            connection.close()
class ReadFromReportingDb(luigi.Task):
    """Copy review ids and publish dates from ``revieworld_live.reviews``
    into ``test.luigi_test_1``.

    Completion is recorded through a ``MySqlTarget`` marker row whose update
    id is ``read_first_<x>``.
    """

    # Used only to build the marker update id in ``output``.
    x = luigi.Parameter()

    @staticmethod
    def _connect(db):
        """Open a new MySQLdb connection to database ``db`` on localhost."""
        return MySQLdb.connect(host="localhost", port=3306, user="root", passwd="", db=db)

    def output(self):
        """Return the MySQL marker target that records this task as done."""
        return luigi.contrib.mysqldb.MySqlTarget(
            "localhost", "test", "root", "", "luigi_test_1", "read_first_{}".format(self.x)
        )

    def run(self):
        """Read up to 12 reviews, copy them into luigi_test_1, then mark done."""
        connection = self._connect("revieworld_live")
        try:
            cursor = connection.cursor()
            # Only id and published are used downstream. ORDER BY makes LIMIT
            # pick a deterministic set of rows.
            cursor.execute(
                "SELECT id, published FROM reviews ORDER BY id LIMIT 12"
            )
            data = cursor.fetchall()
        finally:
            # Release the connection even if the query fails.
            connection.close()
        self.process(data)
        # Mark completion only after the copy has been committed.
        self.output().touch()

    def process(self, rows):
        """Insert ``(row[0], row[1])`` of each row into
        ``luigi_test_1 (review_id, published)``.

        :param rows: sequence of row tuples ``(review id, published, ...)``.
        """
        connection = self._connect("test")
        try:
            cursor = connection.cursor()
            # Parameterized insert: the driver quotes dates and maps None to
            # SQL NULL, instead of the original string-formatted '{}' which
            # produced the literal text 'None' for missing dates.
            cursor.executemany(
                "INSERT INTO luigi_test_1 (review_id, published) VALUES (%s, %s)",
                [(row[0], row[1]) for row in rows],
            )
            connection.commit()
        finally:
            # Uncommitted work is discarded when the connection closes on error.
            connection.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment