Created
August 4, 2012 17:25
-
-
Save umegaya/3258856 to your computer and use it in GitHub Desktop.
yueチラシの裏
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
socketpairを使えば双方向で書き込めるのでpopen://を改善できるかも | |
オブジェクトが無効になるタイミング: | |
signal: never | |
timer: timer:close()呼んだあと | |
session: session:close()呼んだあと | |
listener: never | |
filesystem: filesystem:close()のあと(inotify base) | |
process: process:close()の後 | |
* sessionは再接続するので、切断時には無効に近い状態となる。 | |
emittableをベースクラスにする。handlerもemittable. | |
emittable > handler : session, listener, filesystem, process | |
emittable : timer, signal | |
emittableはsmart pointer likeな機構を持っているが、refer(), unref()は自分で呼ぶ必要がある。 | |
* emittableのメモリ管理について | |
emittableごとにutil::arrayで管理する。(mapは使うか?) | |
serverがリストを持つようにする | |
server { | |
listners : map (listener address, listen) | |
sessions : map (remote address, session) | |
timers (powered by timerfd) : array (timer) | |
signals (powered by signalfd) : array (signal) | |
filesystems : map (filesystem root path, filesystem) | |
processes : map (process Id, process) | |
} | |
server::listen(listener addr, options) => yue.listen | |
server::open(remote addr, options) => yue.open | |
server::timer(start dur, duration) => yue.timer | |
server::signal(signo) => yue.signal | |
server::fs(path, options) => yue.fs | |
srever::proc(process path, options) => yue.proc | |
structure | |
yue_listener_t, void * | |
yue_session_t void * | |
yue_timer_t void * | |
yue_signal_t void * | |
yue_fs_t void * | |
yue_proc_t void * | |
yue_thread_t void * | |
それぞれmetatypeをもつ | |
local method_index = function (t, k) | |
local pk,f = t.__parse(k) | |
if not pk then error('wrong method name:' .. k) end | |
if t[pk] then t[k] = t[pk] | |
else t[k] = setmetatable({ __cdata = t.__cdata, __flag = f, __name = (t.__name .. "." .. pk), method_mt) | |
end | |
return t[k] | |
end | |
local async_launch = function (t, ft) | |
ft:emit(t.__cdata:__call(t.__name, t.__flag, ft, ...)) --> yue_emitter_call | |
end | |
local method_mt { | |
__index = method_index, | |
__call = function (t, ...) | |
if bit.band(t.__flag, FUTURE) then | |
local ft = yue.future() | |
yue.thread(async_launch):run(t, ft) | |
return ft | |
elseif bit.band(t.__flag, MCAST) then | |
local ft = yue.future() | |
yue.thread(async_launch):run(t, ft) | |
return ft | |
end | |
return t.__cdata:__call(t.__name, t.__flag, ...) --> yue_emitter_call | |
end | |
} | |
local emitter_mt { | |
__name = '', | |
__index = method_index, | |
__call = ffi.C.yue_emitter_call, --> == lua::emitter::call | |
__gc = function (t) | |
ffi.C.yue_emitter_unref(t.__cdata) | |
end | |
} | |
ffi.metatype("yue_listener_t*", extend(emitter_mt, { | |
})) | |
yue_session_tだけはraw sessionかどうかでmetatypeを切り替えられないからどうしよう => こんな感じ | |
ffi.metatype("yue_rawsession_t*", emitter_mt); | |
ffi.metatype("yue_session_t*", extend(emitter_mt, { | |
__call = yue_session_call, --> == lua::session::call | |
})); | |
ffi.metatype("yue_thread_t*", extend(emitter_mt, { | |
__call = yue_thread_call, --> == lua::thread::call | |
})); | |
local yue_mt = { | |
__call = function(t, ...) | |
return setmetatable({ __cdata = ffi.C["yue_" .. k.__type .. "_create"](...) }, emitter_mt) | |
end | |
} | |
yue = setmetatable(yue, { | |
__index = function(t, k) | |
t[k] = setmetatable({ __type = k, }, yue_mt) | |
return t[k] | |
end | |
}) | |
yue.accepted --> acceptしたconnectionだけ | |
yue.peer --> rpcしてきた相手へのコネクション | |
connectionを複数スレッドで利用する場合 => poolに登録しておく | |
で、アプリからはこんな風に呼ぶ | |
function open_with_cache(addr, opt) | |
return yue.pool(addr) or yue.session(addr, opt) | |
end | |
-->のはやめた。yue_session_createにオプションでcacheから取得するかどうかを与えられるようにして、cacheから返せるようにする。 | |
cache = false --> 重複するaddrがあっても新しく作って返す。cacheに登録されない。 | |
cache = true --> 重複するaddrがあればそれを返す。ない場合に新規作成されるとcacheに登録される。 | |
defaultはcache = true | |
*マルチスレッドにおける変数の扱い | |
local (upvalue) : そのスレッドで閉じる。そのスレッド上で実行されているcoroutineからのみ参照、変更可能 | |
global: スレッドで共有される。一番最初に宣言したスレッドがownerになり、他のスレッドはrpcでアクセスしてくる。 | |
namespace_mt = { | |
__index = function (t, k) | |
if type(v) == 'function' then | |
-- just set to t | |
return t[k] | |
else | |
return t.__thread.get(t, k) | |
end | |
end | |
__newindex = function (t, k, v) | |
if type(v) == 'function' then | |
-- just set to t | |
t[k] = v | |
else | |
t.__thread.set(t, k, v) | |
end | |
end | |
} | |
emittable | |
:wait(event) --> eventをまつ | |
:close() --> __cdataをnilにする。二度と復活しない | |
thread | |
:set --> rpcであるnamespaceのglobal変数に値を設定する | |
:get --> rpcであるnamespaceのglobal変数の値を得る | |
:cas --> rpcであるnamespaceのglobal変数をcompare and swapする | |
session | |
:{func} --> funcという名前のrpcを実行する wait,closeという名前のrpcは呼べない | |
:namespace --> 関連づけられたnamespaceオブジェクトを返す | |
socket | |
:read (addressを読み込める時とそうでない時がある:stream, datagram) | |
:write | |
listener | |
:namespace --> 関連づけられたnamespaceオブジェクトを返す | |
fs: emittable | |
proc | |
:read | |
:write | |
sessionのライフサイクル | |
1. yue.session(...)で生成される => server::session_poolからC++のポインタがアロケートされ、ref count +1 luaのテーブルにセットされてオブジェクトになる | |
== serverコネクションであれば、2以降はそのまま実行される。clientコネクションであれば、connectが呼ばれるまで、1の状態のまま == | |
2. C++のポインタはloop::openされ、handshakerにinsertされる | |
3. C++のポインタはloopからread/writeイベントでコールバックされながらhandshakeのフェーズを進行させる | |
4. handshakeが終了すると、openイベントがemitされる | |
5. openイベントをフックしている各スレッドがluaのコールバックを実行する(全スレッドが実行してもいいかどうか、1スレッドに限定することが必要か?) | |
6. 通信可能になっている。rpcのコールアンドレスポンスするか、dataイベントがemitされるかしている | |
7a. 何らかの理由で通信が切断する(リモート切断の場合) | |
8a. C++のポインタはloopからcloseイベントをコールバックされる。 | |
9a. sessionの状態はclosedになる。 | |
== もしユーザーがcloseを呼べば、7bからのフローが実行される。connectを呼べば2からのフローが再度実行される == | |
7b. ユーザープログラムが接続を切断しようとする(ローカル切断の場合)emittable:close()のコール。 | |
8b. closeイベントがキューに積まれる。以下9a~と同じ。 | |
9b. closeイベントが実行される。closeイベントがlua側にemitされる。 | |
10b. closeイベントをフックしている各vmスレッドがluaのコールバックを実行する。(これも5と同様の考察が必要)また、12aと順番は前後する | |
11b. C++のポインタのunrefが行われる。(もし10aの時点でキューに積まれているポインタがあれば、この時点ではまだ実際の終了処理は行われない) | |
12b. luaのコールバックを実行したあと、キューに積まれていたイベントに含まれるポインタについてもunrefされる | |
13b. 最終的にref countが0になり、メモリから解放される。 | |
sched_unref()で、最後のemit以後にポインタ自身のunrefを行うようにする | |
emittable pointerは基本的に複数スレッドからshareされる。のでデータを変更する処理をluaから呼ぶ場合にはmutexでロックしたり、gcc-atomic pluginを使うような対処が必要。mutexでのロックはできるだけさける | |
そういえば、luaコード内でも1回だけ待ちたいときと複数回待つときとあるような気がする | |
watchとonceにするかな | |
watch: そのまま呼んでもfutureを返す async_watchもfutureを返すcb(意味はない) | |
once: そのまま呼んだ場合、emitの引数を戻り値にする async_onceはfutureを返す | |
timer:asnyc_watch("tick"):on(function (ft) | |
ft:on(function (c) | |
print('tick!!', c) --> ズッcbされる | |
end) | |
end) | |
timer:async_once("tock"):on(function (c) | |
print('tick!!', c) --> 一回だけ | |
end) | |
timer:watch("tick"):on(function (c) | |
end) | |
local ft = timer:watch("tick") | |
local c = timer:once("tick") | |
watch = yue.method({__cdata = ffi.C.yue_timer_create(start_dur, intv), __name="watch", __f = bit.bor(0, MCAST)}) | |
みたいに最初のmetatable(timer_mt)で設定しておけばOKか | |
watchのように無限にコールバックされるには | |
C++側ではmsgidが指定されていないadd_watcherだとそうなる。(そしてfiberを生成する) | |
futureとの関連づけが難しいか?どちらかというとfutureがfiberを生成しなくてはいけない | |
eg) | |
future_mt.on(self, cb) | |
for v in ipair(self.recved) do | |
yue.thread(cb):run(unpack(v)) | |
end | |
self.cb = cb | |
end | |
future_mt.__call(self, ...) | |
if self.cb then | |
yue.thread(cb):run(...) | |
else | |
table.insert(self.recved, pack(...)) | |
end | |
end | |
function yue.future() | |
return setmetatable({ recved:{} }, future_mt) | |
end | |
これにどうやって時間制限を付けるというのか。 | |
client session作成 | |
-> luaから参照される (userdata) luaのがベージコレクターがラップしているluaオブジェクトを回収する際にunrefされる | |
-> C++コードから参照される (emittable::wrap) wrapがunrefされれば(dtorがよばれることで)解放 | |
-> loop::openされて、loopから参照される。loop::closeが呼ばれたときにunrefされる | |
-> read/write/closeのためtask::ioに積まれる。task::ioが処理されればunref | |
-> emitのため、fabric::taskに積まれる。fabric::taskが処理されればunref | |
1. connectが呼ばれた場合 | |
loop::openでreferされるのでrefc 1 or 2 | |
-> handshakerに登録されてrefc 2 or 3 | |
-> handshakerが完了したらhandshakerがunrefしてrefc 1 or 2 | |
-> handshakerが失敗したら ==> 3. local connection close | |
-> establishになる。establishイベントがemitされる | |
2. remote connection closeの場合 | |
-> event handler session::on_readがdestroyを返す | |
-> closeイベントがタスクキューに積まれる | |
-> session::on_close state=>CLOSEDになる。closeイベントがemitされる(refc==2 or 1のまま) | |
-> loopからreferされていたポインタがunrefされる。(refc==1 or 0) | |
refc == 1の場合、emittable::wrapかluaがpointerを参照している。これらがpointerをunrefしたら、server::emitter_finalizerで解放される | |
refc == 0の場合(loopしか参照していない場合)、その場でserver::emitter_finalizerで解放されてしまう | |
-> しかし、CLOSEDのタイミングで他のスレッドがちょうどこのポインタをキューに積んでる可能性ってないのか。 | |
-> thread Aでreadを処理していて、結果としてemitが積まれてそれが処理される前に、別スレッドから同じfdが取り出されることはある? | |
-> readでemitを積む直前でそのthreadがsuspendして、refc==0になった後にresumeされたらまずい(解放されたpointerがキューに積まれる) | |
-> しかし、readを処理しているってことは、まだfdはselectorの中に戻っていない | |
-> あるスレッドでのreadの処理中に他のスレッドでのreadが走ることはない(他のスレッドのpoller::waitでそのfdがとれてこないから.さらに最低でもepollはスレッドセーフ) | |
-> emitが積まれた後、fdがselectorに戻り、別スレッドで取り出されてclose関連の処理が走ることはある | |
-> その場合はすでにpointerはreferされている(emitに積まれた場合)したがってメモリは解放されない! | |
-> emitが処理されて、unrefされればそのタイミングで解放される。 | |
3. local connection closeの場合 | |
-> sessionにfinalizeフラグをたてて、自らcloseイベントをタスクキューに積むそれ以降は2.と同じ。luaから参照されててもされてなくても、5.の処理があればうまく動く。 | |
4. connectの再呼び出し | |
-> 1.へ | |
5. closeの呼び出し | |
-> 3.と同じにしたい。最後のリンクを削除できるように、closeイベントがemitされた場合には、finalizeフラグがたっている場合には、__cdataをnilにして、__gcが呼ばれるようにする。__gcはsessionポインタのunref()を呼び出して、そこでemitter_finalizerが呼ばれる | |
server session: | |
listener on_readで作成される。(refc == 0) | |
handshakerに登録(refc == 1) | |
loop::openが呼ばれる(refc == 2) | |
handshakeが失敗した場合、refc == 1となって、closeのフロー(==> 3.) | |
handshakeが成功した場合、refc == 1となって、stateはwait acceptとなり、listener acceptが呼ばれる(refc == 2)。 全スレッドのlistenerがemitされるとまずいな。 | |
emit oneとemit allとかがいるかもな。listenerの場合はemit one.session stateはemit all.emit oneは先頭のポインタが絶えず入れ替わるようにして、負荷分散する | |
本来はlistner on_readを呼んだスレッドのVMを直接emitするのがいいかもしれない。ただ、その場合存在しないかもしれない(全スレッドで同じポートをemitしていない場合) | |
ので、このアルゴの方が良さそう | |
emit oneがあるとして、選ばれたlistenerがクライアントのRPCを呼び出して、認証することになる。認証が成功したらsession:grant()を呼ぶ | |
stateがestablishとなる。emitもされる。 | |
remote/local におけるconnection closeは2,3と同じ。ただしconnectは呼び出せない。closeがemitされたら、常に5。と同じように振る舞うべき。 | |
以上の考察から | |
finalizeステートはいらないことがわかった。closeになったときにだれも参照していなければ解放されるし、そうでなければ、保持されており、もう一回connectを呼び出すことで接続された状態に再度移行できる。remote closeでcloseになった場合には、(あるいは、まだestablishしているconnectionをcloseする場合には)close()を呼び出すことになる。 | |
実行モデル | |
複数のスレッドが1つずつluaJIT VMを持っている。 | |
オブジェクトは、C++のemittable pointerをLuaのテーブルでラップした構造 | |
coroutineがそれぞれのVMでは実行され、objectを待つことができる。 | |
通常、すべてのVMが同じコードで動作するため、全部のthreadが同じポートをlistenすることになるが、同じポートへのlistenはlistener emitterを共有するだけである。 | |
emittable::emitのマルチスレッドにおけるスケーラビリティを向上させる対応 | |
要件: | |
1. 複数スレッドからemitされても問題なく動作する | |
2. 同じemittableに対する複数のemitは呼ばれた順にwatcherに届く(コールバックされる) | |
3. 最初に見つかったやつにだけコールバック、全部にコールバックを選択できる | |
4. 高速である(一回のemitでwatcherのリストをなめている間mutexがブロックしないでほしい) | |
1. 2.の条件から何らかのキューに積むことが必要。native thread側のtask queueが一番いいけど、emitはどのスレッドからも呼ばれる。呼ばれたスレッドのtask queueに積んでいては2.が満たせない。emittableをどれかのスレッドに所属させてそのスレッドのqueueに積む、というやり方が考えられるがどうか。 | |
4. が相当難しい。キューに積まれれば特に問題ないけど。 | |
emittableにキューが内蔵されていて、どっかのスレッドで処理されるタイミングでキューに積まれた全部のeventが処理される。はどうか。 | |
キューに積まれた全部のイベントが処理されるときには、キューからそのときのツリーをatomicに抜いて処理する。 | |
今のemit listと同じような実装によって、あるスレッドがキューにeventを積みながら、ツリーをatomicにどっかの別のスレッドで処理することができる | |
watcherのadd/deleteもこのキューにのせる | |
排他制御は結構大変。 | |
threadのタスクキューに乗ってない状態で、一個でもイベントが積まれると、イベントを積んだスレッドのタスクキューに乗る。 | |
一度threadのタスクキューに乗ってから、emitが開始するまではemittableの内臓キューに積まれていく | |
emitが開始した後、完了するまで:どうする? | |
emitが完了した後、タスクキューに乗ってない状態に戻る。 | |
emittable { | |
owner_thread //null if not on queue, else which thread that this emittable on the queue | |
eventlist //concurrent linked queue of event (emit/add watcher/del watcher) | |
watcherlist //watcher list | |
add(event); | |
add(watcher); | |
del(watcher); | |
emit() | |
} | |
emittable::add(event) { | |
eventlist.add(event); | |
if (cas(&owner_thread, NULL, current_thread)) { | |
owner_thread->que().mpush(task::process_emit(this)); | |
} | |
} | |
emittable::emit() { | |
if (cas(&owner_thread, current_thread, NULL)) { | |
eventlist *list = eventlist.pick(); | |
while (list) { | |
dispatch(list->data); | |
list = list->next; | |
} | |
} | |
else ASSERT(false); | |
} | |
emittable::dispatch(event *e) { | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment