Skip to content

Instantly share code, notes, and snippets.

@gbraccialli
Last active June 8, 2017 02:03
Show Gist options
  • Save gbraccialli/14711bd4132a3332b05dd7712b729b01 to your computer and use it in GitHub Desktop.
// Sample dataset for the stateful-UDF demo: rows of (typet, value).
case class Test(typet: String, value: Int)

val test = List(
  Test("B", 99), Test("B", 2), Test("B", 35),
  Test("A", 6), Test("A", 3)
)
// Distribute the sample and lift it into a DataFrame.
val rdd = sc.parallelize(test)
val df  = rdd.toDF
// Executor-local mutable state backing the counter UDF below.
var i = 0
var previous = ""

/** Running counter over consecutive equal values: returns 0 for the first
 *  row of each run of identical `in` values, then increments for every
 *  consecutive repeat.
 *
 *  WARNING: relies on mutable state shared across invocations, so the
 *  result is only meaningful when all rows are processed by a single task
 *  in a known sort order (see the shuffle.partitions demo below).
 *
 *  @param in current row's value to compare against the previous row
 *  @return Some(position within the current run), starting at 0
 */
def udf_buffer(in: String): Option[Int] =
{
  // Use null-safe `==` instead of `in.equals(previous)`, which would
  // throw a NullPointerException when the column value is null.
  if (in == previous)
    i += 1
  else
    i = 0
  previous = in
  Some(i)
}
// Register the stateful function as a Spark SQL UDF.
// NOTE(review): closes over the mutable `i`/`previous` above, so results
// are only deterministic when one task processes the whole sorted stream.
val udfb = udf(udf_buffer _)
// Demo 1: force a single shuffle partition so all rows of a key arrive in
// one ordered stream and the running counter behaves as expected.
spark.conf.set("spark.sql.shuffle.partitions","1")
df.repartition($"typet").sortWithinPartitions($"value").withColumn("new", udfb(col("typet"))).show
// Demo 2: with 100 shuffle partitions the per-executor state is neither
// shared nor ordered — presumably intended to show the pattern breaking
// under parallelism.
spark.conf.set("spark.sql.shuffle.partitions","100")
df.repartition($"typet").sortWithinPartitions($"value").withColumn("new", udfb(col("typet"))).show
// Executor-local state for the interval-overlap UDF below.
var previous_key = ""
var previous_end = ""
var id = 1

/** Assigns a group id to overlapping [start, end] intervals within a key.
 *  Consecutive rows of the same key share an id while their intervals
 *  overlap the running maximum end; a gap (start > running end) opens a
 *  new group. Dates are compared lexicographically, so they must be in a
 *  sortable string format (e.g. yyyy-MM-dd).
 *
 *  WARNING: stateful — rows must arrive grouped by key and sorted by
 *  (start, end), i.e. one partition per key processed by a single task.
 *
 *  @return Some(overlap group id), restarting at 1 for each new key
 */
def overlap_id(key: String, start: String, end: String): Option[Int] =
{
  // Null-safe `==` avoids an NPE on a null key.
  if (key == previous_key) {
    if (start > previous_end) {
      id += 1
    }
  } else {
    // New key: restart the counter AND clear the running end.
    // (Bug fix: previously `previous_end` kept the prior key's value,
    // so a large end date leaked across keys and could wrongly merge
    // non-overlapping intervals of the next key.)
    id = 1
    previous_end = ""
  }
  previous_key = key
  if (end > previous_end) {
    previous_end = end
  }
  Some(id)
}
// Register the stateful interval-overlap function as a Spark SQL UDF.
val udf_overlap_id = udf(overlap_id _)
// NOTE(review): `xxxx` is an undefined placeholder — substitute the real
// input DataFrame before running. Also the repartition/UDF key is ID while
// the groupBy key is PTID — confirm these refer to the same column.
val xxx = xxxx
// Co-locate each key in one partition and sort so overlap_id sees rows
// in (START_DATE, END_DATE) order per key.
.repartition(col("ID"))
.sortWithinPartitions(col("START_DATE"), col("END_DATE"))
.withColumn("overlap_id", udf_overlap_id(col("ID"),col("START_DATE"), col("END_DATE")))
// Collapse each overlap group to a single interval: min start / max end.
.groupBy(col("PTID"),col("overlap_id"))
.agg(
min("START_DATE").alias("START_DATE"),
max("END_DATE").alias("END_DATE")
)
.drop("overlap_id")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment