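// First approach: build a per-bill key, count articles per bill, then cross-join the
// counts with themselves to enumerate pairs of distinct articles bought on the same bill
// and count how often each pair occurs.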
import org.apache.spark.sql.functions.{udf, col}
val csv = spark.read.format("csv").option("header", true).load("/Users/tilak/Downloads/Pam/SalesAnalysis/data/store_sales_unified_2017.csv")
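// A single bill is identified by the combination of receipt_id, cash_register_id, sale_time and date.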
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
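// Cross-join the per-bill article counts with themselves and keep only row pairs from the
// same bill (column 0 vs 3) with different article ids (column 1 vs 4); note that both
// orderings of a pair survive the filter.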
val sameBill = countArticles.crossJoin(countArticles).filter(x => x.getString(0) == x.getString(3) && x.getString(1) != x.getString(4))
val newNames = sameBill.columns.toList.zipWithIndex.map((x) => x._1 + "_" + x._2)
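// Concatenate the two article ids of a pair in a fixed (larger-first) order so that
// (a, b) and (b, a) collapse into the same key.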
val combineArticle: (String, String) => String = (x, y) => if (x.toLong >= y.toLong) x + "|" + y else y + "|" + x
val someFn1 = udf(combineArticle)
val newSameBill = sameBill.toDF(newNames: _*)
val articlesConcatSort = newSameBill.withColumn("concat_article_id", someFn1(newSameBill.col("article_id_1"), newSameBill.col("article_id_4"))).groupBy("concat_article_id").count()
val separateArticles: (String) => List[String] = (x) => x.split("\\|").toList
val someFn2 = udf(separateArticles)
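// Split the normalized pair key back into two separate article columns for the final output.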
val additionColumn = articlesConcatSort.withColumn("article_split", someFn2(articlesConcatSort.col("concat_article_id"))).withColumn("article1", col("article_split")(0)).withColumn("article2", col("article_split")(1))
val finalFinal = additionColumn.drop("concat_article_id").drop("article_split")
finalFinal.coalesce(1).write.option("header", "true").csv("final.csv")
// Second approach: pivot article_id into one T/F column per article for each bill.
// spark.sql.pivotMaxValues has to be raised because the number of distinct articles exceeds
// the default limit of 10000.
import org.apache.spark.sql.functions.{udf, sum}
spark.conf.set("spark.sql.pivotMaxValues", Int.MaxValue.toString)
val csv = spark.read.format("csv").option("header", true).load("/Users/tilak/Downloads/Pam/SalesAnalysis/data/store_sales_unified_2017.csv")
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
val articles = countArticles.select("article_id").distinct()
val articleIds = articles.collect.map(x => x(0))
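// Collect the distinct article ids on the driver so they can be passed as explicit pivot
// values; each bill then gets one T/F column per article.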
val putBoolean: (Long) => String = (x) => if (x > 0) "T" else "F"
val someFn3 = udf(putBoolean)
val pivoted = countArticles.groupBy("unique").pivot("article_id", articleIds).agg(someFn3(sum("count"))).na.fill("F")
pivoted.coalesce(1).write.option("header", "true").csv("final235.csv")
// Third approach: pivot the article ids in chunks of 1000 to stay under the pivot limit,
// write each chunk to its own CSV, then join the chunk files back on the bill key.
import org.apache.spark.sql.functions.{udf, sum}
val csv = spark.read.format("csv").option("header", true).load("/Users/tilak/Downloads/Pam/SalesAnalysis/data/store_sales_unified_2017.csv")
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
val articles = countArticles.select("article_id").distinct()
val articleIds = articles.collect.map(x => x(0))
val r = 0 to 16000 by 1000
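// Pivot in parallel chunks of 1000 article ids (the range 0 to 16000 presumably covers the
// number of distinct articles in this dataset) and write one CSV per chunk.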
r.par.map(n => {
val articleIdss = articleIds.slice(n, n + 1000)
val putBoolean: (Long) => String = (x) => if (x > 0) "T" else "F"
val someFn3 = udf(putBoolean)
val pivoted = countArticles.groupBy("unique").pivot("article_id", articleIdss).agg(someFn3(sum("count"))).na.fill("F")
pivoted.coalesce(1).write.option("header", "true").csv(n + ".csv")
})
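// Read each chunk back and join the chunks on the bill key to assemble the full wide table.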
r.par.map(n => {
spark.read.format("csv").option("header", true).load(n + ".csv")
}).reduce((csv1, csv2) => {
csv1.join(csv2, "unique")
}).coalesce(1).write.option("header", "true").csv("final.csv")
// Third approach on a cluster: the same chunked pivot, reading the input from /tmp and
// writing each chunk to /tmp (the chunks are not re-joined here).
import org.apache.spark.sql.functions.{udf, sum}
val csv = sqlContext.read.format("csv").option("header", "true").load("/tmp/store_sales_unified_2017.csv")
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
val articles = countArticles.select("article_id").distinct()
val articleIds = articles.collect.map(x => x(0))
val r = 0 to 16000 by 1000
r.par.map(n => {
val articleIdss = articleIds.slice(n, n + 1000)
val putBoolean: (Long) => String = (x) => if (x > 0) "T" else "F"
val someFn3 = udf(putBoolean)
val pivoted = countArticles.groupBy("unique").pivot("article_id", articleIdss).agg(someFn3(sum("count"))).na.fill("F")
pivoted.coalesce(1).write.option("header", "true").format("csv").save("/tmp/"+n+".csv")
})
// Fourth approach on a cluster: pivot the chunks in parallel, keep the resulting
// DataFrames, join them on the bill key and write a single output.
import org.apache.spark.sql.functions.{udf, sum}
val csv = sqlContext.read.format("csv").option("header", "true").load("/tmp/store_sales_unified_2017.csv")
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
val articles = countArticles.select("article_id").distinct()
val articleIds = articles.collect.map(x => x(0))
val r = 0 to 16000 by 1000
r.par.map(n => {
val articleIdss = articleIds.slice(n, n + 1000)
val putBoolean: (Long) => String = (x) => if (x > 0) "T" else "F"
val someFn3 = udf(putBoolean)
val pivoted = countArticles.groupBy("unique").pivot("article_id", articleIdss).agg(someFn3(sum("count"))).na.fill("F")
pivoted
}).reduce((a, b) => {
a.join(b, "unique")
}).coalesce(1).write.option("header", "true").format("csv").save("/tmp/final.csv")
// Fourth approach (run locally on a sample file): build the chunk pivots without writing
// intermediate files and join them in one pass.
import org.apache.spark.sql.functions.{udf, sum}
val csv = spark.read.format("csv").option("header", true).load("/Users/tilak/Downloads/Pam/SalesAnalysis/data/sample.csv")
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
val articles = countArticles.select("article_id").distinct()
val articleIds = articles.collect.map(x => x(0))
val r = 0 to 16000 by 1000
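// Each chunk yields a DataFrame of the bill key plus up to 1000 article columns; joining
// the chunks on "unique" accumulates the full width without intermediate files.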
r.map(n => {
val articleIdss = articleIds.slice(n, n + 1000)
val putBoolean: (Long) => String = (x) => if (x > 0) "T" else "F"
val someFn3 = udf(putBoolean)
countArticles.groupBy("unique").pivot("article_id", articleIdss).agg(someFn3(sum("count"))).na.fill("F")
}).reduce((a,b) => {
a.join(b,"unique")
}).coalesce(1).write.option("header", "true").csv("final.csv")