Streaming sources: Kafka, file systems (CSV, delimited text, Parquet, ORC, Avro, JSON), socket
Streaming sinks: Kafka, console, memory, foreach
#IMP: A schema definition is mandatory to process the data
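A minimal spark-shell sketch of the notes above, assuming a CSV file source (the path and column names are illustrative): streaming file sources require the schema up front unless schema inference is explicitly enabled, and the console sink prints each micro-batch.

import org.apache.spark.sql.types._

// Explicit schema: streaming file sources will not infer it by default
val eventSchema = StructType(List(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("eventTime", StringType)))

val streamDF = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(eventSchema)              // mandatory schema definition
  .load("/tmp/stream-input")        // illustrative path

val query = streamDF.writeStream
  .format("console")                // console sink; kafka/memory/foreach also possible
  .outputMode("append")
  .start()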
package org.ajp.kafkademo

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object kafkademo extends Serializable {
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
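  // A sketch of what the body of kafkademo might contain; the topic, servers,
  // schema fields and options below are illustrative assumptions, not from the original.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafkademo")
      .master("local[3]")
      .getOrCreate()

    // Kafka delivers the value as binary, so an explicit schema is needed to deserialize it
    val valueSchema = StructType(List(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo-topic")
      .load()

    // Cast the binary value to string and parse the JSON payload against the schema
    val valueDF = kafkaDF
      .select(from_json(col("value").cast("string"), valueSchema).alias("value"))
      .select("value.*")

    val query = valueDF.writeStream
      .format("console")
      .outputMode("append")
      .option("checkpointLocation", "chk-point-dir")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()

    logger.info("Streaming query started")
    query.awaitTermination()
  }
}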
package org.ajp.kafkaserdeserdemo

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.functions.to_avro

object kafkaserdeser extends Serializable {
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
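  // A sketch of the serialization path the to_avro import points at; the source path,
  // topic, servers and schema fields are illustrative assumptions, not from the original.
  // Note: to_avro comes from the external spark-avro module, and the Kafka sink expects a value column.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafkaserdeser")
      .master("local[3]")
      .getOrCreate()

    val sourceSchema = StructType(List(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    val sourceDF = spark.readStream
      .format("json")
      .schema(sourceSchema)
      .load("input-dir")              // illustrative source directory

    // Pack the row into a struct and Avro-encode it as the Kafka message value
    val kafkaTargetDF = sourceDF.select(to_avro(struct(col("id"), col("name"))).alias("value"))

    val query = kafkaTargetDF.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "avro-demo")
      .option("checkpointLocation", "chk-point-dir-avro")
      .start()

    logger.info("Avro serialization query started")
    query.awaitTermination()
  }
}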
// Imports
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Generate sample data
val rows = Seq((1,"Alpha","10/12/1990 12:10:10"),
               (2,"Beta","11/12/1990 13:10:10"),
               (3,"Tango","12/12/1990 14:10:10"))

// Define Schema for the rows
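// The schema itself is missing from the note; a minimal spark-shell completion,
// with the field names (id, name, eventTime) assumed from the sample tuples.
val rowSchema = StructType(List(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("eventTime", StringType)))

// Build a DataFrame from the sample rows using the explicit schema
val rowRDD = spark.sparkContext.parallelize(rows.map { case (i, n, t) => Row(i, n, t) })
val df = spark.createDataFrame(rowRDD, rowSchema)
df.printSchema()
df.show(false)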
import org.apache.spark.sql.Row

// Generate a test DataFrame with 2 rows (spark-shell, so spark.implicits._ is already in scope)
val df = Seq((1,"Red Green"),(2,"Blue White")).toDF("id","colors")
df.show

// Show ONLY the colors column by pattern matching on each Row
df.map{ case Row(id: Int, colors: String) => colors }.show

// Create a row with both fields (assumed intent: return id and colors together as a tuple)
df.map{ case Row(id: Int, colors: String) => (id, colors) }.show
# Scripts to add a header field and static values to a CSV file
# Header: prepend an "env" field to the header line (NR==1) only; all other lines pass through unchanged
awk '{if(NR==1){$0="env,"$0; print $0} ;if(NR!=1){print $0}}' input.csv > output.csv
# Add static values to the rows
# Overwrite the first field of every line with a fixed timestamp
awk -F"," 'BEGIN { OFS = "," } {$1="2012-02-29 16:13:00"; print}' input.csv > output.csv
# Prepend a static value "XXXXX" as a new first field on every line
awk -F"," 'BEGIN { OFS = "," } {$0="XXXXX,"$0; print $0}' input.csv > output.csv
################################################################################################
# Listing files
# Recursively list all files within the current directory
find .
# Recursively list all files whose names end with the extension .txt
find . | grep "\.txt$"
# List files within the current directory (non-recursive) with a specific extension, e.g. .txt
find . -maxdepth 1 -name "*.txt"
// ============================================================
// Generate a test key/value pair RDD (spark-shell)
spark.conf.set("spark.sql.shuffle.partitions", 2)

val num = Seq((2000,10),(2001,20),(2000,20),(2002,30),(2003,30),(2004,50),(2004,100),(2004,250),(2005,250),(2005,25),
              (2006,150),(2006,225),(2007,250),(2007,125),(2008,250),(2009,25),(2010,250),(2010,125))

val rdd = sc.parallelize(num)

// Sum the values per key, then move the result into 2 partitions
val prdd = rdd.reduceByKey(_ + _).repartition(2)

// Sort the original pairs by key, then repartition into 2 partitions
// (note: repartition reshuffles randomly, so the global sort order is not preserved)
val srdd = rdd.sortByKey().repartition(2)
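// Follow-up usage: inspect the results (values are computed at run time, not listed here)
prdd.collect().foreach(println)        // (year, summed value) pairs
srdd.getNumPartitions                  // 2 after repartition(2)
srdd.glom().map(_.length).collect()    // number of records per partition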
// Input
// studentid,coursename
// 01,CHEM|PHY|MATH
// 02,CHEM|PHY
// 03,MATH|COMP|CHEM
// 04,MATH|PHY
// 05,HIST|MARKT|BIOL
// 06,BIOL|PHY
// 07,BOTONY|ZOOL
// 08,BOTONY|COMP
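The note stops at the input; a minimal sketch of the likely exercise, assuming the goal is one (studentid, course) row per pipe-delimited entry (the file path is illustrative):

// Sketch: split the pipe-delimited coursename into one row per course
import org.apache.spark.sql.functions.{col, explode, split}

val studentDF = spark.read
  .option("header", "true")
  .csv("students.csv")                 // illustrative path to the input above

// split() takes a regex, so the pipe delimiter must be escaped
val explodedDF = studentDF.select(
  col("studentid"),
  explode(split(col("coursename"), "\\|")).alias("course"))

explodedDF.show(false)

// Possible follow-up: number of students enrolled per course
explodedDF.groupBy("course").count().show(false)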