This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import org.apache.spark.SparkContext._ | |
// Holds a search string and offers helpers that test strings / RDD elements against it.
// NOTE(review): this class does not extend Serializable, so any Spark closure that
// captures `this` will be rejected by Spark as "Task not serializable".
// NOTE(review): RDD is not among the imports shown for this snippet
// (org.apache.spark.rdd.RDD) — confirm it is imported.
class SearchFunctions(val query: String) { | |
// Returns true when `s` contains `query` as a substring.
def isMatch(s: String): Boolean = { | |
s.contains(query) | |
} | |
// Maps each element of `rdd` to whether it contains `query`.
// Passing the method `isMatch` expands to a closure over `this`, so Spark has to
// serialize the whole SearchFunctions instance (not just `query`) to run the map.
def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = { | |
rdd.map(isMatch) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import org.apache.spark.SparkContext._ | |
// Entry point for a word-count job run against a local Spark master.
object WordCount { | |
def main(args: Array[String]) { | |
// "local" runs Spark in-process with a single worker thread.
val conf = new SparkConf().setMaster("local").setAppName("Word Count") | |
val sc = new SparkContext(conf) | |
// NOTE(review): the input path is hard-coded to one developer's machine;
// consider reading it from `args` instead.
val input = sc.textFile("/home/starbucks/spark-1.6.1-bin-hadoop2.6/README.md") | |
// Split every line on single spaces; consecutive spaces produce empty-string tokens.
val words = input.flatMap(line => line.split(" ")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// `...` is a placeholder for the real SparkContext configuration.
val sc = new SparkContext(...) | |
// User table, loaded once and cached in memory (persist() defaults to MEMORY_ONLY).
// NOTE(review): userData has no partitioner (persist() is called without partitionBy()),
// so every join() in processNewLogs re-shuffles the whole user table across the cluster.
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist() | |
// Called periodically to process a log file covering five minutes of events.
// The sequence file processed here contains (UserId, LinkInfo) pairs.
def processNewLogs(logFileName: String) { | |
val events = sc.sequenceFile[UserId, LinkInfo](logFileName) | |
val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo)) RDD | |
val offTopicVisits = joined.filter { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// `...` is a placeholder for the real SparkContext configuration.
val sc = new SparkContext(...) | |
// User table, loaded once and cached in memory (persist() defaults to MEMORY_ONLY).
// NOTE(review): userData has no partitioner (persist() is called without partitionBy()),
// so every join() in processNewLogs re-shuffles the whole user table across the cluster.
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist() | |
// Called periodically to process a log file covering five minutes of events.
// The sequence file processed here contains (UserId, LinkInfo) pairs.
def processNewLogs(logFileName: String) { | |
val events = sc.sequenceFile[UserId, LinkInfo](logFileName) | |
val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo)) RDD | |
val offTopicVisits = joined.filter { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// REPL session: a pair RDD built with parallelize() has no partitioner until
// partitionBy() assigns one. persist() keeps the partitioned result so later
// operations reuse it instead of re-running partitionBy() (and its shuffle).
val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
// pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:28
pairs.partitioner
// res40: Option[org.apache.spark.Partitioner] = None
import org.apache.spark.HashPartitioner
// import org.apache.spark.HashPartitioner
val partitioned = pairs.partitionBy(new HashPartitioner(2)).persist()
// partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[34] at partitionBy at <console>:31
partitioned.partitioner
// res41: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// `...` is a placeholder for the real SparkContext configuration.
val sc = new SparkContext(...) | |
// Hash-partition the user table into 100 partitions once and persist the result, so later
// joins against userData can reuse this partitioning instead of reshuffling it each time.
// persist() is essential: without it, partitionBy() (and its shuffle) would be
// re-evaluated every time userData is used.
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").partitionBy(new HashPartitioner(100)).persist() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.storage.StorageLevel
// Square each element and cache the result on local disk, so the two actions below
// (count and collect) reuse the stored data instead of recomputing the map.
val input = sc.parallelize(List(1, 2, 3, 4))
// input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[35] at parallelize at <console>:31
val result = input.map(x => x * x)
// result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[36] at map at <console>:33
// persist() only marks the RDD for caching; it is materialized by the first action (count).
result.persist(StorageLevel.DISK_ONLY)
// res43: result.type = MapPartitionsRDD[36] at map at <console>:33
println(result.count()) // 4
println(result.collect().mkString(",")) // 1,4,9,16
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// PageRank over a (pageId, outgoing links) dataset, iterated a fixed 10 times.
// `links` is hash-partitioned and persisted once, so each join with `ranks` inside the
// loop reuses that partitioning instead of reshuffling the link table every iteration.
val links = sc.objectFile[(String, Seq[String])]("links")
  .partitionBy(new HashPartitioner(100))
  .persist()
// Every page starts with rank 1.0. mapValues keeps the parent's partitioner, so `ranks`
// stays co-partitioned with `links`. It is reassigned each iteration, hence `var`.
var ranks = links.mapValues(_ => 1.0)
for (_ <- 0 until 10) {
  // Each page sends rank / (number of its outgoing links) to every page it links to.
  // The pattern binds the page's own link list (`pageLinks`); referring to the outer
  // `links` RDD here would be a nested RDD operation, which Spark does not support.
  val contributions = links.join(ranks).flatMap {
    case (_, (pageLinks, rank)) =>
      pageLinks.map(dest => (dest, rank / pageLinks.size))
  }
  // Sum the contributions per destination page and apply the 0.15 / 0.85 damping.
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Implements DomainNamePartitioner by hashing the domain (host) name of each URL key,
// so that all pages from the same domain land in the same partition.
class DomainNamePartitioner(numParts: Int) extends Partitioner {
  // A partition count of 0 would make getPartition divide by zero.
  require(numParts > 0, s"Number of partitions ($numParts) must be positive.")

  override def numPartitions: Int = numParts

  // Maps a key, whose toString must be an absolute URL, to a partition in [0, numPartitions).
  // new java.net.URL throws java.net.MalformedURLException if the key is not a valid URL.
  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = domain.hashCode % numPartitions
    // Java's hashCode() can be negative, and so can `%`; shift negative results into range
    // so getPartition never returns a negative index.
    if (code < 0) code + numPartitions else code
  }

  // Spark compares partitioners with equals() to decide whether two RDDs are already
  // partitioned the same way (and can skip a shuffle). Two DomainNamePartitioners
  // route keys identically exactly when they have the same partition count.
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner => dnp.numPartitions == numPartitions
    case _ => false
  }

  // Kept consistent with equals().
  override def hashCode: Int = numPartitions
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// How to use a plain object as a dictionary (string keys -> string values).
const dictObject = {};
dictObject['banana'] = '바나나';
dictObject['hong'] = '홍';
dictObject['monkey'] = '원숭이';
console.log(dictObject); // { banana: '바나나', hong: '홍', monkey: '원숭이' } (exact format depends on the console)

// Print every dictionary entry. Object.entries() yields only the object's own enumerable
// properties, whereas for...in would also walk inherited enumerable properties.
for (const [key, value] of Object.entries(dictObject)) {
  console.log("key : " + key + ", value : " + value);
}
Older Newer