import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
// Read the input with the new Hadoop API so the input split (source file)
// and the byte offset of each line are available downstream.
val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]
val path = "Desktop/samuca/"
val text = sc.newAPIHadoopFile(path, fc, kc, vc, sc.hadoopConfiguration)
//old version, with line number via zipWithUniqueId
//val rddWithFileName = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
//  .mapPartitionsWithInputSplit((inputSplit, iterator) => {
//    val file = inputSplit.asInstanceOf[FileSplit].getPath.toString
//    iterator.map(tup => (file.substring(file.lastIndexOf("/") + 1), tup._2))
//  })
//val rddWithFileAndRow = rddWithFileName.zipWithUniqueId.map{ case (tup, index) =>
//  Row(Seq(tup._1, index) ++ tup._2.toString.split(" ") :+ " " :+ " " : _*)
//}
//new version: keep the byte offset (the LongWritable key) so rows can be
//ordered by their position within each file
val rddWithFileName = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((inputSplit, iterator) => {
    // recover the file name from the input split backing this partition
    val file = inputSplit.asInstanceOf[FileSplit].getPath.toString
    iterator.map(tup => (file.substring(file.lastIndexOf("/") + 1), tup._1.get, tup._2))
  })
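// Quick sanity check: each record is now (file_name, byte_offset, line_text).
rddWithFileName.take(3).foreach(println)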
// Split each line on spaces and pad with two blank fields so short lines
// still fill the 5-column schema (fields beyond field3 are ignored when the
// Row is converted against the schema).
val rddWithFileAndRow = rddWithFileName.map(tup =>
  Row(Seq(tup._1, tup._2) ++ tup._3.toString.split(" ") :+ " " :+ " " : _*))
val schema = StructType(
  StructField("file_name", StringType) ::
  StructField("offset", LongType) ::
  StructField("field1", StringType) ::
  StructField("field2", StringType) ::
  StructField("field3", StringType) :: Nil)
val df = spark.createDataFrame(rddWithFileAndRow, schema)
//df.show(false)
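// Alternative sketch (Spark 2.x): the built-in input_file_name() function
// exposes the source file path directly, with no Hadoop RDD cast needed;
// note it yields no byte offset, so another ordering column would be required.
val dfAlt = spark.read.text(path)
  .withColumn("file_name", regexp_extract(input_file_name(), "[^/]+$", 0))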
// Sessionize: flag each "Starting_Code_Section" row, turn the running count
// of flags into a session number, and prefix it with the file name.
val df2 = df
  .withColumn("new_session", when($"field2" === "Starting_Code_Section", 1).otherwise(0))
  .withColumn("session_id", sum($"new_session").over(Window.partitionBy($"file_name").orderBy($"offset")))
  .withColumn("session_id", concat($"file_name", lit("_"), $"session_id"))
df2.sort("offset").show(1000, false)
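// Equivalent SQL sketch for the same running-sum sessionization (the view
// name "logs" is arbitrary, not part of the original gist):
df.createOrReplaceTempView("logs")
spark.sql("""
  SELECT file_name, offset, field2,
         CONCAT(file_name, '_',
                CAST(SUM(CASE WHEN field2 = 'Starting_Code_Section' THEN 1 ELSE 0 END)
                       OVER (PARTITION BY file_name ORDER BY offset) AS STRING)) AS session_id
  FROM logs
""").show(false)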
// Two ways to carry the session-start value of field3 forward to every row
// of its session:
val df3 = df
  //option1: forward-fill with last(..., ignoreNulls = true) over an
  //unbounded-preceding window
  .withColumn("start_code_section", when($"field2" === "Starting_Code_Section", $"field3"))
  .withColumn("start_code_section_previous", last($"start_code_section", true).over(Window.partitionBy($"file_name").orderBy($"offset")))
  //option2: number the sessions with a running sum, then take the first
  //field3 within each session
  .withColumn("new_session", when($"field2" === "Starting_Code_Section", 1).otherwise(0))
  .withColumn("session_id", sum($"new_session").over(Window.partitionBy($"file_name").orderBy($"offset")))
  .withColumn("session_id", concat($"file_name", lit("_"), $"session_id"))
  .withColumn("field3_input", first($"field3").over(Window.partitionBy($"session_id").orderBy($"offset")))
df3.sort("offset").show(1000, false)
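// Final projection sketch: one row per input line with its session id and the
// session's opening field3 value (columns as defined above).
df3.select($"file_name", $"offset", $"field2", $"session_id", $"field3_input")
  .sort("offset")
  .show(false)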