- 内容の質はメモ書きレベル
- ほぼ思いつきで書いているので正しさの保証も無し
- ここで紹介したアルゴリズムに興味を持った人は、オリジナルの論文を読むことを推奨
- PlumtreeとHyParViewという分散アルゴリズムの紹介
- 基本はゴシップベースのグループ管理/ブロードキャスト用のアルゴリズム
- スケーラブルで耐障害性の高さが特徴
- 数万ノード規模に容易にスケール
- 例えば全体の六割(! この値はテキトウ)のノードとかが死んでもメッセージを配送可能
- 一時的に配送不可な状態に陥ってしまってもすぐに回復
- ! どの程度までの障害に耐えられるか、等は今回は触れないので興味のある人は後述の論文を参照のこと
- 効率も良い
- 正常系ではゴシップアルゴリズム特有の配送メッセージの冗長性を上手く排除している
- ただし、確実な配送保証はない
- 実装次第だが、メッセージの欠損や重複、到達順序逆転、はあり得る
- ! 無限のメモリや処理時間を仮定して良いなら防げなくないが、現実的ではない
- それらを使って実装したppgというErlangのライブラリの紹介
- この文書の作成時のバージョンは__0.1.2__
- 機能や実装の詳細というよりは、上述のアルゴリズムを実装に落とし込む際に問題となった点(and それにどう対処したか)、を書き残しておくことが主な目的
想定する分散環境:
- 規模が大きく成り得る (e.g., 数万ノードオーバー)
- 構成は途中で変わるもの (追加・削除が任意のタイミングで発生する)
- ノードは突然死ぬ
- ノードは突然重くなる
- 任意の二つのノードは通信可能 (! この制約は不要にしたい)
- 二つのノードを繋ぐ通信チャンネルはFIFOで信頼できる (e.g., メッセージの重複や欠損はない)
両者は密接に(?)関係している:
- 全てのメンバを把握しているなら、ブロードキャストは容易
- 逆にブロードキャストが可能なら「受信者は送信元に返信して」というメッセージを投げることで、メンバ一覧は取得可能 (構成が途中で変わらないなら)
以下では、__メンバ管理__と、その上での__ブロードキャスト__を実現しているライブラリ等について触れる (方法の違いを中心に)。
あまり詳しくないので軽く。
- http://erlang.org/doc/man/pg2.html
- 標準ライブラリに含まれているプロセスグループ管理モジュール
pg2:create/1
でグループを作って、pg2:join/2
で参加、pg2:get_members/1
でメンバ一覧取得- 同じクラスタのどのノードでも透過的にメンバ取得が可能
- ブロードキャスト関数は提供されておらず、自前で実装する必要がある (ちなみに、pg2の前身のpgというモジュールにはあった)
- 例えば、以下の方法があり得る:
- a) 単にメンバ一覧に対して
lists:foreach/2
でメッセージを送信する - b) Plumtreeを使ってメッセージを送信する (Plumtree用語でいう"ピアサンプリングサービス"をpg2に担当させる)
- a) 単にメンバ一覧に対して
- 例えば、以下の方法があり得る:
- 内部ではglobalを使ってメンバ管理をしている:
- メンバの追加・削除時には、全てのノードでロックを確保
- メンバ情報は全てのノードがローカル(ETS)に保持している
- => 全てのメンバ(正確にはノード)が他の全てのメンバを知っている形になるので、フルメッシュ型ともいえる (下の図も参照)
- 利点:
- ブロードキャストのコストが低く、柔軟でもある
- 全メンバがメンバ一覧を把握しているので、要求に応じていろいろなブロードキャスト方法が選択し得る
- 一番オーバヘッドを抑えたいなら
lists:foreach/2
- 途中の経路(e.g., ラックを跨ぐのは最小限にしたい)を工夫したいなら配送用のスパニング木を組むことも可能 (利用者側が)
- 一番オーバヘッドを抑えたいなら
- 欠点:
- メンバ管理のコストが高い:
- 追加・削除時にノード数に比例したコストが掛かる (i.e., スケール性に限界がある)
- 全てのノード(! グループを利用しないものも含む)が、全てのグループのメンバ一覧を保持する
- => グループ数が多い場合には不適切
- クラスタ内の特定のノードが極端に重い場合の挙動も気になる (TODO: 要調査)
- 「ブロードキャスト」をどう実現するかは利用者側の責任
- メンバ管理のコストが高い:
※ 良く知らないので、他の人から聞いた話をもとに想像で記述
- https://github.com/uwiger/gproc
- メンバ管理:
- リーダを選んで、それが(それのみが)一つのグループ内に属するメンバを管理する
- スター型といえる
- ブロードキャスト:
- ブロードキャストしたい場合は「いったんリーダに送信」して「リーダがそれを全メンバに転送」する
- 利点:
- 非リーダのメンバの責務・負荷が軽い
- リーダさえ選んでしまえば、その後のメンバの追加・削除コストは低い
- 送信されるメッセージ群に何らかの制約を課すことが容易 (e.g., 到達順序の保証)
- 欠点:
- リーダの負荷が重い (全てのメッセージがいったんリーダを経由し、リーダは全メンバに対して転送する必要がある)
- グループサイズに応じたスケールが難しい
- もしリーダが重くなったら全ての処理(e.g., メンバ追加削除、メッセージ配送)が滞る恐れがある
- リーダの負荷が重い (全てのメッセージがいったんリーダを経由し、リーダは全メンバに対して転送する必要がある)
- (後述の)HyparViewを使ってメンバ管理を、Plumtreeを使ってブロードキャストの実現を、行っている
- メンバ管理:
- 各メンバは、グループ内のメンバのサブセット(隣人)のみを把握している
- 全メンバの一覧をローカルに保持しているノードは存在しない
- ブロードキャスト:
- ゴシップベース
- 基本(?)は、全ての隣人にメッセージを配送
- ただし、メンバの追加や削除のない静止状態では、隣人の内の一つにのみメッセージを送ることで効率化を図る
- 配送用のスパニング木が形成される
- 利点:
- スケーラブル
- 各メンバは自分の隣人のみを把握していれば良く、隣人の数は
log10(グループサイズ)
程度で十分
- 各メンバは自分の隣人のみを把握していれば良く、隣人の数は
- メンバの追加・削除コストが低い
- 各メンバは自分の隣人のみを(省略)
- たくさんグループを作っても平気
- 正常時(静止状態)では、スパニング木に基づいた配送を行うので、メッセージ転送回数は最適に近い(ものにできないこともない。理屈上は)
- 最大ホップ数(latency)は、他に比べて劣ることが多い
- メンバの死亡やスローダウンを自動でハンドリングしてくれる
- ある隣人が死んだら新しい隣人を迎え入れ、特定の経路が重くなったら自動で別の経路を選択してくれる
- どこかの単一のノード(メンバ)の異常により、他が影響を受けることがほぼない
- 大規模システムでは重要な特性
- スケーラブル
- 欠点:
- メンバ一覧の取得がコスト高
- ローカルで把握している人がいないので、知りたい場合にはブロードキャストが必要
- メンバの追加・削除が発生している状態で、正確なメンバ一覧を取得するのはなかなか面倒
- オーバヘッド増
- メンバに関する知識を分散させている関係上、メッセージ配送時に個々のメンバが行う必要がある仕事は増えている
- 正確性(?)の保証は薄い
- いわゆる「送信側が無限回メッセージを送れば、他のメンバは無限回メッセージを受信する」的なモデル
- メンバの追加・削除時(が一度に大量に発生した場合)に、特定のメッセージの消失や重複、到達順序の逆転などは発生し得る
- ただしそのような状態に陥っても、自動で回復する
- メンバ一覧の取得がコスト高
HyParViewレイヤーでのグラフ構造:
Plumtreeレイヤー(eagerエッジのみ)でのグラフ構造:
※ 良く知らないので、他の人から聞いた話をもとに想像で記述
- 内部のブロードキャストにPlumtreeを利用しているらしい (e.g., riak_metadata)
- ただしメンバ管理部分にはHyParViewは使っておらず、おそらく(もともとriak_coreに備わっていた)自前の仕組みを用いている模様
ノード数に応じて、一つのメンバの追加のコストがどう変わるか。 (各ノードにつき一つのメンバを追加した場合の平均処理時間)
ノード数 | pg2 (serial) | pg2 (parallel) | ppg (serial) | ppg (parallel) |
---|---|---|---|---|
1 | 0.07ms | 0.17ms | ||
50 | 17.24ms | 94.54ms | 11.59ms | 4.17ms |
100 | 37.37ms | 151.65ms | 13.59ms | 6.76ms |
注記: 上記ノード群は一つのマシン上で起動した 注記: 削除の際の数値は省略したが、ほぼ同様の傾向となるものと思って良い
TODO: ppgのオーバヘッド分だけ重いけど、許容範囲、ということを示す
TODO: あとで埋める
TODO: 「30秒の間に、500メンバが追加・ブロードキャスト・削除を行う」というケースで、まれに数個のメッセージ欠損が発生することを示す (動的な状態の話。静止状態では欠損なし)
TODO: あとで埋める
以下の論文の内容を極端に単純化(and 意訳)したもの:
- Plumtree: Epidemic Broadcast Trees
- HyParView: a membership protocol for reliable gossip-based broadcast
- どちらも(要旨を掴む分には)平易なので、興味のある人は一読してみることをお勧め
動機の概要(?)としては、
-
- スケーラブルかつ故障耐性の高いブロードキャストとして、ゴシップ(or epidemic)ベースのアルゴリズムがある
- ゴシップ:
- 始めてメッセージを受け取ったメンバは、グループに属するメンバの(無作為に選択された)サブセットに、メッセージを転送する
- => 各人が自分の隣人に伝えることを繰り返せば、その内に皆に情報が伝わるよね?
- => たまたまその時に誰か一人がいなくても、その人の隣人には、別の経路で情報が伝わるはず
-
- でもどうやって自分の隣人を把握すれば良いのか?
- HyParViewを使おう
- ! 他にも提案手法はいろいろあるので、その辺りに興味がある場合には論文を参照のこと
-
- 隣人全てにメッセージを転送するのは無駄では?
- 異常時のことを考えると冗長な経路は欲しいけど、正常時には一人に一回だけ伝えれば十分なはず
- Plumtreeを使おう
- メンバ管理を担当
- 冗長性を有する接続グラフ(?)の構築・維持が目的
- __Hybrid Partial View__の略
- 部分ビュー:
- 各メンバは、グループのメンバ全員ではなく、部分ビューに含まれる隣人のみを把握している
- 「小さなActiveView(e.g.,
log10(N)
)」と「大きなPassiveView(e.g.,log10(N) * C
)」という2つのビューを保持している- ActiveViewのメンバ同士はTCPで接続しており、メッセージの配送は、この経路を使って行われる
- TCPを使っているのは死活監視を兼ねているため (これにより早急なダウン検知およびリカバリが可能)
- TCP接続が切れたらメンバが死んだと判断して、PassiveViewから補填する
- ActiveViewには対称性がある (i.e., 「AがBの隣人なら、BもAの隣人である」)
- 静止状態では、ActiveViewの構造は安定している
- PassiveViewのメンバ切断時の予備用
- 普段は死活監視も行わず保持するだけ
- ただし、定期的に他のメンバの内容とシャッフルして、新鮮さが保たれるようにしている
- ActiveViewのメンバ同士はTCPで接続しており、メッセージの配送は、この経路を使って行われる
- メンバの死活監視方法: reactiveモデルとshuffleモデル(?)
- メンバ追加時のエントリポイントとなるコンタクトサービス(e.g., リーダ的なメンバ)が必要となる
TODO: 図で説明
メモ (基本は図で説明):
- 使うメッセージ: JOINメッセージ, FORWARD_JOINメッセージ, CONNECTメッセージ
- 処理の流れ:
-
- 追加メンバは最初にコンタクトサービスにJOINメッセージを投げる
-
- コンタクトサービスはActiveViewに追加して、FOWARD_JOINを使って新規メンバを転送
-
- 一定回数ランダムウォーク
-
- 途中でPassiveViewに追加
-
- 最後にActiveViewに追加
-
- ActiveViewに追加したらCONNECTメッセージを返送
-
- ActiveViewが溢れたらランダムにDISCONNECT (PassiveViewに移す。次節も参照)
TODO: 図で説明
- b) leaveとDISCONNECT、NEIGHBOR
- c) SHUFFLE
- 正式名称
- ブロードキャストを担当
- eagerとlazy, pushとpull
- 初期状態はゴシップと同様
- GOSSIP <=> PRUNEによりスパニング木を形成
- 障害対応はIHAVEで
- ノードの追加・削除はHyParView任せ
- ピアサンプリングサービスの話も
- 最初は皆eager
基本は上記2つのアルゴリズムの素直な実装。
ただし、実装に落としこむ際にいくつか難しい問題もあったので、その辺りを中心に記述。
> ppg:create(foo).
> {ok, Channel0} = ppg:join(foo, self()).
> {ok, Channel1} = ppg:join(foo, self()).
> {ok, Channel2} = ppg:join(foo, self()).
> ppg:broadcast(Channel0, bar).
> flush().
Shell got bar
Shell got bar
Shell got bar
ok
> ok = ppg:leave(foo, self()).
> ppg:broadcast(Channel0, bar).
> flush().
Shell got bar
Shell got bar
ok
! グラフを接続性をどう維持するかが主関心
- globalを使っている (※ 最新版(v0.1.3)ではevelに置き換え済み)
- HyParViewは、皆が共通のコンタクトサービスを経由してJOINすること、で接続性を達成している
- 論文のままの実装では、切断する時はする
- ActiveViewの安定性の問題
- そもそもPassiveViewレベルで切断することもありえなくはない (確率的にかなり低いとしても)
- ではどうするか:
- 半順序
- 最終的にはrejoin
- ErlangだとTCP接続が不要な話 (でも"接続"という概念は必要。後述のPlumtreeでのメッセージ保持期間の話と関わってくる)
TODO: もしかしたら書くかも (FOREIGNERメッセージの追加とか)
! 配送効率の最適化が主関心 (最適な経路の発見・選択、重い経路の回避)
- 論文での説明の紹介
- ただし、実際には実時間に基づく最適化の方が使い勝手が良いことが多そう
- ホップ数が少ない経路でも、間に極端に重たいノードが含まれている場合には避けたい
- ihave/graftの仕組みに統合
- 論文中では、これは障害対策としてのみ記述されている
- ihaveに対する要求は「(タイミングは)いつか隣人に送信されること」のみ
- 「piggybackできる」とかの話もその一環
- ppgではこのihaveのタイムアウトを、経路最適化のためにも使っている
- e.g., 「経路Aよりも経路Bのメッセージの到達時間が50ms早いなら、後者を使うように変更する」
- ただし、代償としてihaveのpiggybackによる転送回数軽減がやり難くなった
- 論文中では、これは障害対策としてのみ記述されている
- 論文で扱われていない(別論文を参照、的な扱い)が実際には重要な問題
- 2つの方法を思いついた
- a) 参照カウントベース
- b) 有効期限ベース
- 最終的には後者を採用
- 最初はメモリ使用量を抑えられるので前者だった
- 後者はメッセージの転送回数が少ないのと、他の仕組みとの直交性が高い (後述の「重いノード問題」も同じ枠組みで扱える)
- TODO:
- 有効期限ベースの問題
- その期間内の処理を守れないメンバをどう扱うか
- TODO: 古いメッセージに対するGRAFT要求は単に無視
- TODO: tick/tack(ping/pong)による応答性チェックと切断
- それによるリスク
- メッセージの重複配送が発生し得るケースについて
- 非同期システムによくある問題: 死んだのか、単にスローダウンしてるのか分からない
- Erlangにはlink/monitorがあるので死活監視は楽になっているが、結局「恒常的にスローダウンしている」or「一時的にちょっと遅れただけ」かどうかの識別には約に立たない
- 三つのケースがある(! この分類には直行性がない):
- a) 自分が重い場合
- b) 相手が重い場合
- c) 経路が主にノードでふさがれている場合
- => このケースのためにping/pongが必要となった (何かのついで、は無理) => TODO: そもそもping/pongについてどこかで触れる (オリジナルの論文にはない話)
- ではどうするか?
- むりやり同期システムの枠組みに落とし込む
- タイムアウトしたら接続を強制的に切る (ここで論理的な接続の概念が重要となる。毎回別プロセス立てるのでも良い)
- plumtreeの障害耐性、回復の容易さ(locality)により、気軽に切断できるのが魅力
- 怪しいならとりあえず別経路で、が気軽にできる
- 経路変更に全体の合意が必要なアルゴリズムでは厳しい
- むりやり同期システムの枠組みに落とし込む
ppg:get_members/1
は重いので普段は使いたくないので、以下の方法を推奨:
- JOINした人 == メッセージ送信者:
- 送信者は、送信メッセージに
ppg:join/2
が返したppg:channel() = pid()
を含めるようにする
- 送信者は、送信メッセージに
- 受信者:
- メッセージに含まれる
ppg:channle()
をチェックして、未知のものなら、新規メンバが追加された、と判断する - 同時にそのPIDをmonitorする
ppg:leave/1
時が呼ばれた場合は、上記プロセスはいなくなるので、受信者にDOWNメッセージが飛ぶことになる- => これによるメンバの削除(leave)は検出可能
- メッセージに含まれる
コード例:
%% 送信者側
> {ok, Channel} = ppg:join(Group, self()).
> ppg:broadcast(Channel, {msg, Channel, Body}).
%% 受信者側
> receive
{msg, Channel, _Body} ->
Channels1 =
case maps:is_key(Channel, Channels) of
true -> Channels0; % 既に知っている
false ->
%% 新しいメンバがJOINした (あとの初めてのメッセージ)
Monitor = monitor(process, Channel),
maps:put(Channel, Monitor, Channels0),
do_something;
{'DOWN', Monitor, _, Channel, _} ->
Channels1 =
case Channels0 of
#{Channel := Monitor} -> maps:remove(Channel, Channels0); % メンバがleaveした
_ -> Channels0
end,
do_something
end.
合うケース:
- 極端に規模が大きい
- ノードの追加・削除(or ダウン)が頻繁
- グループをたくさん作りたい
- メッセージ配送の信頼性はそれなりで良い
- 完全にP2Pなシステム
合わないケース:
- 信頼性大事
- オーバヘッドを極端にまで切り詰めたい
- 物理的な構成を厳密に反映したい
- グラフに対する制約 (物理構成の意識)
- ピアサンプリングサービスを入れ替え可能に
- メッセージサイズやihaveのpiggyback周りの最適化
- Erlangプロトコル以外への対応
- コンタクトサービスもスケーラブルに