// First we're going to import the classes we need
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.avro.generic.GenericRecord
import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport
import org.apache.spark.rdd.RDD
// Then we create RDDs for 2 of the files we imported from MySQL with Sqoop.
// RDDs are Spark's data structures for working with distributed datasets.
// This script is meant to be pasted into the Spark shell, where sc (the
// SparkContext) is already defined.
def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
  // Job.getInstance() replaces the deprecated `new Job()` constructor
  val job = Job.getInstance()
  FileInputFormat.setInputPaths(job, path)
  // Tell the Parquet input format to materialize records as Avro GenericRecords
  ParquetInputFormat.setReadSupportClass(job,
    classOf[AvroReadSupport[GenericRecord]])
  // The keys are Void placeholders; keep only the record values
  sc.newAPIHadoopRDD(job.getConfiguration,
    classOf[ParquetInputFormat[GenericRecord]],
    classOf[Void],
    classOf[GenericRecord]).map(x => x._2)
}
val warehouse = "hdfs://{{cluster_data.manager_node_hostname}}/user/hive/warehouse/"
val order_items = rddFromParquetHdfsFile(warehouse + "order_items")
val products = rddFromParquetHdfsFile(warehouse + "products")
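// Aside: the Avro/Parquet plumbing above is the low-level route. If this
// cluster runs Spark 1.4 or later, the same files can also be loaded as
// DataFrames through the sqlContext that spark-shell predefines. A sketch
// of that alternative only; nothing below depends on these two values:
val orderItemsDF = sqlContext.read.parquet(warehouse + "order_items")
val productsDF = sqlContext.read.parquet(warehouse + "products")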
// Next, we extract the fields from order_items and products that we care about
// and get a list of every product, its name and quantity, grouped by order
val orders = order_items.map { x => (
    x.get("order_item_product_id"),
    (x.get("order_item_order_id"), x.get("order_item_quantity")))
  }.join(
    products.map { x => (
      x.get("product_id"),
      x.get("product_name"))
    }
  ).map(x => (
    scala.Int.unbox(x._2._1._1), // order_id
    (
      scala.Int.unbox(x._2._1._2), // quantity
      x._2._2.toString             // product_name
    )
  )).groupByKey()
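// To make the shape of `orders` concrete, here is the same join-then-group
// pattern on tiny hand-built RDDs (hypothetical ids and names for
// illustration, not data from the warehouse):
val demoItems = sc.parallelize(Seq(
  (1, (100, 2)), // (product_id, (order_id, quantity))
  (2, (100, 1)),
  (1, (101, 3))
))
val demoProducts = sc.parallelize(Seq(
  (1, "cap"),    // (product_id, product_name)
  (2, "shirt")
))
val demoOrders = demoItems.join(demoProducts)
  .map { case (_, ((orderId, qty), name)) => (orderId, (qty, name)) }
  .groupByKey()
// demoOrders.collect() yields roughly:
// Array((100, CompactBuffer((2,cap), (1,shirt))), (101, CompactBuffer((3,cap))))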
// Finally, we tally how many times each combination of products appears
// together in an order, then we sort them and take the 10 most common
val cooccurrences = orders.map(order =>
  (
    order._1,
    // Every unordered pair of products in the order, weighted by the
    // product of the two quantities
    order._2.toList.combinations(2).map(order_pair =>
      (
        // Put the two names in alphabetical order so (A, B) and (B, A)
        // count as the same pair
        if (order_pair(0)._2 < order_pair(1)._2)
          (order_pair(0)._2, order_pair(1)._2)
        else
          (order_pair(1)._2, order_pair(0)._2),
        order_pair(0)._1 * order_pair(1)._1
      )
    )
  )
)
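// A quick sanity check on the pairing step: List.combinations(2) enumerates
// each unordered pair of line items exactly once (toy values for illustration):
val demoLineItems = List((2, "cap"), (1, "shirt"), (3, "mug"))
demoLineItems.combinations(2).foreach(println)
// List((2,cap), (1,shirt))
// List((2,cap), (3,mug))
// List((1,shirt), (3,mug))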
// Flatten the per-order pair lists and sum the weight for each distinct pair
val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
// Swap each (pair, tally) to (tally, pair) so we can sort by tally, descending
val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
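// An equivalent alternative (not part of the original exercise): RDD.top
// keeps only the 10 largest elements instead of fully sorting the RDD,
// ordering tuples by their first field (the tally) and breaking ties on
// the pair itself:
val mostCommonAlt = combos.map(x => (x._2, x._1)).top(10)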
// We print our results, 1 per line; afterwards the Spark shell can be
// exited with :quit
println(mostCommon.mkString("\n"))