この章ではRindaの拡張例を見ていきます。
以下のコマンドでインストールできます。
gem install more_rinda
ソースコードは https://github.com/seki/MoreRinda にあります。"test"や"sample"のディレクトリに今回取り上げるもの以外にもいろいろなサンプルが置いてあるので一度覗いてみて下さい。
前章では「新たなプロセスを起動する eval 操作は良いアイデアがなく用意できなかった」といいましたが、UNIX環境に限って稼働するものを作成しました。ここではLindaのevalがどういうものかを確認した後、rinda_evalの用法や実装を見て行きましょう。
Lindaでタプルを生成する操作にはoutとevalの二つがあります。out操作はRinda::TupleSpaceのwriteに対応します。
0から9までの二乗根のタプルの生成は次のようにします。
/* C-Linda */
for (i = 0; i < 10; i++)
out("sqrt", i, sqrt(i));
/* Rinda */
10.times do |n|
ts.write([:sqrt, n, Math.sqrt(n)])
end
eval操作はout操作にそっくりに見えますが、プロセスが生成される点が違います。なんと新しいプロセス側で引数の評価を行い、その結果からタプルを生成します。次の疑似コードは10個のプロセスを生成して、それぞれが一つのタプルを生成するものです。sqrtは生成されたプロセスで計算されます。
/* C-Linda */
for (i = 0; i < 10; i++)
eval("sqrt", i, sqrt(i));
ふつうに考えたら、引数の評価をした後にプロセスが生成されそうに見えますが、C-Lindaはライブラリではなく、プリプロセッサか言語の拡張らしいのでそんな芸当ができるようです。
eval操作をRubyでポータブルに実装するのはちょっと難しいので、Rinda::rinda_evalはfork()を持つUNIX属だけを対象とすることにしました。このモジュールメソッドは新しいプロセスを生成しブロックを実行します。また、引数のタプルスペースへの参照をブロックへの引数として与えます。ブロックが返したArrayをタプルスペースに追加します。
10.times do |n|
Rinda::rinda_eval($ts) do |ts|
[:sqrt, n, Math.sqrt(n)]
end
end
C-Lindaのevalみたいな字面を提供できなかったのは残念ですが、実用的なAPIになったと思います。この例では計算結果のタプルを生成するものですが、ワーカプロセスの生成に使うことが多いと思います。10個のワーカプロセスを生成する例を示します。これは無限ループですけど適当な条件で終了させてもよいですね。
新しいプロセスにわける利点は二つあります。一つは別のアドレス空間ができるということ、もう一つは(これはMRI Rubyに限定されますが)マルチコアの恩恵にあずかれることです。ここでは後者について、簡単な例を通して説明して行きましょう。
ruby-1.9のスレッドはOSのネイティブなスレッドを利用して作られていますが、同時に実行されるのはただ一つだけという制約があり、スレッドを生成するだけではマルチコアの利用しきることができません。
以下の実験では時間のかかりそうなフィボナッチ関数を利用します。フィボナッチはメモ化やn-1も持ち回ることによって高速に実行することができますが、ここ欲しいのは時間のかかる処理なので素朴に実装します。
まずはそれぞれ30のフィボナッチ関数を三回実行するのにかかった時間を計測してみましょう。
require 'benchmark'
def fib(n)
n < 2 ? n : fib(n - 2) + fib(n - 1)
end
def task(n)
puts "fib(#{n}) = #{fib(n)}"
end
puts Benchmark.measure{
[30, 30, 30].each{|x| task x}
}
fib(30) = 832040
fib(30) = 832040
fib(30) = 832040
1.420000 0.000000 1.420000 ( 1.414553)
この処理を並行してすすめるためにスレッドを使った例に書き換えてみましょう。これを2コアのマシンで実行してみましょう。(シングルコアでも同じ結果になるんですけどね)
require 'benchmark'
def fib(n)
n < 2 ? n : fib(n - 2) + fib(n - 1)
end
def task(n)
puts "fib(#{n}) = #{fib(n)}"
end
puts Benchmark.measure{
[30, 30, 30].map{|x| Thread.new{task x}}.map{|y| y.join}
}
fib(30) = 832040fib(30) = 832040
fib(30) = 832040
1.440000 0.000000 1.440000 ( 1.435304)
処理時間がほとんど変わりませんね。がっかりです。この理由はRubyのスレッドはインタプリタごとに同時に一つしか走行できないという制約があるからです。Ruby 1.8の場合はグリーンスレッドという仮想的なスレッドなため一つのネイティブスレッドしか使えず、ネイティブスレッドが使われている1.9でもGIL(Global Interpreter Lock)があるため複数のネィティブスレッドを同時に動かしません。
本当にプロセスを起動する、rinda_evalを使ったバージョンに書き換えてみましょう。これならマルチコアを使い切れます。
require 'benchmark'
require 'rinda/tuplespace'
require 'rinda/eval'
def fib(n)
n < 2 ? n : fib(n - 2) + fib(n - 1)
end
def task(n)
puts "fib(#{n}) = #{fib(n)}"
end
place = Rinda::TupleSpace.new
DRb.start_service
puts Benchmark.measure{
[30, 30, 30].each {|x|
Rinda::rinda_eval(place) {|ts| [:result, x, task(x)]}
}.each {|x|
place.take([:result, x, nil])
}
}
fib(30) = 832040
fib(30) = 832040
fib(30) = 832040
0.010000 0.010000 0.910000 ( 0.716385)
少しスレッドバージョンよりもコード量が増えてしまいましたが、タスクごとに新しいプロセスを起動し、複数CPUコアを利用できることを実感できたのではないでしょうか?
ちなみにJava VM上で走るJRubyはスレッドでマルチコアが使えます。
[~]$ jruby thread_task.rb
Took 1 sec
Took 2 sec
Took 3 sec
user system total real
1.587000 0.000000 1.587000 ( 1.543000)
(注:JRubyの結果に替えて下さい => 井上さん)
これはスレッドバージョンをJRubyで走らせた結果です。実際にはJVMを立ち上げる作業そのものに時間が取られるのですが、それでも2秒ちょっとです。JRubyを使える環境でしたら迷わずJRubyを使えば良いと思いますが、MRIに依存している環境の場合、お手軽に並列処理をするためのツールとしてrinda_evalを忍ばせておくのも良いのではないでしょうか。
ところでGILについてしばしば見かける誤解があります。readやwrite、sleepなどブロックしそうな操作で、ひとつのRubyスレッドがブロックするとネイティブスレッドが停止し、Rubyスレッドの切り替えが停まるのではないか、というものです。実際にはRubyスレッドではこういった長いシステムコールの影響を受けるのはそのRubyスレッドだけです。(拡張ライブラリなどで本当に停止させてしまうモノもありますから注意が必要です)ですから、I/O、ネットワーク越しのサービスなど外部資源との待合せに関してはRubyスレッドは十分に有効な解決策となります。例えば、Webページのクロウラーなどは多くの時間をI/O待ちに使いますね。
さきほどのtaskをフィボナッチからsleepに変更して実験してみましょう。時間のかかる処理の代わりに外部資源を待っているイメージです。
require 'benchmark'
def task(n)
sleep(n * 0.1)
end
puts Benchmark.measure{
[30,30,30].map{|x| Thread.new{task x}}.map{|y| y.join}
}
0.000000 0.000000 0.000000 ( 3.001487)
fib(n)の代わりにsleep(n * 0.1)としました。いくつ処理してもだいたい3秒で済みますね。これはreadやwriteなどでも同様です。複数のI/Oを複数のRubyスレッドで解くのは、Cで言うところのネイティブスレッド一つでselect()と非同期モードのread()/write()を組み合わせて多数のクライアントを相手にするのによく似ています。Rubyスレッドを使うと、TCPのストリームから意味のあるパケットを組み上げて上位のレイヤーに返す処理を直感通りに書けるところは非常に楽です。実際、dRubyの実装は非同期I/Oを複数のRubyスレッドで解いています。 (脚注: といっても一つのプロセスが数十のクライアントと同時に通信するケースです。クライアント数が数千、数万単位にはEventMachineなどのもっと適した解があります。)
(コラムっぽい?)
アクターモデルという考え方があります。プロセスとプロセスの間を片道のメッセージを送信しあうことによって連携させるのが基本となるアイデアです。メッセージ送信のプリミティブは往復ではなく、片道です。メッセージを送信する際には「宛先」「メッセージ内容」を組にして相手の状態は気にせずに送ります。メッセージを受信するプロセスは、自分の都合のよいときに届いたメッセージを一つずつ取り出します。プロセス間でオブジェクト(メモリ)が共有されていなければ、自分の資源が安全な状態になってからメッセージを読み出すように気をつけることで、マルチスレッドプログラミングにおける資源の排他制御などの問題を回避することができると言われています。 片道のメッセージの交換で並行処理を進めていくのはLindaでのプログラミングにそっくりですね。
アクターモデルの鍵は、メッセージパッシングと共有されない空間です。Erlangは生粋のアクターモデルで両方を提供しますが、多くの無難な言語におけるアクターモデルは後付けです。Rubyで書いても簡単に書けます。ライブラリでアクター風メッセージパッシングを実現するのは容易ですが、共有されない空間を実現するのは大変です。
もしも本当に共有されていない空間が必要であれば、OSの助けを借りるのが簡単です。本当にプロセスを分けてしまえば良いのです。rinda_evalは簡単にプロセスを作れるのでこういった局面でも有効です。クラスの定義や前処理の結果など基本的なオブジェクトの状態はまるまるコピーで手に入れつつ、共有されていない空間を手に入れることができます。メッセージパッシングのミドルウェアにはRindaのタプルスペースがそのまま使えます。
簡単にプロセスを生成できるrinda_evalの中身を覗いてみましょう。
require 'drb/drb'
require 'rinda/rinda'
module Rinda
module_function
def rinda_eval(ts)
Thread.pass # FIXME
ts = DRbObject.new(ts) unless DRbObject === ts
pid = fork do
Thread.current['DRb'] = nil
DRb.stop_service
DRb.start_service
place = TupleSpaceProxy.new(ts)
tuple = yield(place)
place.write(tuple) rescue nil
end
Process.detach(pid)
end
end
実は20行にも満たない短いスクリプトですね。いろいろ難しそうに見えますが肝は2点です。forkを通して子プロセスを起動している点と、タプルスペースの参照を子プロセスにうまく手渡す点です。
forkは親プロセスのメモリ空間や資源をそのままコピーした新しいプロセス作成するUnixシステムコールメソッドです。Rubyにおけるforkメソッドも同様で、Rubyオブジェクトの状態などがそのまま子プロセスに引き継がれます。forkを呼ぶ瞬間の親プロセスの状態を子プロセスに伝えられるのを利用して、子プロセスの初期状態を準備することができます。 forkした後に子プロセスに情報を送る、あるいは子プロセスから親プロセスへ情報を送るにはどうすればいいでしょう。古典的なUNIXプログラミングではpipeやsocketpairなどが利用されてきました。どちらも親子のプロセス間に成立するストリームです。rinda_evalではプロセス間の情報のやりとりにタプルスペースを使います。
result = nil
pid = fork do
result = "hello"
end
p "result: #{result}, pid : #{pid}"
上の結果は"result: , pid : 72287"となります。
そこでタプルスペースを利用して値をやりとりします。
parent = Rinda::TupleSpace.new
DRb.start_service
pid = fork do
child = DRbObject.new(parent)
DRb.start_service
child.write([:result, "hello"])
end
_, result = parent.read([:result, nil])
p "result: #{result}, pid : #{pid}"
結果は"result: hello, pid : 72317"となります。
これの凄い事なんですが、タプルスペースには値渡ししかできないのですが親プロセスから子プロセスへはクロージャがつかえるので、子プロセスにProc, Lambda, Blockなど渡し、演算結果のみ返す事ができます。
calc = Proc.new{|a| a * 2}
pid = fork do
child = DRbObject.new(parent)
DRb.start_service
child.write([:result, calc[3]])
end
_, result = parent.read([:result, nil])
p "result: #{result}, pid : #{pid}"
結果は"result: 6, pid : 72401"です。
以前TupleSpaceで分散階乗サービスを作る例がありましたが、あれの場合はあらかじめサーバサイドが計算式(この例の場合は階乗)を定義していなければいけません。しかしながらforkとタプルスペースを使った場合、あらゆる式を親プロセスで定義し、子プロセスに分散処理させたあとで結果だけもらう事ができます。これを汎用的な仕組みにしたのが20行弱のrinda_evalになります。
逆にTupleSpace分散階乗サービスと比較しての短所ですが、forkコマンドに依存しているため、複数のマシンにまたがっての分散処理はできません。
Rindaのタプルスペースは複雑なプロセス間通信を容易してくれますが、タプルスペースすべてが揮発性のメモリに収まっているため、システムがクラッシュした際に全てのデータを失ってしまいます。そこがネックで使ってくれない人もいるようなので、自分なりに永続化のしくみとその制約にかんして考えてみました。
この節ではRinda::TupleSpaceをおさらいしながら、TupleSpaceに永続化したPTupleSpaceの概要を紹介します。
PTupleSpaceはTupleSpaceのサブクラスです。タプルの状態の変化を逐次二次記憶にログして、次回の起動に備えます。PTupleSpaceを再起動すると最後の(最新の)タプルの状態のままに復元されます。
PTuplespaceを利用するのはきわめて簡単です。以下に利用例を載せます。
require 'rinda/ptuplespace'
store = Rinda::TupleStoreLog.new('ts_log')
Rinda::setup_tuple_store(store)
DRb.install_id_conv(Rinda::TupleStoreIdConv.new)
ts = Rinda::PTupleSpace.new
DRb.start_service('druby://localhost:23456', ts)
ts.restore
ts.write(['Hello', 'World'])
p ts.read_all(['Hello', nil])
p ts.take(['Hello', nil])
x = ts.write(['Hello', 'cancel'], 2)
p ts.read_all(['Hello', nil])
ref = DRbObject.new(x)
ref.cancel
p ts.read_all(['Hello', nil])
x = ts.write(['Hello', 'World'])
p DRbObject.new(x)
肝心なのは Rinda::TupleStoreLog.newにファイル名を指定し、それをRinda::setup_tuple_store(store)に渡すだけです。
DRb.install_id_conv(Rinda::TupleStoreIdConv.new)
これはGC(Garbage Collection)を回避するための仕組みです。詳細に関してはGCの章で取り扱います。
(注:"restore" "cancel"の用法に関して説明をしていただけますか? => 咳さん)
この節ではPTupleSpaceをKVSと比較した際の考察、そして永続化機能として考えた時に必須のクラッシュ&リカバリーに関して考えていきます。
APIの視点からストレージとしてのTupleSpaceをおさらいします。
TupleSpaceはタプル群を扱う集合構造です。同じ情報を複数持つことができるので、Bagと言えるでしょう。
最近の流行言葉にKVSという言葉ありますね。キーと値で表現するなら、同じキーを持つ要素の重複を許すストレージです。キーしかなくて、キーが値、にも見えますが。
これに対してHashは一つのキーに一つの値が関連付けられる辞書です。
TupleSpaceで辞書を模倣するのはやっかいです。[キー, 値]というタプルで辞書を構成仕様とした場合を考えてみましょう。まずデータを読むのは次のように書けそうです。
@ts.read([キー, nil])
では要素の追加はどうでしょう。
@ts.write([キー, 値])
このような単純なwriteでは重複を防ぐことはできません。全体をロックして、そのキーのタプルを削除してからwriteする必要があります。
def []=(key, value)
lock = @ts.take([:global_lock])
@ts.take([key, nil], 0) rescue nil
@ts.write([key, value])
ensure
@ts.write(lock) if lock
end
このグローバルなロックは実はデータを読むときにも必要です。なぜなら、そのキーの情報を別のスレッドが更新中かもしれないからです。
def [](key)
lock = @ts.take([:global_lock])
_, value = @ts.read([key, nil], 0) rescue nil
return value
ensure
@ts.write(lock) if lock
end
要素の増減がないケースでは前章で示した通り、グローバルなロックは不要です。だれかが更新中はその要素は取り出せませんが、更新が終わればまた書き戻されるはずです。ですから、単に要素が読めるまでreadで待ってしまえば良いことになり、局所的なロックとなります。
eachはどのように実装したらよいでしょう。TupleSpace全体を順に走査するうまい方法はありません。read_allで全ての要素のArrayを生成して、その配列にeachを委譲することになります。
def each(&blk)
lock = @ts.take([:global_lock])
@ts.read_all([nil, nil]).each(&blk)
ensure
@ts.write(lock) if lock
end
要素数が少ないうちは気になりませんが、多くなると損している気がしますね。
分散ハッシュテーブルなどでもeachやkeysを低コストで実装するのは難しいかもしれません。
流行のストレージには、常にキーでソートされているシーケンスを持つものがあります。並んでいることを利用して、大きな空間をブラウズするのが得意です。キーを工夫することでバージョン付きの情報を蓄えることもできます。RindaのTupleSpaceには、タプルを順序付けて並べることはできませんから、これを低コストで模倣するのは難しいです。
ところであなたが欲しかった集合は本当にHashでしたか?
タプルは実世界の「カンバン」によく似ています。タプルをプロセス間でリレーしながら仕事を進めていく様子は、「カンバン」を持ち回って仕事を行うのにそっくりです。Rindaの世界では「カンバン」はTupleSpaceを介してプロセスからプロセスへ渡り歩きます。
PTupleSpaceの提供する永続化は、TupleSpaceに蓄えられたカンバンの束にのみ作用します。プロセスが持っているカンバンをPTupleSpaceが知ることはできず、永続化されません。また、待合せている様子も永続化の対象ではありません。プロセスがあるカンバンを待っている、という状況までは再現できないのです。
TupleSpaceに期待する機能がカンバンの貯蔵庫であると考えた場合には、これで充分と言えるでしょう。PTupleSpaceにwriteした情報は再起動後もそのまま手に入ります。多くのアプリケーションではこれで間に合うかもしれません。ArrayやHashをそのままdRubyで公開する、あるいはログ付きで公開するのに比べて、TupleSpaceはどのくらい便利なのでしょうか。おそらく、RindaのTupleSpaceの強力なパターンマッチングにはある程度のアドバンテージがあるでしょう。そのパターンマッチングと引き換えに、あまり効率のよいデータ構造を使うことができませんでした。実装には線形探索が残っていて、要素数が増えたときに不安があります。
TupleSpaceの本来の役割であるプロセス間の協調についてはどうでしょうか。PTupleSpaceに異常が起きてクラッシュしてしまった、再起動が必要になった、といった状況を想像してみましょう。まず、PTupleSpaceプロセスが停止することにより、readやtakeなどの待合せのRMIを実行していたプロセスではdRubyの例外があがります。PTupleSpaceが再起動されるとタプル群の最後の状態に復元されます。待合せをしていたプロセスは再起動したことを(知るのは難しいのですが)知ったのち、例外が発生した操作をやり直すことになります。しかし、そのように再開するスクリプトを書くのは難しく面倒です。
また、RMIのために抱え込む厄介な問題もあります。writeやtakeなど、タプルの状態を変える操作を考えてみましょう。通常のメソッド呼び出しでは処理が終われば呼び出した側に直ちに制御がもどりますが、RMIではサーバ側のメソッドの終了と、RMIの終了の間にソケット通信が行われます。つまり、処理が終わる前に例外が発生したのか、結果を伝える間に例外が発生したのか知ることができません。PTupleSpaceが二次記憶にタプルの操作をログしたあとに、クライアントにその完了が届く前にクラッシュしてしまう可能性があります。(全てがうまくいってからログする実装を選んでも、クライアントにタプルが届いたのち、ログするまえにクラッシュする可能性があります)
異常終了といえば、プロセス側のクラッシュも考えられますね。PTupleSpaceの対象外ですがちょっと想像してみましょう。カンバンをプロセスが取り出したままクラッシュしてしまうと、復元する方法がありません。次の短いスクリプトを見てみましょう。
def succ
_, count = @ts.take([:count, nil])
count += 1
yield(count)
ensure
@ts.write([:count, count) if count
end
これは[:count, 整数]のタプルを取り出し、一つ大きくしてまた書き込むスクリプトです。カンバンを取り出し、カウンタを一つ進め、最後にTupleSpaceに書き戻します。カンバンがプロセスにある間は、別のプロセスはカンバンをTupleSpaceから読んだり、取り出したりすることはできないので安全にカウンタを操作できます。さて、もしもカンバンがプロセスにある間にそのプロセスがクラッシュしたらどうなるでしょう。PTupleSpaceは自身の中にあるカンバンしか復元できませんから、そのカンバンは失われたままです。このカウンタを操作するプロセス群は全て停まってしまいます。こういった使い方(協調に使うケースの多くはそうなんだと思うのですが)をする場合、TupleSpaceだけでなく関係するプロセス群も再起動する必要があるだけでなく、TupleSpace内のタプルも初期状態にする必要があります。せっかくタプルの状態を復元できるようにしたというのに‥。
PTupleSpaceはTupleSpace自体の永続化を目的としたもので、それ自体はおそらく期待した通りに動作すると思います(そういうつもりで作ったので)。しかし、それだけでは協調するプロセス群をもとに戻すことはできません。ちょっとだまされた気分ですよね。
この章では以下の事について学びました。
- 並列計算をするためのrinda_eval
- 永続化のしくみを提供するためのptuplespace
永続化に関しては可能な事は可能なのですが、TupleSpaceにただ永続化を提供するだけでは十分でなく、それに即した独自なものが必要なのではないかと考えるようになりました。次章ではその考えの末たどりついたDripについて解説していきます。