We made it possible. Next, we'll make it beautiful.
Suffering-oriented programming
- 柔軟性向上
- 予想以上に複雑な使い方をしているケースが多かったので、設定ファイルも複雑化して対応する
- remove_tag_prefix とか add_tag_prefix の乱立をなんとかする
- 信頼性向上
- リトライしてもしょうがないエラーと、そうでないエラーを区別する
- エラー処理を非同期化する
- listenしているソケットを閉じずに再起動できるようにする
- 到達保証(at-least-once)をサポート
- 性能強化
- マルチコア環境下でのスケーラビリティを向上
- 互換性維持
- これ重要
ログを加工するときは、out_exec_filterなどのプラグインで加工した後のログを、もう一度別のプラグインにemitして保存したり転送したりすることになる。ここで再emitするときに、tagを変えないと、また加工用のプラグインにマッチしてしまう。そこで、フィルタリング系のプラグインには、add_tag_prefix といった設定オプションがある。
ところが、ストレージにtagも保存しておきたいときに問題が発生する。tagにはFluentdで処理する都合上付けられたprefixが付いる。この邪魔なprefixを消すために、output pluginには remove_tag_prefix といった設定オプションがあったりする。
このように、Fluentd内部でルーティングに使用するtag(意味の無いtag)と、外部から受け取って/読み取ってストレージに保存するtag(意味のあるtag)が、分離されていない。
Fluentd内部のマッチ処理にのみ使用する「内部ルーティングラベル」を導入する。
ログの構造が変わる:
- インタフェースとしては(Fluentdにログを投げたり、Fluentdから書き出されたログを扱うユーザーの視点では)従来通り tag, time, record の3要素からなるが、
- Fluentd内部では(Fluentdの設定ファイルを書くオペレータの視点では)label, tag, time, record の4要素とする。
デフォルトではlabelは空文字列とする。
こんな感じ:
<source>
type forward
</source>
# 加工したら@filteredラベルを付けて再emitする
<match access.**>
type exec_filter
...
label @filtered
</match>
<match error.**>
type exec_filter
...
label @filtered
</match>
# 加工されたログはここにくる
@filtered:
<match **>
type mongo
</match>
従来の設定ファイルとの互換性も保たれる。複雑なフィルタリング処理をしたい人だけが、labelを使用すれば良い。
Fluentd内部で複雑な処理ができるように、label付けを行ったり、ログをコピーしたりするプラグインを導入する。設定ファイルはこんな感じ:
<source>
type forward
</source>
# 入ってきたデータは、とりあえず全部routeプラグインに渡す
<match **>
type route
# 全ログを@archiveにコピー
<match **>
label @archive
copy
</match>
# production.** を @alert にコピー
<match production.**>
label @alert
copy
</match>
# **.events.** を @events にルーティング
<match **.events.**>
label @events
</match>
# **.error.** を @error にルーティング
<match **.error.**>
label @error
</match>
</match>
@archive:
...
@alert:
...
@events:
...
@error:
...
こっちに追記:Fluentd v11 内部ルーティングラベルの実装案あれこれ
Fluentdは、ログの書き出しに失敗すると自動的にリトライをする。しかし、リトライしても絶対に成功しない種類のエラーだった場合(例えば、ログの構造が間違っていて処理できないケース)、タイムアウトするまでリトライをし続ける。
リトライをし続けている間、キューが詰まってしまうので、他の正常なログが破棄されてしまう可能性がある。
リトライしても必ず失敗する例外は、Error#deterministic? で true を返す。この場合はリトライをしないで、すぐにログをエラーストリーム(後述)に流す。
リトライすると成功する可能性がある例外は、deterministic? で false を返す。この場合はリトライをして、タイムアウトしたらエラーストリーム(後述)に流す。
IOError#deterministic? と Errno::XXX#deterministic? は、false を返すようにする。NoMethodError や EncodingError などは、true を返す。
ただし、IOError#deterministic? が、本当にリトライしたら成功する種類のエラーかどうかは分からない(例えば「ファイルがなかった」という例外が投げられたとき、リトライしたら成功するのかは、実装に依存する)。このため、ちゃんと処理するには、プラグイン開発者は Error#deterministic? (や例外クラス)をまじめに実装する必要がある。
deterministic? のメソッド名が分かりにくい。
ログは、基本的に「投げっぱなし」にするもので、ログを投げたところエラーだったところで、どうしようも無いことが多い。ほとんど捨てるしかない。言い換えれば、Fluentdのようなログ処理システムでは、エラーを同期的に処理することはできない。
1回のEngine#emitの呼び出しで、複数のログをemitすることができる。この仕組みは、性能の最適化のために使われている(例えばin_forward)。
しかしこの仕組みは、「そのすべてのログの処理が成功するか、すべてのログの処理が失敗する」ように設計してある。このため、1回のemitの中に、異常な(だが無視しても良い)ログが1つでも混じっていた場合、他のすべての正常なログが破棄されてしまう。
ログの処理中にエラーが発生しても、例外は投げない。その代わりに、別のlabelを付けてemitする(別のプラグインにemitする)。
<source>
type forward
error_label @error
</source>
<match **>
type my_validator
label @validated
</match>
@error:
# emitできなかったログはこっちに来る
<match ...>
...
または、
<source>
type forward
</source>
<match **>
type forward
...
error_label @error
</match>
@error:
# writeできなかったログはこっちに来る
<match ...>
...
Fluentdコアの設計が複雑化する。しょうがない気もする。
現状、プラグインでDetachMultiProcessMixinを使うと、そのプラグインを別プロセスで動作させて、並列性が向上させることができる(下図)。
Fluentd v10 のプロセス構成:
Detached Input plugin
+-----------------------+
| |
| Input plugin |
| | |
| +--> Virtual Engine |
| | |
+------------------------------+ +----------|------------+
| | |
| Engine <----------------------<------------+
| | ^ |
+------------+ | | | | Detached Output plugin
| Supervisor | -- | | +- Normal Input plugin | +-----------------------+
+------------+ | +---> Normal Output plugin | | |
| +---> Virtual output plugin -----> Output plugin |
| +---> ... | | |
+------------------------------+ +-----------------------+
しかし、この図から分かるように、Engineプロセスにはすべてのデータが流れる。RubyのCPU処理性能はマルチコアでスケールしないので、Engineプロセスがボトルネックになって性能が伸びない。
Fluentd v11 のプロセス構成案:
_
+-------------------------------+
| |
| Engine |
| | ^ |
| | | |
| | +--- Input plugin |
| +----> Virtual output plugin |
| | | |
+----------|-------|------------+
+----------------+ | (1) |
| ProcessManager <-----------------+ | (2)
+----------------+ |
+---------------------+
| +-------------------------------+
| | |
+--> Engine |
| | |
| +----> output plugin |
| |
+-------------------------------+
- Input pluginは、同一プロセス内のEngineにデータをemitする
- Engineは、マッチしたOutput pluginがどのプロセスで動いているかを判別する。 a. もし同一プロセスであれば、単にそのプラグインにemitする。 b. もし別プロセスであれば:
- ProcessManager に問い合わせて、宛先のプロセスに繋がっているファイルディスクリプタをもらう (1)
- 宛先のプロセスに直接データを送る (2)
普通は1つのプラグインを1つのプロセスに割り当てて実行するが、とても負荷が高いプラグインについては、複数のプロセスに割り当て、並列動作させる。
実装が大変!子プロセスが不意に死んだ場合の処理や、シグナルハンドラの処理など、考慮すべきことが多い。
Fluentdプロセスを再起動すると、プラグインがlistenしているTCPソケットが閉じられてしまう。このため、再起動後に再listenされるまでの間、新しいコネクションを受け付けられなくなってしまう。
ここで、プラグインのshutdownはgracefulに行われるので、再起動時に処理途中のデータが失われることは無いが、その反面時間がかかる。このため、再listenされるまでの時間は長くなりがちで、無視できない。
ソケットは、必ずProcessManagerプロセスでlistenし、そのファイルディスクリプタを子プロセスに転送する。プラグインでは次のようなAPIを使う:
sock = Engine.listen(address, port) #=> #<TCPServer>
sock = Engine.listen_unix(path) #=> #<UNIXServer>
SocketManagerは、同じ addressport でlisten要求が来たら、以前にlistenしたファイルディスクリプタをそのまま返す。
ファイルディスクリプタの転送には、UNIXドメインソケットを使うが、JRubyで使えるかどうか分からない。Windowsで動く必要は無さそうだが、JRubyでは動いた方が良さそう。
現在のFluentdは、ログが重複しないようにするat-most-onceのサポートしている。このトレードオフとして、サーバが故障した場合などにログが失われることがある。
ところが、ログが重複してもいいから、何が何でも確実に1回はログが転送されて欲しいケース、つまりat-least-onceをサポートして欲しいケースも存在する。
実現方法を思いついていない。
at-least-onceをサポートするメリットが、at-least-onceをサポートすることによって複雑化するデメリットを上回るのか謎。
シンプルな実装方法が求められている。バッファプラグインだけで対応できないだろうか。
なるほど。ただ「DeterministicError クラスを継承しているクラスと、そうでないクラス」という分け方をすると、組み込みのIOErrorを deterministic なエラーに変更することができなくなってしまうので、困ります。
「module DeterministicError を include しているクラスと、そうでないクラス」ならOK。こちらの方がきれいな実装ではありますね。
スケールしないのはEngineそのものと言うよりは、バッファリング処理です。ログの書き出し処理はバッファリングされたチャンクに対して1回走るのに対し、バッファリング処理はメッセージ1つ1つに対して走るので、圧倒的に負荷が高い。特に排他制御のオーバーヘッドが大きいようです(input pluginはマルチスレッドで動作しているので、バッファリングする際に排他制御が必要)。シリアライズ(とbuf_fileの場合はI/O)も重い。
各プロセスへのプラグインの割り当て方を工夫すると、プロセス間通信を一切発生させずにスケールさせることができるはずです。
一番簡単な方法は、全プロセスそれぞれで全プラグイン実行する方法です。この場合、emitは確実に同一プロセス内のoutput pluginに対して行われるので、プロセス間通信が発生しない。