@Stiivi
Created July 17, 2013 05:22
Bubbles recipe: aggregate over a window. Assigns to every row an aggregated value computed over a window specified by a key (which may be compound). This example assigns each customer's latest purchase year to every order. Source: https://github.com/Stiivi/bubbles/blob/master/examples
from bubbles import Pipeline, FieldList, data_object, open_store

# Sample order data with fields:
fields = FieldList(
    ["id", "integer"],
    ["customer_id", "integer"],
    ["year", "integer"],
    ["amount", "integer"]
)

data = [
    [1, 1, 2009, 10],
    [2, 1, 2010, 20],
    [3, 1, 2011, 20],
    [4, 1, 2012, 50],
    [5, 2, 2010, 50],
    [6, 2, 2012, 40],
    [7, 3, 2011, 100],
    [8, 3, 2012, 150],
    [9, 3, 2013, 120]
]

source = data_object("iterable", data, fields)

# Stores for the SQL alternative, if enabled (see below)
stores = {"default": open_store("sql", "sqlite:///")}

#
# Create the pipeline
#

p = Pipeline(stores=stores)
p.source_object(source)

# Uncomment this to get SQL operations instead of Python iterators
# p.create("default", "data")

# Find the last purchase year of each customer
last_purchase = p.fork()
last_purchase.aggregate(["customer_id"],
                        [["year", "max"]],
                        include_count=False)
last_purchase.rename_fields({"year_max": "last_purchase_year"})

# Attach the aggregated value to every order row
p.join_details(last_purchase, "customer_id", "customer_id")

p.pretty_print()
p.run()
DEBUG step 0: evaluate object <bubbles.objects.IterableDataSource object at 0x10ebb6b10>
DEBUG step 1: evaluate operation aggregate
DEBUG retaining consumable object <bubbles.objects.IterableDataSource object at 0x10ebb6b10>. it will be
DEBUG calling aggregate(rows)
INFO called aggregate(rows)
DEBUG step 2: evaluate operation rename_fields
DEBUG calling rename_fields(*)
WARNING operation rename_fields is experimental
DEBUG calling field_filter(rows)
INFO called field_filter(rows)
INFO called rename_fields(*)
DEBUG step 3: evaluate operation join_details
DEBUG calling join_details(rows, rows)
INFO called join_details(rows, rows)
DEBUG step 4: evaluate operation pretty_print
DEBUG calling pretty_print(records)
+--+-----------+----+------+------------------+
|id|customer_id|year|amount|last_purchase_year|
+--+-----------+----+------+------------------+
| 1|          1|2009|    10|              2012|
| 2|          1|2010|    20|              2012|
| 3|          1|2011|    20|              2012|
| 4|          1|2012|    50|              2012|
| 5|          2|2010|    50|              2012|
| 6|          2|2012|    40|              2012|
| 7|          3|2011|   100|              2013|
| 8|          3|2012|   150|              2013|
| 9|          3|2013|   120|              2013|
+--+-----------+----+------+------------------+
INFO called pretty_print(records)
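For readers without Bubbles installed, the aggregate-then-join pattern of this run can be sketched in plain Python. This is only an illustration of the technique, not how Bubbles implements `aggregate` or `join_details`; the function name `add_last_purchase_year` and the tuple layout are assumptions made for this sketch.

```python
# Same sample orders as in the recipe: (id, customer_id, year, amount)
orders = [
    (1, 1, 2009, 10),
    (2, 1, 2010, 20),
    (3, 1, 2011, 20),
    (4, 1, 2012, 50),
    (5, 2, 2010, 50),
    (6, 2, 2012, 40),
    (7, 3, 2011, 100),
    (8, 3, 2012, 150),
    (9, 3, 2013, 120),
]


def add_last_purchase_year(rows):
    """Append the maximum year per customer_id to every row (hypothetical helper)."""
    # Pass 1: aggregate max(year) grouped by customer_id
    last_year = {}
    for _id, customer_id, year, _amount in rows:
        if customer_id not in last_year or year > last_year[customer_id]:
            last_year[customer_id] = year

    # Pass 2: join the aggregated value back onto each detail row
    return [row + (last_year[row[1]],) for row in rows]


result = add_last_purchase_year(orders)
for row in result:
    print(row)
```

The input is traversed twice, so it has to be a list or another re-iterable object. That may be related to the "retaining consumable object" message in the log above, although the sketch does not depend on that reading.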
DEBUG step 0: evaluate object <bubbles.objects.IterableDataSource object at 0x105466b10>
DEBUG step 1: evaluate create data in default
DEBUG append_from: appending rows into data
DEBUG step 2: evaluate operation aggregate
DEBUG calling aggregate(sql)
INFO called aggregate(sql)
DEBUG step 3: evaluate operation rename_fields
DEBUG calling rename_fields(*)
WARNING operation rename_fields is experimental
DEBUG calling field_filter(sql)
INFO called field_filter(sql)
INFO called rename_fields(*)
DEBUG step 4: evaluate operation join_details
DEBUG calling join_details(sql, sql)
INFO called join_details(sql, sql)
DEBUG step 5: evaluate operation pretty_print
DEBUG calling pretty_print(records)
+--+-----------+----+------+------------------+
|id|customer_id|year|amount|last_purchase_year|
+--+-----------+----+------+------------------+
| 1|          1|2009|    10|              2012|
| 2|          1|2010|    20|              2012|
| 3|          1|2011|    20|              2012|
| 4|          1|2012|    50|              2012|
| 5|          2|2010|    50|              2012|
| 6|          2|2012|    40|              2012|
| 7|          3|2011|   100|              2013|
| 8|          3|2012|   150|              2013|
| 9|          3|2013|   120|              2013|
+--+-----------+----+------+------------------+
INFO called pretty_print(records)
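The SQL variant of the run can be approximated with the standard-library `sqlite3` module. The statement below is a hand-written guess at an equivalent query (a grouped subquery joined back to the detail table); it is not the SQL that Bubbles actually generates. The table name `data` follows the `p.create("default", "data")` call, and the alias names are assumptions.

```python
import sqlite3

# Same sample orders as in the recipe: (id, customer_id, year, amount)
orders = [
    (1, 1, 2009, 10), (2, 1, 2010, 20), (3, 1, 2011, 20),
    (4, 1, 2012, 50), (5, 2, 2010, 50), (6, 2, 2012, 40),
    (7, 3, 2011, 100), (8, 3, 2012, 150), (9, 3, 2013, 120),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE data ("
    "id INTEGER, customer_id INTEGER, year INTEGER, amount INTEGER)"
)
conn.executemany("INSERT INTO data VALUES (?, ?, ?, ?)", orders)

# Aggregate per customer in a subquery, then join it to every order row
query = """
SELECT d.id, d.customer_id, d.year, d.amount, a.last_purchase_year
FROM data AS d
JOIN (
    SELECT customer_id, MAX(year) AS last_purchase_year
    FROM data
    GROUP BY customer_id
) AS a ON a.customer_id = d.customer_id
ORDER BY d.id
"""
sql_rows = conn.execute(query).fetchall()
conn.close()

for row in sql_rows:
    print(row)
```

A compound key would add the extra columns to both the `GROUP BY` clause and the join condition.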