Harleen Mann (mannharleen)

mannharleen / sparkStreaming-wordcount-window.scala
Created September 3, 2017 08:10
Word count over an input stream, 1) within a batch and 2) within a window
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.{SparkConf, SparkContext}
/*
Streaming batch size = 10 seconds
Window size = 20 secs
Window slide interval = 10 secs
*/
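// a minimal sketch of the per-batch and per-window counts described above; the socket source on
// localhost:9999 and printing the results are illustrative assumptions
val ssc = new StreamingContext(new SparkConf().setAppName("wordcount").setMaster("local[4]"), Seconds(10))
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+"))
words.map((_, 1)).reduceByKey(_ + _).print()  // count within each 10-second batch
words.map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10)).print()  // count over a 20-second window sliding every 10 seconds
ssc.start()
ssc.awaitTermination()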
mannharleen / ssc-checkpoint-basics.scala
Created September 3, 2017 09:39
Spark Streaming context checkpoint
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.{SparkConf, SparkContext}
// recreate the StreamingContext from the checkpoint directory if one exists, otherwise build a fresh one
val context = StreamingContext.getOrCreate("file:///c:\\checkpoint", () => {
  val ssc = new StreamingContext(new SparkConf().setAppName("aa").setMaster("local[4]"), Seconds(10))
  ssc.checkpoint("file:///c:\\checkpoint")  // enable checkpointing to the same directory
  ssc
})
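// usage sketch: the streaming sources and transformations should be defined inside the factory
// function above so they can be rebuilt from the checkpoint; starting the returned context is
// then the same in both the fresh and the recovered case
context.start()
context.awaitTermination()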
mannharleen / kafka-on-windows.cmd
Last active September 4, 2017 12:40
Run Kafka on Windows. This example also runs a console-based producer and consumer to test the Kafka broker (sketched below after the setup commands)
>cd C:\kafka_2.11-0.11.0.0\bin\windows
//start zookeeper server
> zookeeper-server-start.bat C:\kafka_2.11-0.11.0.0\config\zookeeper.properties
//start kafka server
> kafka-server-start.bat C:\kafka_2.11-0.11.0.0\config\server.properties
//create topic
> kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
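//the preview ends at topic creation; the console producer and consumer mentioned in the description might be run like this (broker assumed on localhost:9092)
//start a console producer (type messages, one per line)
> kafka-console-producer.bat --broker-list localhost:9092 --topic test
//start a console consumer in a second window
> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning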
mannharleen / sparkStreaming-kafka-withReceiver-basics.scala
Last active September 3, 2017 17:31
Spark Kafka integration: Based on "receiver based approach" as opposed to "direct approach"
/*
Based on "receiver based approach" as opposed to "direct approach"
*/
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(new SparkConf().setAppName("sbtapp").setMaster("local[*]"),Seconds(10))
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "testgroupid", Map("test" -> 1))  // (ZooKeeper quorum, consumer group id, Map(topic -> number of receiver threads))
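// the preview stops after creating the receiver-based stream; a minimal continuation that prints
// the message values and starts the context might look like this
kafkaStream.map(_._2).print()  // createStream yields (key, value) pairs; keep the value
ssc.start()
ssc.awaitTermination()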
mannharleen / sparkStreaming-kafka-direct-basics.scala
Created September 3, 2017 17:30
Spark Kafka integration: Based on "direct approach"
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.streaming.{StreamingContext, Seconds}
import kafka.serializer.StringDecoder
val ssc = new StreamingContext(new SparkConf().setAppName("sbtapp").setMaster("local[*]"),Seconds(10))
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, Map("bootstrap.servers" -> "localhost:9092"), Set("test"))  // no receiver: partitions are read directly from the brokers
kafkaStream.map(_._2).print()  // get the value part of each (key, value) message
ssc.start()
ssc.stop(true)  // stops the StreamingContext and, with stopSparkContext = true, the underlying SparkContext
mannharleen / spark-kafka-2receivers.scala
Created September 4, 2017 16:03
Two Spark receivers reading from one Kafka topic that has two partitions; each receiver is mapped to read from a single partition (a sketch of the receiver code follows the setup steps)
/* Pre steps
//start zookeper
> zookeeper-server-start.bat C:\kafka_2.11-0.11.0.0\config\zookeeper.properties
//start kafka server
> kafka-server-start.bat C:\kafka_2.11-0.11.0.0\config\server.properties
//create a topic with 2 partitions
> kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-2partitions
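*/
// a minimal sketch of the two-receiver pattern from the description: the receiver-based
// KafkaUtils.createStream is called twice (one receiver per call) and the two DStreams are
// unioned; the group id, batch interval and thread counts are illustrative assumptions
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf

val ssc = new StreamingContext(new SparkConf().setAppName("twoReceivers").setMaster("local[4]"), Seconds(10))
// with two consumers in the same group and a two-partition topic, each receiver ends up with one partition
val stream1 = KafkaUtils.createStream(ssc, "localhost:2181", "testgroup", Map("test-2partitions" -> 1))
val stream2 = KafkaUtils.createStream(ssc, "localhost:2181", "testgroup", Map("test-2partitions" -> 1))
stream1.union(stream2).map(_._2).print()
ssc.start()
ssc.awaitTermination()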
/*
Sqoop exercise (a sketch of the corresponding commands follows below):
1. Connect to the MySQL DB and ascertain that you have access to retail_db.departments
2. Import the table into HDFS
3. Add a row in the MySQL DB and import the incremental data into HDFS
4. Create a new DB in MySQL and export the data in HDFS to newDB.departments (insert only)
5. Update the HDFS file and export to update data in the MySQL DB
6. Update the HDFS file and export to UPSERT data in the MySQL DB
7. Import from the MySQL DB into Hive
*/
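A sketch of Sqoop commands matching the steps above; the connection string, credentials and HDFS paths are illustrative assumptions:

# 1-2. check access, then import the table into HDFS
sqoop list-tables --connect jdbc:mysql://localhost/retail_db --username <user> -P
sqoop import --connect jdbc:mysql://localhost/retail_db --username <user> -P --table departments --target-dir /user/<user>/departments
# 3. incremental import of newly added rows
sqoop import --connect jdbc:mysql://localhost/retail_db --username <user> -P --table departments --target-dir /user/<user>/departments --incremental append --check-column department_id --last-value <last_imported_id>
# 4. insert-only export into the new database
sqoop export --connect jdbc:mysql://localhost/new_db --username <user> -P --table departments --export-dir /user/<user>/departments
# 5-6. export that updates existing rows; switch --update-mode to allowinsert for upsert
sqoop export --connect jdbc:mysql://localhost/new_db --username <user> -P --table departments --export-dir /user/<user>/departments --update-key department_id --update-mode updateonly
# 7. import into Hive
sqoop import --connect jdbc:mysql://localhost/retail_db --username <user> -P --table departments --hive-import --hive-table departments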
mannharleen / spark all file format types and compression codecs.scala
Created September 9, 2017 14:59
Text file, JSON, CSV, sequence file, Parquet, ORC, Avro, newHadoopAPI
/*
Assume that the following "rdd" exists
val rdd = sc.parallelize(Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10)))
type of rdd -> org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1]
rdd.collect -> Array[(Int, Int)] = Array((1,1), (0,2), (1,3), (0,4), (1,5), (0,6), (1,7), (0,8), (1,9), (0,10))
*/
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.SQLContext
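// a sketch of a few of the save formats listed in the title, assuming the "rdd" above and a
// SQLContext named sqlContext; the output paths and the gzip codec are illustrative choices
import sqlContext.implicits._

rdd.saveAsTextFile("out/text")                                                        // plain text
rdd.saveAsTextFile("out/text-gz", classOf[org.apache.hadoop.io.compress.GzipCodec])   // compressed text
rdd.saveAsSequenceFile("out/seq")                                                     // sequence file (keys/values written as Writables)

val df = rdd.toDF("key", "value")   // DataFrame route for the columnar / semi-structured formats
df.write.json("out/json")
df.write.csv("out/csv")             // Spark 2.x built-in CSV source
df.write.parquet("out/parquet")
df.write.orc("out/orc")             // may require Hive support depending on the Spark build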
mannharleen / hive file type conversion.sql
Created September 10, 2017 13:46
Convert file formats using Hive
-- create an Avro table
create table t_avro (col1 Int) stored as AVRO;
insert into table t_avro values (1),(2),(3),(4);
-- *INFO* If we want to create the Avro table with a custom schema file (avsc), do one of the following
--Option1: create table t_avro (col1 Int) stored as AVRO tblproperties('avro.schema.url'='/user/hive/schemas/t_avro/t_avro.avsc')
--Option2: create table t_avro (col1 Int) stored as AVRO tblproperties('avro.schema.literal'='{
-- "name": "t_avro_schema",
-- "type": "record",
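-- a sketch of the conversion step the title refers to, assuming the t_avro table above:
-- copy the Avro data into tables stored in other formats via CTAS
create table t_parquet stored as PARQUET as select * from t_avro;
create table t_orc stored as ORC as select * from t_avro;
create table t_text row format delimited fields terminated by ',' stored as TEXTFILE as select * from t_avro;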
mannharleen / sparkStreaming-flume-direct-basics.scala
Last active September 12, 2017 07:04
Spark Flume integration: based on the "pull-based (aka polling) approach"
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.streaming.{StreamingContext, Seconds}
val conf = new org.apache.spark.SparkConf().setAppName("a")
val ssc = new StreamingContext(conf, Seconds(10))
val dstream = FlumeUtils.createPollingStream(ssc, "quickstart.cloudera", 19999)
// note that we are using the polling (pull-based) stream rather than the push-based createStream
dstream.print()
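// to receive anything, the Flume agent must be configured with the Spark sink
// (org.apache.spark.streaming.flume.sink.SparkSink) listening on the host/port polled above;
// starting the context is then the usual
ssc.start()
ssc.awaitTermination()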