Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Last active March 7, 2017 05:34
Show Gist options
  • Save dgadiraju/94b05e131e7ef99199cb6b4432f33823 to your computer and use it in GitHub Desktop.
# Compute total revenue per day from the retail_db dataset.
# Base directory of retail_db and output path are passed as arguments.
# Note: spark-submit options must come BEFORE the application file:
# spark-submit --master local daily_revenue.py /Users/itversity/Research/data/retail_db /Users/itversity/Research/revenue_per_day
from pyspark import SparkContext, SparkConf
import sys
conf = SparkConf().setAppName("Daily Revenue").setMaster("local")
sc = SparkContext(conf=conf)

# orders record layout (CSV): order_id,order_date,order_customer_id,order_status
orders = sc.textFile(sys.argv[1] + "/orders")
# Keep only fulfilled orders; split each record once instead of twice.
ordersFiltered = orders.filter(lambda rec: rec.split(",")[3] in ("COMPLETE", "CLOSED"))
# (order_id, order_date) pairs keyed for the join below.
ordersFilteredMap = ordersFiltered.map(lambda rec: (int(rec.split(",")[0]), rec.split(",")[1]))

# order_items record layout (CSV): item_id,order_id,product_id,quantity,subtotal,...
orderItems = sc.textFile(sys.argv[1] + "/order_items")
# (order_id, subtotal) pairs keyed for the join below.
orderItemsMap = orderItems.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))

# Join on order_id -> (order_id, (order_date, subtotal)); keep only
# the (order_date, subtotal) value.  (The original assigned this map
# twice in a row — the duplicate statement has been removed.)
ordersJoin = ordersFilteredMap.join(orderItemsMap)
ordersJoinMap = ordersJoin.map(lambda rec: rec[1])

# Sum subtotals per order_date.
orderRevenuePerDay = ordersJoinMap.reduceByKey(lambda agg, val: agg + val)

# Emit "date<TAB>revenue" lines to the output directory.
orderRevenuePerDay.\
map(lambda rec: rec[0] + "\t" + str(rec[1])).\
saveAsTextFile(sys.argv[2])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment