var accessLogs = sc.textFile("/data/spark/project/access/access.log.45.gz")

//Check what's in the RDD. Each record of the accessLogs RDD should be one line from the files in the folder
accessLogs.take(10)

//Keep only the lines that start with an IP address
def containsIP(line: String): Boolean = line matches "^([0-9\\.]+) .*$"
var ipaccesslogs = accessLogs.filter(containsIP)

//Extract only the IP
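//A minimal sketch of the extraction step (hypothetical continuation; assumes the
//IP is the first capture group of the same pattern used above):
val ipRegex = "^([0-9\\.]+) .*$".r
var ips = ipaccesslogs.map(line => ipRegex.findFirstMatchIn(line).map(_.group(1)).getOrElse(""))
ips.take(10)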

import org.apache.spark.Partitioner

//Custom partitioner: string keys starting with A-J go to partition 0, K-Z to partition 1
class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String => if (s(0).toUpper > 'J') 1 else 0
  }
}
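//A minimal usage sketch (the pair RDD below is made up for illustration):
var names = sc.parallelize(Array(("Alice", 1), ("Bob", 2), ("Mary", 3), ("Zoe", 4)))
var partitioned = names.partitionBy(new TwoPartsPartitioner(2))
partitioned.glom().collect()  //inspect which keys landed in which partition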

sc.setLogLevel("ERROR")
var file = sc.textFile("/data/mr/wordcount/input/")
//Accumulator to count blank lines (sc.accumulator is the Spark 1.x API; on Spark 2+ prefer sc.longAccumulator)
var numBlankLines = sc.accumulator(0)
def toWords(line: String): Array[String] = {
  if (line.length == 0) { numBlankLines += 1 }
  line.split(" ")
}
var words = file.flatMap(toWords)
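//Accumulators are only updated when an action runs, so trigger one before reading the value:
words.count()
println("Blank lines: " + numBlankLines.value)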

//A mutable pair-like class; marked Serializable so Spark can ship instances to executors
class MyComplex(var x: Int, var y: Int) extends Serializable {
  def reset(): Unit = {
    x = 0
    y = 0
  }
  def add(p: MyComplex): MyComplex = {
    x = x + p.x
    y = y + p.y
    this
  }
}
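//A minimal sketch of using MyComplex as a custom accumulator via the Spark 1.x
//AccumulatorParam API (the input RDD below is made up for illustration):
import org.apache.spark.AccumulatorParam
implicit object ComplexAccumulatorParam extends AccumulatorParam[MyComplex] {
  def zero(initialValue: MyComplex): MyComplex = new MyComplex(0, 0)
  def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = v1.add(v2)
}
val vecAccum = sc.accumulator(new MyComplex(0, 0))
sc.parallelize(Array(1, 2, 3)).foreach(i => vecAccum += new MyComplex(i, i))
vecAccum.value  //x and y should each be 6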

var commonWords = Array("a", "an", "the", "of", "at", "is", "am", "are", "this", "that", "in", "or", "and", "not", "be", "for", "to", "it")
val commonWordsMap = collection.mutable.Map[String, Int]()
for (word <- commonWords) {
  commonWordsMap(word) = 1
}
//Broadcast the lookup map so each executor receives one read-only copy
var commonWordsBC = sc.broadcast(commonWordsMap)
var file = sc.textFile("/data/mr/wordcount/input/big.txt")
def toWords(line: String): Array[String] = {
  var words = line.split(" ")
  //Likely continuation (the original gist is cut off here): drop the common words
  words.filter(word => !commonWordsBC.value.contains(word.toLowerCase))
}
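//A sketch of applying it (hypothetical continuation of the truncated gist):
var uncommonWords = file.flatMap(toWords)
uncommonWords.take(20)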

var lines = sc.textFile("/data/spark/temps.csv")
var recordsRDD = lines.map(line => line.split(","))
recordsRDD.take(10)
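//Caveat: a plain split(",") mishandles quoted fields that contain commas,
//which is why the next snippets use the opencsv parser. For example:
"\"New York, NY\",25".split(",")  //splits inside the quotes -- wrong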

//CSV parsing program using the opencsv library
//spark-shell --packages net.sf.opencsv:opencsv:2.3
//Or add this to sbt: libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
import au.com.bytecode.opencsv.CSVParser
var a = sc.textFile("/data/spark/temps.csv")
var p = a.map(
  line => {
    //Likely continuation (the original gist is cut off here); note this
    //constructs one CSVParser per line -- see the mapPartitions version below
    val parser = new CSVParser(',')
    parser.parseLine(line)
  })
p.take(1)

//CSV parsing with opencsv and mapPartitions: one parser per partition instead of per line
//spark-shell --packages net.sf.opencsv:opencsv:2.3
//Or add this to sbt: libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
import au.com.bytecode.opencsv.CSVParser
var linesRdd = sc.textFile("/data/spark/temps.csv")
def parseCSV(itr: Iterator[String]): Iterator[Array[String]] = {
  val parser = new CSVParser(',')
  //Likely continuation (the original gist is cut off here): parse lazily, one line per record
  for (line <- itr)
    yield parser.parseLine(line)
}
var parsed = linesRdd.mapPartitions(parseCSV)
parsed.take(1)

//Save it
var rdd = sc.parallelize(Array(("key1", 1.0), ("key2", 2.0), ("key3", 3.0)), 2)
rdd.saveAsSequenceFile("pysequencefile1")
//Load it
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.io.Text
val myrdd = sc.sequenceFile(
  "pysequencefile1",
  //Likely continuation (the original gist is cut off here): the Writable key/value classes
  classOf[Text], classOf[DoubleWritable]
).map { case (k, v) => (k.toString, v.get()) }
myrdd.collect()