Skip to content

Instantly share code, notes, and snippets.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
rdd.map(isMatch)
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("Word Count")
val sc = new SparkContext(conf)
val input = sc.textFile("/home/starbucks/spark-1.6.1-bin-hadoop2.6/README.md")
val words = input.flatMap(line => line.split(" "))
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()
// 5분간의 이벤트 로그 파일을 처리하기 위해 주기적으로 불리는 함수
// 여기서 처리하는 시퀀스 파일이 (UserId, LinkInfo) 쌍을 갖고 있다.
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo)) RDD
val offTopicVisits = joined.filter {
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()
// 5분간의 이벤트 로그 파일을 처리하기 위해 주기적으로 불리는 함수
// 여기서 처리하는 시퀀스 파일이 (UserId, LinkInfo) 쌍을 갖고 있다.
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo)) RDD
val offTopicVisits = joined.filter {
val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
// pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:28
pairs.partitioner
// res40: Option[org.apache.spark.Partitioner] = None
import org.apache.spark.HashPartitioner
// import org.apache.spark.HashPartitioner
val partitioned = pairs.partitionBy(new HashPartitioner(2)).persist()
// partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[34] at partitionBy at <console>:31
partitioned.partitioner
// res41: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").partitionBy(new HashPartitioner(100)).persist()
import org.apache.spark.storage.StorageLevel
val input = sc.parallelize(List(1, 2, 3, 4))
// input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at parallelize at <console>:31
val result = input.map(x => x*x)
// result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at map at <console>:33
result.persist(StorageLevel.DISK_ONLY)
// res43: result.type = MapPartitionsRDD[36] at map at <console>:33
println(result.count()) // 4
println(result.collect().mkString(",")) // 1, 4, 9, 16
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
val ranks = links.mapValues(v => 1.0)
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (link, rank)) =>
links.map(dest => (dest, rank / links.size))
}
rank = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
// 각 URL의 도메인 네임을 해싱해 DomainNamePartitioner를 구현
class DomainNamePartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
//
val domain = new Java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
// 자바의 hashCode()에 기반해서 구현을 한다면, 음수를 리턴할 수 있으므로 getPartition()이 음수를 반환하지 않도록 주의해야 한다.
if (code < 0) {
code + numPartitions
// Dictionary 사용하는 방법
var dictObject = {}
dictObject['banana'] = '바나나';
dictObject['hong'] = '홍';
dictObject['monkey'] = '원숭이';
console.log(dictObject) // Object {banana: "바나나", hong: "홍", monkey: "원숭이"}
// Dictionary 출력
for (var key in dictObject) {
console.log("key : " + key +", value : " + dictObject[key]);