// Gist by @gbraccialli, created December 8, 2015
// Launch the Spark shell with the Kafka streaming package:
// /usr/hdp/2.3.2.1-12/spark/bin/spark-shell --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
// toDF() relies on the SQLContext implicits; spark-shell imports them automatically,
// but the explicit import keeps the snippet self-contained
import sqlContext.implicits._
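// StreamingContext on top of the shell's SparkContext, with a 2-second batch interval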
val ssc = new StreamingContext(sc, Seconds(2))
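// Kafka topic to consume and broker list (Hortonworks sandbox defaults)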
val topics = Set("cdr")
val brokers = "sandbox.hortonworks.com:6667"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
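// Direct (receiver-less) Kafka stream; each record is a (key, value) pair of strings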
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
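// Sliding window over the last 600 seconds of messages, recomputed every 6 seconds;
// keep only the message value and split its comma-separated fields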
val messagesW = messages.window(Seconds(600), Seconds(6))
val lines = messagesW.map(_._2).map(_.split(","))
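// Call detail record (CDR) schema; field order matches the comma-separated messages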
case class CDR(session_id: String, sim_card_id: String, phone_number: String, record_opening_time: String, duration: String, cell_id: String, network_type: String, drop_reason: String)
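// For each micro-batch: convert rows to CDRs, register the windowed data as a temp table,
// and print the number of calls per cell_id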
lines.map(
  line => CDR(line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7))
).foreachRDD { rdd =>
  rdd.toDF().registerTempTable("cdr")
  val counts = sqlContext.sql("select cell_id, count(*) as total from cdr group by cell_id")
  counts.show()
}
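// Start the streaming computation and block until it is stopped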
ssc.start()
ssc.awaitTermination()