@solidpple
Created July 25, 2016 07:21
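```scala
import org.apache.spark.SparkContext

// UserId, UserInfo and LinkInfo are application types defined elsewhere.
// sequenceFile needs them to be Hadoop Writables (or to have WritableConverters).
val sc = new SparkContext(...)
// Load the user table once and keep it in memory across calls.
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()

// Called periodically to process the event log file covering the past five minutes.
// The sequence file processed here contains (UserId, LinkInfo) pairs.
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserId, (UserInfo, LinkInfo))
  // Count visits to links whose topic the user has not subscribed to.
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) =>
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
```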
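To make the join-and-filter step concrete without a cluster, here is a plain-Scala sketch over in-memory collections. It is not Spark code and not the gist author's implementation: `UserInfo` and `LinkInfo` are hypothetical case classes standing in for the gist's types, `UserId` is assumed to be an `Int`, and `join` is a hand-written helper that mimics the inner-join semantics of `join` on pair RDDs.

```scala
// Hypothetical stand-ins for the gist's UserInfo and LinkInfo types (assumed shapes).
final case class UserInfo(topics: Set[String])
final case class LinkInfo(topic: String)

object OffTopicSketch {
  // Assumption: user IDs are plain Ints in this sketch.
  type UserId = Int

  // Inner join of two keyed collections, mirroring pair-RDD join semantics:
  // only keys present on both sides are emitted, one output pair per
  // matching (left value, right value) combination.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val byKey: Map[K, Seq[A]] =
      left.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, b) <- right
      a <- byKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  // Same filter as processNewLogs: count events whose topic the user did not subscribe to.
  def countOffTopic(users: Seq[(UserId, UserInfo)], events: Seq[(UserId, LinkInfo)]): Long =
    join(users, events).count {
      case (_, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
    }.toLong

  def main(args: Array[String]): Unit = {
    val users = Seq(1 -> UserInfo(Set("sports", "news")), 2 -> UserInfo(Set("music")))
    val events = Seq(
      1 -> LinkInfo("news"),
      1 -> LinkInfo("cooking"),
      2 -> LinkInfo("sports"),
      3 -> LinkInfo("news")
    )
    println("Number of visits to non-subscribed topics: " + countOffTopic(users, events))
  }
}
```

Running `main` prints `Number of visits to non-subscribed topics: 2`: user 1's "cooking" visit and user 2's "sports" visit are off-topic, while the event for user 3 is dropped by the inner join because user 3 has no `UserInfo` entry.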