We made it possible. Next, we'll make it beautiful.
Suffering-oriented programming
- 柔軟性向上
- 予想以上に複雑な使い方をしているケースが多かったので、設定ファイルも複雑化して対応する
- remove_tag_prefix とか add_tag_prefix の乱立をなんとかする
- 信頼性向上
- リトライしてもしょうがないエラーと、そうでないエラーを区別する
- エラー処理を非同期化する
- listenしているソケットを閉じずに再起動できるようにする
- 到達保証(at-least-once)をサポート
- 性能強化
- マルチコア環境下でのスケーラビリティを向上
- 互換性維持
- これ重要
ログを加工するときは、out_exec_filterなどのプラグインで加工した後のログを、もう一度別のプラグインにemitして保存したり転送したりすることになる。ここで再emitするときに、tagを変えないと、また加工用のプラグインにマッチしてしまう。そこで、フィルタリング系のプラグインには、add_tag_prefix といった設定オプションがある。
ところが、ストレージにtagも保存しておきたいときに問題が発生する。tagにはFluentdで処理する都合上付けられたprefixが付いる。この邪魔なprefixを消すために、output pluginには remove_tag_prefix といった設定オプションがあったりする。
このように、Fluentd内部でルーティングに使用するtag(意味の無いtag)と、外部から受け取って/読み取ってストレージに保存するtag(意味のあるtag)が、分離されていない。
Fluentd内部のマッチ処理にのみ使用する「内部ルーティングラベル」を導入する。
ログの構造が変わる:
- インタフェースとしては(Fluentdにログを投げたり、Fluentdから書き出されたログを扱うユーザーの視点では)従来通り tag, time, record の3要素からなるが、
- Fluentd内部では(Fluentdの設定ファイルを書くオペレータの視点では)label, tag, time, record の4要素とする。
デフォルトではlabelは空文字列とする。
こんな感じ:
<source>
type forward
</source>
# 加工したら@filteredラベルを付けて再emitする
<match access.**>
type exec_filter
...
label @filtered
</match>
<match error.**>
type exec_filter
...
label @filtered
</match>
# 加工されたログはここにくる
@filtered:
<match **>
type mongo
</match>
従来の設定ファイルとの互換性も保たれる。複雑なフィルタリング処理をしたい人だけが、labelを使用すれば良い。
Fluentd内部で複雑な処理ができるように、label付けを行ったり、ログをコピーしたりするプラグインを導入する。設定ファイルはこんな感じ:
<source>
type forward
</source>
# 入ってきたデータは、とりあえず全部routeプラグインに渡す
<match **>
type route
# 全ログを@archiveにコピー
<match **>
label @archive
copy
</match>
# production.** を @alert にコピー
<match production.**>
label @alert
copy
</match>
# **.events.** を @events にルーティング
<match **.events.**>
label @events
</match>
# **.error.** を @error にルーティング
<match **.error.**>
label @error
</match>
</match>
@archive:
...
@alert:
...
@events:
...
@error:
...
こっちに追記:Fluentd v11 内部ルーティングラベルの実装案あれこれ
Fluentdは、ログの書き出しに失敗すると自動的にリトライをする。しかし、リトライしても絶対に成功しない種類のエラーだった場合(例えば、ログの構造が間違っていて処理できないケース)、タイムアウトするまでリトライをし続ける。
リトライをし続けている間、キューが詰まってしまうので、他の正常なログが破棄されてしまう可能性がある。
リトライしても必ず失敗する例外は、Error#deterministic? で true を返す。この場合はリトライをしないで、すぐにログをエラーストリーム(後述)に流す。
リトライすると成功する可能性がある例外は、deterministic? で false を返す。この場合はリトライをして、タイムアウトしたらエラーストリーム(後述)に流す。
IOError#deterministic? と Errno::XXX#deterministic? は、false を返すようにする。NoMethodError や EncodingError などは、true を返す。
ただし、IOError#deterministic? が、本当にリトライしたら成功する種類のエラーかどうかは分からない(例えば「ファイルがなかった」という例外が投げられたとき、リトライしたら成功するのかは、実装に依存する)。このため、ちゃんと処理するには、プラグイン開発者は Error#deterministic? (や例外クラス)をまじめに実装する必要がある。
deterministic? のメソッド名が分かりにくい。
ログは、基本的に「投げっぱなし」にするもので、ログを投げたところエラーだったところで、どうしようも無いことが多い。ほとんど捨てるしかない。言い換えれば、Fluentdのようなログ処理システムでは、エラーを同期的に処理することはできない。
1回のEngine#emitの呼び出しで、複数のログをemitすることができる。この仕組みは、性能の最適化のために使われている(例えばin_forward)。
しかしこの仕組みは、「そのすべてのログの処理が成功するか、すべてのログの処理が失敗する」ように設計してある。このため、1回のemitの中に、異常な(だが無視しても良い)ログが1つでも混じっていた場合、他のすべての正常なログが破棄されてしまう。
ログの処理中にエラーが発生しても、例外は投げない。その代わりに、別のlabelを付けてemitする(別のプラグインにemitする)。
<source>
type forward
error_label @error
</source>
<match **>
type my_validator
label @validated
</match>
@error:
# emitできなかったログはこっちに来る
<match ...>
...
または、
<source>
type forward
</source>
<match **>
type forward
...
error_label @error
</match>
@error:
# writeできなかったログはこっちに来る
<match ...>
...
Fluentdコアの設計が複雑化する。しょうがない気もする。
現状、プラグインでDetachMultiProcessMixinを使うと、そのプラグインを別プロセスで動作させて、並列性が向上させることができる(下図)。
Fluentd v10 のプロセス構成:
Detached Input plugin
+-----------------------+
| |
| Input plugin |
| | |
| +--> Virtual Engine |
| | |
+------------------------------+ +----------|------------+
| | |
| Engine <----------------------<------------+
| | ^ |
+------------+ | | | | Detached Output plugin
| Supervisor | -- | | +- Normal Input plugin | +-----------------------+
+------------+ | +---> Normal Output plugin | | |
| +---> Virtual output plugin -----> Output plugin |
| +---> ... | | |
+------------------------------+ +-----------------------+
しかし、この図から分かるように、Engineプロセスにはすべてのデータが流れる。RubyのCPU処理性能はマルチコアでスケールしないので、Engineプロセスがボトルネックになって性能が伸びない。
Fluentd v11 のプロセス構成案:
_
+-------------------------------+
| |
| Engine |
| | ^ |
| | | |
| | +--- Input plugin |
| +----> Virtual output plugin |
| | | |
+----------|-------|------------+
+----------------+ | (1) |
| ProcessManager <-----------------+ | (2)
+----------------+ |
+---------------------+
| +-------------------------------+
| | |
+--> Engine |
| | |
| +----> output plugin |
| |
+-------------------------------+
- Input pluginは、同一プロセス内のEngineにデータをemitする
- Engineは、マッチしたOutput pluginがどのプロセスで動いているかを判別する。 a. もし同一プロセスであれば、単にそのプラグインにemitする。 b. もし別プロセスであれば:
- ProcessManager に問い合わせて、宛先のプロセスに繋がっているファイルディスクリプタをもらう (1)
- 宛先のプロセスに直接データを送る (2)
普通は1つのプラグインを1つのプロセスに割り当てて実行するが、とても負荷が高いプラグインについては、複数のプロセスに割り当て、並列動作させる。
実装が大変!子プロセスが不意に死んだ場合の処理や、シグナルハンドラの処理など、考慮すべきことが多い。
Fluentdプロセスを再起動すると、プラグインがlistenしているTCPソケットが閉じられてしまう。このため、再起動後に再listenされるまでの間、新しいコネクションを受け付けられなくなってしまう。
ここで、プラグインのshutdownはgracefulに行われるので、再起動時に処理途中のデータが失われることは無いが、その反面時間がかかる。このため、再listenされるまでの時間は長くなりがちで、無視できない。
ソケットは、必ずProcessManagerプロセスでlistenし、そのファイルディスクリプタを子プロセスに転送する。プラグインでは次のようなAPIを使う:
sock = Engine.listen(address, port) #=> #<TCPServer>
sock = Engine.listen_unix(path) #=> #<UNIXServer>
SocketManagerは、同じ addressport でlisten要求が来たら、以前にlistenしたファイルディスクリプタをそのまま返す。
ファイルディスクリプタの転送には、UNIXドメインソケットを使うが、JRubyで使えるかどうか分からない。Windowsで動く必要は無さそうだが、JRubyでは動いた方が良さそう。
現在のFluentdは、ログが重複しないようにするat-most-onceのサポートしている。このトレードオフとして、サーバが故障した場合などにログが失われることがある。
ところが、ログが重複してもいいから、何が何でも確実に1回はログが転送されて欲しいケース、つまりat-least-onceをサポートして欲しいケースも存在する。
実現方法を思いついていない。
at-least-onceをサポートするメリットが、at-least-onceをサポートすることによって複雑化するデメリットを上回るのか謎。
シンプルな実装方法が求められている。バッファプラグインだけで対応できないだろうか。
Error#deterministic? については、メソッドを追加するよりも例外を分けてしまったほうが良い気がします。
プロセス構成については、 Engine.emit がボトルネックになるようなケースはパックされてないメッセージが大量に
発生して純粋にメソッドの呼び出し回数が増える以外に思いつかないので、本当にEngineをスケールする必要が
あるならInputからマルチプロセス化してプロセス間通信は最低限にする設計がいいと思います。