Last active
March 17, 2017 15:41
-
-
Save Stiivi/5937938 to your computer and use it in GitHub Desktop.
Another simple Bubbles example: Use two data sources: customers and orders CSV files (exports from Volusion platform). Get list of customers (name, email address) who made orders in certain range of years. There are two versions of the same process: one involves loading the data into a SQL table and performing the operations using SQL, the other…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bubbles import Pipeline, open_store | |
stores = { | |
"source": open_store("csv", "data/source", encoding="utf16", infer_fields=True), | |
"target": open_store("sql", "sqlite:///data.sqlite") | |
} | |
p = Pipeline(stores=stores) | |
# Load customers into a SQL table | |
detail = p.fork() | |
detail.source("source", "Customers") | |
detail.create("target", "customers", replace=True) | |
# Load orders into a SQL table, convert date into a date type | |
p.source("source", "Orders") | |
p.string_to_date("shipdate", fmt="%m/%d/%Y %H:%M:%S %p") | |
p.create("target", "orders", replace=True) | |
# Get only shipped orders | |
p.filter_not_empty("shipdate") | |
# Filter by year | |
p.split_date("shipdate") | |
p.filter_by_range("shipdate_year", 2011, 2013) | |
# Join customer details (see above) | |
p.join_details(detail, "customerid", "customerid") | |
# Naive removal of duplicates | |
p.distinct(["customerid", "shipdate_year", "firstname", "lastname", "emailaddress"]) | |
# Store result into a `report` table, replace the table if exists | |
p.create("target", "report", replace=True) | |
p.run() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bubbles import Pipeline, open_store | |
stores = { | |
"source": open_store("csv", "data/source", encoding="utf16", infer_fields=True), | |
"target": open_store("csv", "data/output", role="target") | |
p = Pipeline(stores=stores) | |
# Set source for customers | |
detail = p.fork() | |
detail.source("source", "Customers") | |
# Get orders source | |
p.source("source", "Orders") | |
# Convert ship date to date format and get only shipped orders | |
p.string_to_date("shipdate", fmt="%m/%d/%Y %H:%M:%S %p") | |
p.filter_not_empty("shipdate") | |
# Filter by year | |
p.split_date("shipdate") | |
p.filter_by_range("shipdate_year", 2011, 2013) | |
# Join customer details (see above) | |
p.join_details(detail, "customerid", "customerid") | |
# Naive removal of duplicates | |
p.distinct(["customerid", "shipdate_year", "firstname", "lastname", "emailaddress"]) | |
# Store result into a `report.csv` file, replace the file if exists | |
p.create("target", "report", replace=True) | |
p.run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DEBUG step 0: evaluate soure Orders in source | |
DEBUG step 1: evaluate operation string_to_date | |
DEBUG calling string_to_date(rows) | |
WARNING operation string_to_date is experimental | |
INFO called string_to_date(rows) | |
DEBUG step 2: evaluate soure Customers in source | |
DEBUG step 3: evaluate operation filter_not_empty | |
DEBUG calling filter_not_empty(rows) | |
INFO called filter_not_empty(rows) | |
DEBUG step 4: evaluate operation split_date | |
DEBUG calling split_date(rows) | |
INFO called split_date(rows) | |
DEBUG step 5: evaluate operation filter_by_range | |
DEBUG calling filter_by_range(rows) | |
INFO called filter_by_range(rows) | |
DEBUG step 6: evaluate operation join_details | |
DEBUG calling join_details(rows, rows) | |
INFO called join_details(rows, rows) | |
DEBUG step 7: evaluate operation distinct | |
DEBUG calling distinct(rows) | |
INFO called distinct(rows) | |
DEBUG step 8: evaluate <bubbles.graph.CreateObjectNode object at 0x10332b6d0> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DEBUG step 0: evaluate soure Orders in source | |
DEBUG step 1: evaluate soure Customers in source | |
DEBUG step 2: evaluate operation string_to_date | |
DEBUG calling string_to_date(rows) | |
WARNING operation string_to_date is experimental | |
INFO called string_to_date(rows) | |
DEBUG step 3: evaluate <bubbles.graph.CreateObjectNode object at 0x1055d0650> | |
DEBUG append_from: appending rows into customers | |
DEBUG step 4: evaluate <bubbles.graph.CreateObjectNode object at 0x1059ec150> | |
DEBUG append_from: appending rows into orders | |
DEBUG step 5: evaluate operation filter_not_empty | |
DEBUG calling filter_not_empty(sql) | |
INFO called filter_not_empty(sql) | |
DEBUG step 6: evaluate operation split_date | |
DEBUG calling split_date(sql) | |
INFO called split_date(sql) | |
DEBUG step 7: evaluate operation filter_by_range | |
DEBUG calling filter_by_range(sql) | |
INFO called filter_by_range(sql) | |
DEBUG step 8: evaluate operation join_details | |
DEBUG calling join_details(sql, sql) | |
INFO called join_details(sql, sql) | |
DEBUG step 9: evaluate operation distinct | |
DEBUG calling distinct(sql) | |
INFO called distinct(sql) | |
DEBUG step 10: evaluate <bubbles.graph.CreateObjectNode object at 0x1059ec210> | |
DEBUG append_from: composing into report |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment