リポジトリ:https://github.com/fluent/fluentd/tree/v11
https://gist.github.com/3774036 の続き。
Fluentd v11 のプロセスモデルは Supervisor → Server → ProcessManager という3階層。 起動時に --disable-supervisor オプションを付けると Supervisor が省略され、Server → ProcessManager で動作する。
-
Supervisor: 子プロセスを立ち上げて Server を実行するクラス。もし子プロセスが不意に終了したら自動的に再起動する。fork(2) が使える環境で無いと動かない。コードは実装済み。実運用している別プロジェクト(PerfectQueue)から持ってきたコードがほとんど。
-
Server: Fluentdコアを実行するプロセス。エラー処理が未精査である点を除いては、実装済み。
-
ProcessManager:出力プラグインや入力プラグインなどの Agent の起動と終了を制御するモジュール。実行環境に合わせて、MultiprocessManager、SingleprocessManager、SimpleProcessManager の3つの実装がある。
-
SocketManager: MultiprocessManager や SingleprocessManager で、ソケットの listen を親プロセス(Server)に委譲して実行させるためのクラス。子プロセスが死んでも listen したソケットは close されないので、listen したソケットを閉じずに live restart ができるようになる。実装自体は完了しているが、プラグインから機能を呼び出すためのAPIがまだ無いので、どこかに追加が必要。
-
Agent: 自律的に動作するモジュール。Fluentd コアのアーキテクチャを一言で表すと、ProcessManager に Agent をたくさん乗っけて動かし、後はメッセージのルーティングだけ世話するプラットフォーム。
-
AgentGroup: 複数の Agent を所有するモジュール。AgentGroup を include しておくと、ProcessManager がそれを認識して、所有している子Agentの実行も面倒を見てくれる。
-
ProcessManager の実装:
- MultiprocessManager: 複数の子プロセスに Agent を割り当てて実行する実装。子プロセスは定期的には定期的に heartbeat を送り、不意に終了したら自動的に再起動する機能を持つ。アーキテクチャは複雑だが、別プロジェクト(PerfectQueue)で一度実装したコードをリファクタリングしたものなので、汚くは無い。
- SingleprocessManager: MultiprocessManager とほぼ同じだが、起動する子プロセスを1個に制限した実装。MultiprocessManager は、別の子プロセスで動作している Agent 同士がメソッドを呼び合うために UNIX ドメインソケットを使用するが、宛先のプロセスが死んでいると通信が失敗する可能性がある。例えば入力プラグインと出力プラグインを別のプロセスで動かしていると、バッファへの追記が失敗するケースがあり得る。このように性能向上を図れる一方でトレードオフがあるので、デフォルトは SingleprocessManager にする予定。
- SimpleProcessManager: 子プロセスを起動しないで、Server と同じプロセスですべての Agent を実行する実装。live restart ができないが、fork(2) が使えない環境でも動作する。
コードを読むなら Server → Engine → ProcessManager が順路。
- Collector: メッセージを受け取るモジュールが実装するべきインタフェース。出力プラグインは、Collector を実装した Agent。Collector#open は Writer を返し、Writer#append や Writer#write で実際にメッセージを書き込む。
- StreamSource: 入力プラグイン。 MessageBus を1つ参照する。入力プラグインは、StreamSource を実装した Agent。それぞれの入力プラグインは、自身が参照している MessageBus にメッセージを送ると、Fluentd コアが適切した Collector にルーティングしてくれる。
- MessageBus: メッセージを受け取って、適切な Collector にルーティングするモジュール。設定ファイルの木構造を反映した階層構造が組まれ(MessageBus#parent_bus で親を辿れる)、根には MessageBus を拡張した LabeledMessageBus が来る。LabeledMessageBus は、label で識別される複数のルーティングテーブルを持つ。別のルーティングテーブルを使ってメッセージを送るには open_label メソッドを、現在のルーティングテーブルを使ってメッセージを送るには open メソッドを使う。MessageBus は AgentGroup である。
- ErrorStreamCollector: 設定ファイルの <error> セクションを扱うモジュール。<error> は入力プラグインの中に書く = StreamSource から利用され、宛先の Agent にメッセージを送信中に例外が発生したら別の Agent にメッセージを送る。
コードを読むなら MessageBus#open → AgentGroup#configure_agent → StreamSource → ErrorStreamCollector が順路。
- Configurable: configure メソッドを提供し、設定ファイルをメンバ変数に格納するクラス。必要なら型変換やパースも行う。v10 からコピーしてきたコードが入っているが、型変換の部分に拡張性を持たせたいところ。
- Config::Parser: 変数展開用の ruby_context が未実装。
- Config::ValueParser: リテラルパーサ。ELEMENT_ARG_STRING_CHARSET と NONQUOTED_STRING_CHARSET に少々迷いが見られるが、たぶん初版リリース後に考える。
- Plugin: できたが、Plugin がグローバルな定数になっているなど、あまり美しくは無い。
- BufferedOutput: <secondary> を含めて実装完了。
- TimeSlicedなんとか: 未実装。
- Mixin系: 未実装。
- テストフレームワーク: ぜんぜん未着手!! rspec でいく予定。誰か…
- OutputBackwardCompatWrapper: 新Engineが旧Outputプラグインがにemitするためのラッパ。動いている。
- OutputForwardCompatWrapper: 旧プラグイン群が新Outputプラグインをインスタンス化して利用するするためのラッパ。動いている。
- InputBackwardCompatWrapper: 新Engineが旧Inputプラグインをインスタンス化して利用するためのラッパ。動いている。
- InputForwardCompatWrapper: 旧Inputプラグインが新Engineにemitするためのラッパ。動いている。
- Engineの初期化: Cool.io のデフォルトループ以外は動いている。デフォルトループはどこで走らせるか迷う。
- ロガーを付けた。conf.logger で取れる。たぶん Configurable#configure で @log に代入してやると使いやすい。
- dRuby によるデバッグインタフェースは v10 に付けたので持ってくる。
- fluentd コマンドを入れた。デーモン化にも対応。