class: center, middle
テクノロジー開発2部 岡田 遥来
- 分散並列処理について
- Sparkについて
- 具体例(アクセスログのセッション化)
ある処理の実行時間を短縮したいとき
- DBへのアクセスがm回あって、それぞれ通信にn秒かかる場合、どんなに頑張ってもm×n秒かかる
- ハイスペックなマシンは高い
- 限界がある
テキストファイルに含まれる単語と、その出現頻度を数える
Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Quisque consectetur euismod hendrerit.
Suspendisse quis posuere justo, et hendrerit ipsum.
Cras porttitor metus ac libero vehicula, nec congue lectus consequat.
....
Map[String, Int] // Map(cursus -> 103372, eros -> 119604, tincidunt -> 196561,...
import collection.{mutable => mu}
val histogram: mu.Map[String, Int] = mu.HashMap.empty
io.Source.fromFile("/path/to/text").getLines().foreach { line =>
line.replaceAll("[,.]", "").split(" ").map(_.toLowerCase.trim).foreach { word =>
val i = histogram.getOrElseUpdate(word, 0)
histogram(word) = i + 1
}
}
import collection.{mutable => mu}
import collection.{concurrent => con}
val histogram: mu.Map[String, Int] = con.TrieMap.empty
io.Source.fromFile("/path/to/text").getLines().toStream.par.foreach { line =>
line.replaceAll("[,.]", "").split(" ").map(_.toLowerCase.trim).foreach { word =>
val i = histogram.getOrElseUpdate(word, 0)
histogram(word) = i + 1
}
}
???
- そもそもロジックが並列化しづらい
- 円周率やGCD計算などの普通の計算をどう並列化するのか
- データセットに対する処理の場合、一定の単位で区切って振り分ける
- ワーカー間のデータ転送
- 各ワーカーへの処理の割り当て & 結果の集約
- あるワーカーが落ちたときのリトライ処理
- 台数が増えるということは故障確率も増える
- Scalaで実装された分散並列処理フレームワーク
- Apacheトップレベルプロジェクトの一つ
- Standaloneクラスタや、YARN, Mesosクラスタ上で動作
- マネージドサービスもある
- AWS: ElasticMapReduce
- GCP: Cloud Dataproc
- インメモリ処理を基本とする
- RDDに定義された高階関数(map, filter,...)を使って処理を記述
- Scalaのコレクション操作をする要領でロジックを書けば、勝手に分散並列化されるイメージ
- Resilient Distributed Dataset (耐障害性分散データセット)
- 遅延評価されるimmutableなScalaのコレクションのように作られている
- map, filter, flatMap, groupByなどは "Transform"
- これだけを書いても実行されない。RDDを生成する
- foreach, count, saveAsTextFileなどは "Action"
- 実行され、結果を生成する
- map, filter, flatMap, groupByなどは "Transform"
タイムスタンプ IPアドレス
2016-04-15T00:00:01 203.0.113.0
2016-04-15T00:00:22 203.0.113.42
2016-04-15T00:05:01 203.0.113.2
2016-04-15T00:28:01 203.0.113.0
...
それぞれのIPアドレスについて、30分以上間の空いてないアクセスの塊にまとめる。
IPアドレス アクセス回数 滞在時間(秒)
203.0.113.0 2 1680
203.0.113.42 1 0
203.0.113.2 1 0
...
- コードの完全版はこちら
case class Access(timestamp: DateTime, ipAddress: String)
case class Session(ipAddress: String, accessCount: Int, duration: Long)
object Main {
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val accessLogs: RDD[String] = sc.textFile("/path/to/access_log/*")
accessLogs
.map { line =>
val fields = line.split(' ')
Access(DateTime.parse(fields(0)), fields(1))
}
.groupBy(_.ipAddress)
.flatMap { case (ipAddress, accesses) =>
val sorted = accesses.toList.sortBy(_.timestamp)
sessionize(sorted)
}
.map(s => s"${s.ipAddress} ${s.accessCount} ${s.duration}")
.saveAsTextFile("/path/to/output")
}
def sessionize(sortedAccesses: Seq[Access]): Seq[Session] = ???
}
- Transformは複数回実行されることがあるため、外部へI/Oを行う場合は冪等にする必要がある
- データ欠損時のリトライや、RDD#cache() しないでRDDを使い回す場合
- 2016/4/15現在、Spark公式で配布されているバイナリはScala 2.10でビルドされている
- Scala 2.11で書いたアプリが動かない
- Spark on EMRも同様
- 分散並列処理を書く際には検討の価値あり
- 以下のような場合は、分散シェルやタスクキュー等、素朴なツールでも十分と思う
- データセット全体でキーごとにgroupingする、などのシャッフル処理がない
- 各マシンで処理した結果を集計する必要がない
- 「バカパラ」と呼ぶらしい (@shunsukeaihara さん談。ネガティブな意味ではない)
- 初めてのSpark (O'REILLY)
- 分散シェルまとめ
- Apache Sparkのご紹介