@solidpple
Created July 25, 2016 07:19
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()

// Function called periodically to process a log file of events from the
// past five minutes; the sequence file it reads holds (UserId, LinkInfo) pairs.
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserId, (UserInfo, LinkInfo))
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) =>
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
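
// As written, processNewLogs() is inefficient: join() knows nothing about how
// userData is partitioned, so every call hashes and shuffles the entire
// userData set across the network even though it never changes. Partitioning
// userData once up front, as in the next snippet, avoids the repeated shuffle.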
import org.apache.spark.HashPartitioner

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100)) // split into 100 partitions
  .persist()
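
// userData now carries a known partitioner, so the join() inside
// processNewLogs() shuffles only the small events RDD, sending each event to
// the machine that holds the matching userData partition. persist() matters
// here: without it, partitionBy's output would be recomputed (and re-shuffled)
// every time the RDD is used.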
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs.partitioner // None: a freshly parallelized RDD has no partitioner

val partitioned = pairs.partitionBy(new HashPartitioner(2)).persist()
partitioned.partitioner // Some(org.apache.spark.HashPartitioner)
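
// A short follow-up sketch (reusing sc and partitioned from above; the inline
// result comments are what the Spark shell should print, stated as an
// expectation rather than captured output): key-based operations such as
// reduceByKey() inherit the existing partitioner, so they need no new shuffle.
val sums = partitioned.reduceByKey(_ + _)
sums.partitioner // Some(org.apache.spark.HashPartitioner), inherited from partitioned
println(sums.collectAsMap()) // Map(1 -> 1, 2 -> 2, 3 -> 3), ordering may vary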