Skip to content

Instantly share code, notes, and snippets.

@umegaya
Created August 4, 2012 17:25
Show Gist options
  • Save umegaya/3258856 to your computer and use it in GitHub Desktop.
Save umegaya/3258856 to your computer and use it in GitHub Desktop.
yueチラシの裏
socketpairを使えば双方向で書き込めるのでpopen://を改善できるかも
オブジェクトが無効になるタイミング:
signal: never
timer: timer:close()呼んだあと
session: session:close()呼んだあと
listener: never
filesystem: filesystem:close()のあと(inotify base)
process: process:close()の後
* sessionは再接続するので、切断時には無効に近い状態となる。
emittableをベースクラスにする。handlerもemittable.
emittable > handler : session, listener, filesystem, process
emittable : timer, signal
emittableはsmart pointer likeな機構を持っているが、refer(), unref()は自分で呼ぶ必要がある。
* emittableのメモリ管理について
emittableごとにutil::arrayで管理する。(mapは使うか?)
serverがリストを持つようにする
server {
listners : map (listener address, listen)
sessions : map (remote address, session)
timers (powered by timerfd) : array (timer)
signals (powered by signalfd) : array (signal)
filesystems : map (filesystem root path, filesystem)
processes : map (process Id, process)
}
server::listen(listener addr, options) => yue.listen
server::open(remote addr, options) => yue.open
server::timer(start dur, duration) => yue.timer
server::signal(signo) => yue.signal
server::fs(path, options) => yue.fs
srever::proc(process path, options) => yue.proc
structure
yue_listener_t, void *
yue_session_t void *
yue_timer_t void *
yue_signal_t void *
yue_fs_t void *
yue_proc_t void *
yue_thread_t void *
それぞれmetatypeをもつ
local method_index = function (t, k)
local pk,f = t.__parse(k)
if not pk then error('wrong method name:' .. k) end
if t[pk] then t[k] = t[pk]
else t[k] = setmetatable({ __cdata = t.__cdata, __flag = f, __name = (t.__name .. "." .. pk), method_mt)
end
return t[k]
end
local async_launch = function (t, ft)
ft:emit(t.__cdata:__call(t.__name, t.__flag, ft, ...)) --> yue_emitter_call
end
local method_mt {
__index = method_index,
__call = function (t, ...)
if bit.band(t.__flag, FUTURE) then
local ft = yue.future()
yue.thread(async_launch):run(t, ft)
return ft
elseif bit.band(t.__flag, MCAST) then
local ft = yue.future()
yue.thread(async_launch):run(t, ft)
return ft
end
return t.__cdata:__call(t.__name, t.__flag, ...) --> yue_emitter_call
end
}
local emitter_mt {
__name = '',
__index = method_index,
__call = ffi.C.yue_emitter_call, --> == lua::emitter::call
__gc = function (t)
ffi.C.yue_emitter_unref(t.__cdata)
end
}
ffi.metatype("yue_listener_t*", extend(emitter_mt, {
}))
yue_session_tだけはraw sessionかどうかでmetatypeを切り替えられないからどうしよう => こんな感じ
ffi.metatype("yue_rawsession_t*", emitter_mt);
ffi.metatype("yue_session_t*", extend(emitter_mt, {
__call = yue_session_call, --> == lua::session::call
}));
ffi.metatype("yue_thread_t*", extend(emitter_mt, {
__call = yue_thread_call, --> == lua::thread::call
}));
local yue_mt = {
__call = function(t, ...)
return setmetatable({ __cdata = ffi.C["yue_" .. k.__type .. "_create"](...) }, emitter_mt)
end
}
yue = setmetatable(yue, {
__index = function(t, k)
t[k] = setmetatable({ __type = k, }, yue_mt)
return t[k]
end
})
yue.accepted --> acceptしたconnectionだけ
yue.peer --> rpcしてきた相手へのコネクション
connectionを複数スレッドで利用する場合 => poolに登録しておく
で、アプリからはこんな風に呼ぶ
function open_with_cache(addr, opt)
return yue.pool(addr) or yue.session(addr, opt)
end
-->のはやめた。yue_session_createにオプションでcacheから取得するかどうかを与えられるようにして、cacheから返せるようにする。
cache = false --> 重複するaddrがあっても新しく作って返す。cacheに登録されない。
cache = true --> 重複するaddrがあればそれを返す。ない場合に新規作成されるとcacheに登録される。
defaultはcache = true
*マルチスレッドにおける変数の扱い
local (upvalue) : そのスレッドで閉じる。そのスレッド上で実行されているcoroutineからのみ参照、変更可能
global: スレッドで共有される。一番最初に宣言したスレッドがownerになり、他のスレッドはrpcでアクセスしてくる。
namespace_mt = {
__index = function (t, k)
if type(v) == 'function' then
-- just set to t
return t[k]
else
return t.__thread.get(t, k)
end
end
__newindex = function (t, k, v)
if type(v) == 'function' then
-- just set to t
t[k] = v
else
t.__thread.set(t, k, v)
end
end
}
emittable
:wait(event) --> eventをまつ
:close() --> __cdataをnilにする。二度と復活しない
thread
:set --> rpcであるnamespaceのglobal変数に値を設定する
:get --> rpcであるnamespaceのglobal変数の値を得る
:cas --> rpcであるnamespaceのglobal変数をcompare and swapする
session
:{func} --> funcという名前のrpcを実行する wait,closeという名前のrpcは呼べない
:namespace --> 関連づけられたnamespaceオブジェクトを返す
socket
:read (addressを読み込める時とそうでない時がある:stream, datagram)
:write
listener
:namespace --> 関連づけられたnamespaceオブジェクトを返す
fs: emittable
proc
:read
:write
sessionのライフサイクル
1. yue.session(...)で生成される => server::session_poolからC++のポインタがアロケートされ、ref count +1 luaのテーブルにセットされてオブジェクトになる
== serverコネクションであれば、2以降はそのまま実行される。clientコネクションであれば、connectが呼ばれるまで、1の状態のまま ==
2. C++のポインタはloop::openされ、handshakerにinsertされる
3. C++のポインタはloopからread/writeイベントでコールバックされながらhandshakeのフェーズを進行させる
4. handshakeが終了すると、openイベントがemitされる
5. openイベントをフックしている各スレッドがluaのコールバックを実行する(全スレッドが実行してもいいかどうか、1スレッドに限定することが必要か?)
6. 通信可能になっている。rpcのコールアンドレスポンスするか、dataイベントがemitされるかしている
7a. 何らかの理由で通信が切断する(リモート切断の場合)
8a. C++のポインタはloopからcloseイベントをコールバックされる。
9a. sessionの状態はclosedになる。
== もしユーザーがcloseを呼べば、7bからのフローが実行される。connectを呼べば2からのフローが再度実行される ==
7b. ユーザープログラムが接続を切断しようとする(ローカル切断の場合)emittable:close()のコール。
8b. closeイベントがキューに積まれる。以下9a~と同じ。
9b. closeイベントが実行される。closeイベントがlua側にemitされる。
10b. closeイベントをフックしている各vmスレッドがluaのコールバックを実行する。(これも5と同様の考察が必要)また、12aと順番は前後する
11b. C++のポインタのunrefが行われる。(もし10aの時点でキューに積まれているポインタがあれば、この時点ではまだ実際の終了処理は行われない)
12b. luaのコールバックを実行したあと、キューに積まれていたイベントに含まれるポインタについてもunrefされる
13b. 最終的にref countが0になり、メモリから解放される。
sched_unref()で、最後のemit以後にポインタ自身のunrefを行うようにする
emittable pointerは基本的に複数スレッドからshareされる。のでデータを変更する処理をluaから呼ぶ場合にはmutexでロックしたり、gcc-atomic pluginを使うような対処が必要。mutexでのロックはできるだけさける
そういえば、luaコード内でも1回だけ待ちたいときと複数回待つときとあるような気がする
watchとonceにするかな
watch: そのまま呼んでもfutureを返す async_watchもfutureを返すcb(意味はない)
once: そのまま呼んだ場合、emitの引数を戻り値にする async_onceはfutureを返す
timer:asnyc_watch("tick"):on(function (ft)
ft:on(function (c)
print('tick!!', c) --> ズッcbされる
end)
end)
timer:async_once("tock"):on(function (c)
print('tick!!', c) --> 一回だけ
end)
timer:watch("tick"):on(function (c)
end)
local ft = timer:watch("tick")
local c = timer:once("tick")
watch = yue.method({__cdata = ffi.C.yue_timer_create(start_dur, intv), __name="watch", __f = bit.bor(0, MCAST)})
みたいに最初のmetatable(timer_mt)で設定しておけばOKか
watchのように無限にコールバックされるには
C++側ではmsgidが指定されていないadd_watcherだとそうなる。(そしてfiberを生成する)
futureとの関連づけが難しいか?どちらかというとfutureがfiberを生成しなくてはいけない
eg)
future_mt.on(self, cb)
for v in ipair(self.recved) do
yue.thread(cb):run(unpack(v))
end
self.cb = cb
end
future_mt.__call(self, ...)
if self.cb then
yue.thread(cb):run(...)
else
table.insert(self.recved, pack(...))
end
end
function yue.future()
return setmetatable({ recved:{} }, future_mt)
end
これにどうやって時間制限を付けるというのか。
client session作成
-> luaから参照される (userdata) luaのがベージコレクターがラップしているluaオブジェクトを回収する際にunrefされる
-> C++コードから参照される (emittable::wrap) wrapがunrefされれば(dtorがよばれることで)解放
-> loop::openされて、loopから参照される。loop::closeが呼ばれたときにunrefされる
-> read/write/closeのためtask::ioに積まれる。task::ioが処理されればunref
-> emitのため、fabric::taskに積まれる。fabric::taskが処理されればunref
1. connectが呼ばれた場合
  loop::openでreferされるのでrefc 1 or 2
-> handshakerに登録されてrefc 2 or 3
-> handshakerが完了したらhandshakerがunrefしてrefc 1 or 2
-> handshakerが失敗したら ==> 3. local connection close
-> establishになる。establishイベントがemitされる
2. remote connection closeの場合
-> event handler session::on_readがdestroyを返す
-> closeイベントがタスクキューに積まれる
-> session::on_close state=>CLOSEDになる。closeイベントがemitされる(refc==2 or 1のまま)
-> loopからreferされていたポインタがunrefされる。(refc==1 or 0)
refc == 1の場合、emittable::wrapかluaがpointerを参照している。これらがpointerをunrefしたら、server::emitter_finalizerで解放される
refc == 0の場合(loopしか参照していない場合)、その場でserver::emitter_finalizerで解放されてしまう
   -> しかし、CLOSEDのタイミングで他のスレッドがちょうどこのポインタをキューに積んでる可能性ってないのか。
-> thread Aでreadを処理していて、結果としてemitが積まれてそれが処理される前に、別スレッドから同じfdが取り出されることはある?
-> readでemitを積む直前でそのthreadがsuspendして、refc==0になった後にresumeされたらまずい(解放されたpointerがキューに積まれる)
-> しかし、readを処理しているってことは、まだfdはselectorの中に戻っていない
-> あるスレッドでのreadの処理中に他のスレッドでのreadが走ることはない(他のスレッドのpoller::waitでそのfdがとれてこないから.さらに最低でもepollはスレッドセーフ)
-> emitが積まれた後、fdがselectorに戻り、別スレッドで取り出されてclose関連の処理が走ることはある
     -> その場合はすでにpointerはreferされている(emitに積まれた場合)したがってメモリは解放されない!
-> emitが処理されて、unrefされればそのタイミングで解放される。
3. local connection closeの場合
-> sessionにfinalizeフラグをたてて、自らcloseイベントをタスクキューに積むそれ以降は2.と同じ。luaから参照されててもされてなくても、5.の処理があればうまく動く。
4. connectの再呼び出し
-> 1.へ
5. closeの呼び出し
-> 3.と同じにしたい。最後のリンクを削除できるように、closeイベントがemitされた場合には、finalizeフラグがたっている場合には、__cdataをnilにして、__gcが呼ばれるようにする。__gcはsessionポインタのunref()を呼び出して、そこでemitter_finalizerが呼ばれる
server session:
listener on_readで作成される。(refc == 0)
handshakerに登録(refc == 1)
loop::openが呼ばれる(refc == 2)
handshakeが失敗した場合、refc == 1となって、closeのフロー(==> 3.)
handshakeが成功した場合、refc == 1となって、stateはwait acceptとなり、listener acceptが呼ばれる(refc == 2)。 全スレッドのlistenerがemitされるとまずいな。
emit oneとemit allとかがいるかもな。listenerの場合はemit one.session stateはemit all.emit oneは先頭のポインタが絶えず入れ替わるようにして、負荷分散する
本来はlistner on_readを呼んだスレッドのVMを直接emitするのがいいかもしれない。ただ、その場合存在しないかもしれない(全スレッドで同じポートをemitしていない場合)
ので、このアルゴの方が良さそう
emit oneがあるとして、選ばれたlistenerがクライアントのRPCを呼び出して、認証することになる。認証が成功したらsession:grant()を呼ぶ
stateがestablishとなる。emitもされる。
remote/local におけるconnection closeは2,3と同じ。ただしconnectは呼び出せない。closeがemitされたら、常に5。と同じように振る舞うべき。
以上の考察から
finalizeステートはいらないことがわかった。closeになったときにだれも参照していなければ解放されるし、そうでなければ、保持されており、もう一回connectを呼び出すことで接続された状態に再度移行できる。remote closeでcloseになった場合には、(あるいは、まだestablishしているconnectionをcloseする場合には)close()を呼び出すことになる。
実行モデル
複数のスレッドが1つずつluaJIT VMを持っている。
オブジェクトは、C++のemittable pointerをLuaのテーブルでラップした構造
coroutineがそれぞれのVMでは実行され、objectを待つことができる。
通常、すべてのVMが同じコードで動作するため、全部のthreadが同じポートをlistenすることになるが、同じポートへのlistenはlistener emitterを共有するだけである。
emittable::emitのマルチスレッドにおけるスケーラビリティを向上させる対応
要件:
1. 複数スレッドからemitされても問題なく動作する
2. 同じemittableに対する複数のemitは呼ばれた順にwatcherに届く(コールバックされる)
3. 最初に見つかったやつにだけコールバック、全部にコールバックを選択できる
4. 高速である(一回のemitでwatcherのリストをなめている間mutexがブロックしないでほしい)
1. 2.の条件から何らかのキューに積むことが必要。native thread側のtask queueが一番いいけど、emitはどのスレッドからも呼ばれる。呼ばれたスレッドのtask queueに積んでいては2.が満たせない。emittableをどれかのスレッドに所属させてそのスレッドのqueueに積む、というやり方が考えられるがどうか。
4. が相当難しい。キューに積まれれば特に問題ないけど。
emittableにキューが内蔵されていて、どっかのスレッドで処理されるタイミングでキューに積まれた全部のeventが処理される。はどうか。
キューに積まれた全部のイベントが処理されるときには、キューからそのときのツリーをatomicに抜いて処理する。
今のemit listと同じような実装によって、あるスレッドがキューにeventを積みながら、ツリーをatomicにどっかの別のスレッドで処理することができる
watcherのadd/deleteもこのキューにのせる
排他制御は結構大変。
threadのタスクキューに乗ってない状態で、一個でもイベントが積まれると、イベントを積んだスレッドのタスクキューに乗る。
一度threadのタスクキューに乗ってから、emitが開始するまではemittableの内臓キューに積まれていく
emitが開始した後、完了するまで:どうする?
emitが完了した後、タスクキューに乗ってない状態に戻る。
emittable {
owner_thread //null if not on queue, else which thread that this emittable on the queue
eventlist //concurrent linked queue of event (emit/add watcher/del watcher)
watcherlist //watcher list
add(event);
add(watcher);
del(watcher);
emit()
}
emittable::add(event) {
eventlist.add(event);
if (cas(&owner_thread, NULL, current_thread)) {
owner_thread->que().mpush(task::process_emit(this));
}
}
emittable::emit() {
if (cas(&owner_thread, current_thread, NULL)) {
eventlist *list = eventlist.pick();
while (list) {
dispatch(list->data);
list = list->next;
}
}
else ASSERT(false);
}
emittable::dispatch(event *e) {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment