// First we're going to import the classes we need
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.avro.generic.GenericRecord
import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport
import org.apache.spark.rdd.RDD
// Then we create RDDs for 2 of the files we imported from MySQL with Sqoop
// RDDs are Spark's data structures for working with distributed datasets
def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
    val job = new Job()
    FileInputFormat.setInputPaths(job, path)
    ParquetInputFormat.setReadSupportClass(job,
        classOf[AvroReadSupport[GenericRecord]])
    sc.newAPIHadoopRDD(job.getConfiguration,
        classOf[ParquetInputFormat[GenericRecord]],
        classOf[Void],
        classOf[GenericRecord]).map(x => x._2)
}
val warehouse = "hdfs://{{cluster_data.manager_node_hostname}}/user/hive/warehouse/"
val order_items = rddFromParquetHdfsFile(warehouse + "order_items")
val products = rddFromParquetHdfsFile(warehouse + "products")
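// (Optional check, not part of the original walkthrough: peek at one record from
// each RDD to confirm the Parquet files were read. The records are Avro
// GenericRecords, whose fields are accessed with get() just as in the steps below.)
order_items.take(1).foreach(println)
products.take(1).foreach(println)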
// Next, we extract the fields from order_items and products that we care about
// and get a list of every product, its name and quantity, grouped by order
val orders = order_items.map { x => (
    x.get("order_item_product_id"),
    (x.get("order_item_order_id"), x.get("order_item_quantity")))
}.join(
    products.map { x => (
        x.get("product_id"),
        (x.get("product_name")))
    }
).map(x => (
    scala.Int.unbox(x._2._1._1), // order_id
    (
        scala.Int.unbox(x._2._1._2), // quantity
        x._2._2.toString // product_name
    )
)).groupByKey()
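// (Shape note added here, not in the original: at this point `orders` is an
// RDD[(Int, Iterable[(Int, String)])], one entry per order_id, holding the
// (quantity, product_name) pairs for every item in that order.)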
// Finally, we score each pair of products that appears together in an order
// (weighting the pair by the product of the two quantities), sum the scores
// across all orders, then sort and take the 10 highest-scoring pairs
val cooccurrences = orders.map(order =>
    (
        order._1,
        order._2.toList.combinations(2).map(order_pair =>
            (
                if (order_pair(0)._2 < order_pair(1)._2)
                    (order_pair(0)._2, order_pair(1)._2)
                else
                    (order_pair(1)._2, order_pair(0)._2),
                order_pair(0)._1 * order_pair(1)._1
            )
        )
    )
)
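// (Shape note added here, not in the original: `cooccurrences` maps each order_id
// to the 2-product combinations found in that order, each as a
// ((product_name_A, product_name_B), quantity_A * quantity_B) pair, with the two
// names ordered lexicographically so a given pair always gets the same key.)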
val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
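// (Shape note added here, not in the original: `combos` sums the score for each
// product pair across all orders; swapping key and value before sortByKey(false)
// sorts by score in descending order, so `mostCommon` is an Array of
// (score, (product_name_A, product_name_B)) entries, highest score first.)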
// We print our results, 1 per line, and exit the Spark shell
println(mostCommon.deep.mkString("\n"))
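// (Note added here: the exit step itself isn't part of the script; in spark-shell
// it is typically done with the :quit command, or with sys.exit(0) as below.)
sys.exit(0)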