Sandeep Giri (girisandeep)

@girisandeep
girisandeep / apache_log_topnlinks.scala
Last active January 7, 2021 07:37
An example program to find the top N IP addresses from Apache logs using Spark. This is part of CloudxLab.com.
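The code preview for this gist is not shown; a minimal sketch of the idea, assuming a standard Apache access log where the IP address is the first whitespace-delimited field (the input path and N = 10 are illustrative, not from the gist):
// Count hits per IP address and take the top 10 (path and N are placeholders).
val logLines = sc.textFile("/data/spark/access.log")
val ipCounts = logLines
  .map(line => line.split(" ")(0))   // IP address is the first field
  .map(ip => (ip, 1))
  .reduceByKey(_ + _)
ipCounts
  .sortBy(_._2, ascending = false)   // order by hit count, descending
  .take(10)
  .foreach(println)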
@girisandeep
girisandeep / spark-custom-partitioner.scala
Last active December 9, 2020 02:54
An example of creating a custom partitioner in Spark using Scala
import org.apache.spark.Partitioner

// Splits string keys into two partitions based on their first letter.
class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String => if (s(0).toUpper > 'J') 1 else 0
    case _ => 0 // avoid a MatchError for non-string keys
  }
}
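The preview ends at the class definition; a hypothetical usage sketch (the sample data and partition count are illustrative):
val pairs = sc.parallelize(Seq(("apple", 1), ("mango", 2), ("zebra", 3)))
val partitioned = pairs.partitionBy(new TwoPartsPartitioner(2))
// Keys starting with letters up to 'J' land in partition 0, the rest in partition 1.
partitioned.glom().collect().foreach(part => println(part.mkString(", ")))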
@girisandeep
girisandeep / accumulator-example.scala
Last active May 21, 2021 05:56
An example of using an accumulator in Spark with Scala
sc.setLogLevel("ERROR")
var file = sc.textFile("/data/mr/wordcount/input/")
// Accumulator that counts blank lines across all executors (Spark 1.x API).
var numBlankLines = sc.accumulator(0)
def toWords(line: String): Array[String] = {
  if (line.length == 0) { numBlankLines += 1 }
  line.split(" ")
}
var words = file.flatMap(toWords)
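The preview stops before any action runs; since accumulators are only updated when a job actually executes, a hedged usage sketch:
println("Total words: " + words.count())        // action that triggers the job
println("Blank lines: " + numBlankLines.value)  // read the accumulator on the driver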
@girisandeep
girisandeep / custom-accum-v1.scala
Last active September 15, 2019 10:58
Create a custom accumulator using Scala for Spark 1.x
// The accumulated value: a complex-number-like class with in-place reset and add.
class MyComplex(var x: Int, var y: Int) extends Serializable {
  def reset(): Unit = {
    x = 0
    y = 0
  }
  def add(p: MyComplex): MyComplex = {
    x = x + p.x
    y = y + p.y
    this
  }
}
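The preview is truncated here. In Spark 1.x a custom accumulator is registered through an AccumulatorParam, so a hedged sketch of a likely continuation (object and variable names are illustrative):
import org.apache.spark.AccumulatorParam

object ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {
  def zero(initialValue: MyComplex): MyComplex = new MyComplex(0, 0)
  def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = v1.add(v2)
}

val vecAccum = sc.accumulator(new MyComplex(0, 0))(ComplexAccumulatorV1)
sc.parallelize(1 to 5).foreach(i => vecAccum += new MyComplex(i, i))
println(vecAccum.value.x + ", " + vecAccum.value.y)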
@girisandeep
girisandeep / broadcast-example.scala
Last active September 15, 2019 10:57
An example of broadcast variables in Spark using Scala.
// Common (stop) words to exclude; broadcast so each executor gets one read-only copy.
var commonWords = Array("a", "an", "the", "of", "at", "is", "am", "are", "this", "that", "at", "in", "or", "and", "or", "not", "be", "for", "to", "it")
val commonWordsMap = collection.mutable.Map[String, Int]()
for (word <- commonWords) {
  commonWordsMap(word) = 1
}
var commonWordsBC = sc.broadcast(commonWordsMap)
var file = sc.textFile("/data/mr/wordcount/input/big.txt")
def toWords(line: String): Array[String] = {
  var words = line.split(" ")
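  // Hypothetical continuation (the preview is truncated here): keep only the
  // words that are not in the broadcast common-words map.
  words.filter(word => !commonWordsBC.value.contains(word.toLowerCase))
}
var uncommonWords = file.flatMap(toWords)
uncommonWords.take(20).foreach(println)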
@girisandeep
girisandeep / custom-accumulator-v2.scala
Last active February 10, 2023 15:59
Create a custom accumulator using Scala for Spark 2.x
// The accumulated value: a complex-number-like class with in-place reset and add.
class MyComplex(var x: Int, var y: Int) extends Serializable {
  def reset(): Unit = {
    x = 0
    y = 0
  }
  def add(p: MyComplex): MyComplex = {
    x = x + p.x
    y = y + p.y
    this
  }
}
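The preview is truncated here. In Spark 2.x a custom accumulator extends AccumulatorV2 and is registered with the SparkContext, so a hedged sketch of a likely continuation (class and variable names are illustrative):
import org.apache.spark.util.AccumulatorV2

class ComplexAccumulatorV2 extends AccumulatorV2[MyComplex, MyComplex] {
  private val myc = new MyComplex(0, 0)
  def reset(): Unit = myc.reset()
  def add(v: MyComplex): Unit = myc.add(v)
  def value: MyComplex = myc
  def isZero: Boolean = myc.x == 0 && myc.y == 0
  def copy(): AccumulatorV2[MyComplex, MyComplex] = {
    val copyAcc = new ComplexAccumulatorV2
    copyAcc.myc.add(myc)
    copyAcc
  }
  def merge(other: AccumulatorV2[MyComplex, MyComplex]): Unit = myc.add(other.value)
}

val vecAccum = new ComplexAccumulatorV2
sc.register(vecAccum, "complexAccumulator")
sc.parallelize(1 to 5).foreach(i => vecAccum.add(new MyComplex(i, i)))
println(vecAccum.value.x + ", " + vecAccum.value.y)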
@girisandeep
girisandeep / csv-parsing-ex1.scala
Created June 21, 2017 20:38
This is a naive CSV parsing program. It cannot handle commas inside values.
var lines = sc.textFile("/data/spark/temps.csv");
var recordsRDD = lines.map(line => line.split(","));
recordsRDD.take(10);
@girisandeep
girisandeep / csv-parsing-ex2.scala
Created June 21, 2017 20:40
A CSV parsing example that uses the opencsv library. It is inefficient because a new parser is constructed for every line.
//CSV parsing program using opencsv library
//spark-shell --packages net.sf.opencsv:opencsv:2.3
//Or
//Add this to sbt: libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
import au.com.bytecode.opencsv.CSVParser
var a = sc.textFile("/data/spark/temps.csv");
var p = a.map(
line => {
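    // Hypothetical continuation (the preview is truncated here): constructing a
    // new CSVParser for every line is what makes this approach inefficient.
    val parser = new CSVParser(',')
    parser.parseLine(line)
  }
)
p.take(10)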
@girisandeep
girisandeep / csv-parsing-ex3.scala
Created June 21, 2017 20:42
Instead of using map, it uses mapPartitions, so the parser is created once per partition. It is better than csv-parsing-ex2.scala.
//CSV parsing program using opencsv library
//spark-shell --packages net.sf.opencsv:opencsv:2.3
//Or
//Add this to sbt: libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
import au.com.bytecode.opencsv.CSVParser
var linesRdd = sc.textFile("/data/spark/temps.csv");
def parseCSV(itr:Iterator[String]):Iterator[Array[String]] = {
val parser = new CSVParser(',')
for(line <- itr)
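    yield parser.parseLine(line) // hypothetical continuation (preview truncated): reuse the per-partition parser for every line
}
var parsedRdd = linesRdd.mapPartitions(parseCSV)
parsedRdd.take(10)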
@girisandeep
girisandeep / saving-loading-sequencefiles.scala
Last active May 26, 2021 14:04
This example first creates a sequence file and then loads it.
//Save it
var rdd = sc.parallelize(Array(("key1", 1.0), ("key2", 2.0), ("key3", 3.0)), 2)
rdd.saveAsSequenceFile("pysequencefile1")
//Load it
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.io.Text
val myrdd = sc.sequenceFile(
"pysequencefile1",