たごもりすメモ

コードとかその他の話とか

2012-05-09

Fluentd v0.11 の設計案、について

このエントリは以下の記事およびFluentdの現状を受けてのものであり、Fluentdの実装についての知識を前提とします。

Fluentd v0.11 の設計案 — Gist

また言うまでもないことですが、自分の使いかただったらこうだといいなー、という程度のものであり、それ以上の意味もそれ以下の意味もありません。

では順にいってみよう。

内部ルーティングラベルの導入

実際にはこれだけでは問題は解決しなくて、forward先のFluentdで、forward元のFluentdではどこまで処理をやったメッセージなのか、ということを知る必要がある場合があるので、結局タグの操作もする必要がある。そうするとラベルの操作で解決するケースとタグの操作も行われているケースが混在することになり、状況はむしろ悪くなる。

じゃあどうするかというと悩ましいけど。じつは個人的には現行 v0.10 のタグ操作 + out_route(は勝手に入れた) であんまり困ってなかったりして。

ラベルを導入するとしたら、以下のどちらかになると嬉しい。

  • include 構文の拡張
    • include [@filtered] FILEPATH のように書くと、FILEPATH 内の設定はすべて @filtered ラベルに関するものとして扱う
include [@error] /etc/fluentd/error.conf
include [@sampled] /etc/fluentd/mongodump.conf
include [@unknowns] /etc/fluentd/mongodump.conf
  • match 構文の拡張
    • match hoge.** @filtered のように書くと @filtered ラベルのついたうちで hoge.** にマッチするものを処理する
<match accesslog.**>
 ...
</match>
<match accesslog.** @filtered>
 ...
</match>

相反する提案なのがアレだけど。

includeの拡張は、複数のラベルに対して同じ処理を適用したい場合(全部Mongoに突っ込む、など)に設定ファイルを再利用可能にするためのもの。これにより似たような設定がファイル内に大量にコピペされるのを防ぐことができる、かもしれない。そこまで必要な人がどれだけいるか分からないので、いらないかもしれない。
ただし副作用的に、ラベルつきメッセージの処理を他の設定ファイルに追い出すことで、メインの設定(デフォルトの空ラベルを処理する設定)の見通しをよくする、という効果はあるかも。メイン設定ファイルの末尾に include [@error] hoge ; include [@sampled] pos ; みたいに書いてあれば、どのようなマイナーケースについてはどのファイルを確認すればいいかも明らかになって良い。

match の拡張は設定ファイルの構文をよりシンプルに保つためのもの。 @label: という記法はありていに言ってスジが悪いと思う。ファイルをぱっと見たとき、どこからどこまでがどのラベルについての処理なのかが明らかでない。そのような構文を採用するべきではないと思う。

というわけで、上記ふたつの提案はどちらも設定ファイルをシンプルに保つためのもの。例外ケース(ラベルつきメッセージの処理)の記述量が多くなりそうなケースに対処したければ include 拡張が欲しいし、ラベルつきメッセージ処理をうんざりするほど繰り返して書く必要があまり無さそうなら match 拡張が欲しいんじゃないかと思う。
両方あっても、いちおう衝突はしないかな。ラベルつきincludeされたファイル中にラベルつきmatchが書かれてたらどうするか、の優先順位が決めてあれば、あるいは起動時エラーになれば大丈夫か。

Error#deterministic? の導入

Error#mortal? とか Error#fatal? でいいんじゃないのかな

エラーストリームの導入

ほぼ全面的に賛成

ProcessManagerの導入 / SocketManagerの導入

結局流量の多い input plugin を担当するFluentd Engineのプロセスがボトルネックになってあんまり状況が改善しないような気もする、けど、よくわからない。input pluginを並列化するところまでいかないとダメなんじゃないかと思う(httpdのpreforkみたいなモデル)。自分のところでは現状、流量の極端に多い input plugin (in_forward) とそこからのメッセージの配送を担当するEngineがボトルネックになっている印象がある。

プラグイン単位で並列化するより、Engine/input plugin のセットを並列化し input plugin へのデータの配送をロードバランスする prefork モデルの方がCPUコアに対してより良くスケールするモデルになると思う。これはSocketManagerの導入を前提にすれば作れるはず。output pluginは集約処理する可能性がある以上こういうことはできないから、これは元の設計案にある通りプロセスをまたいでルーティングするような方法をとるしかなさそう。

ここまで書いたけど、汎用の input plugin に求められることじゃないかー。たとえば in_dstat みたいなプラグインはこのような仕組みにはまったくそぐわない。
ので、データ配送(の受信側)だけプラグインの種類を分けるとか、あるいは何かプラグインごとにフラグを立てさせるとか、そういう形で扱いを分けたほうがいいかもしれない。in_forwardだけ特別扱いってのも考えたけど in_scribe とか in_flume とかもあるか。

まあ、ともあれ、プラグイン単位で別コアにするより、全メッセージを均等に各コアに割り振っていかないとCPUコアを効率よく使うのはたぶん無理だと思う。

またFluentdプロセスが細分化されると、そのそれぞれで(しかも違う増大ペースで)メモリリーク的な挙動を示したときに本当に収拾がつかなくなる。ので、MaxRequestsPerChild的な機構はおそらくほぼ必須になると思う。ないと運用はだいぶ厳しい。

到達保証の導入

おれイラネ。

……だけじゃなくて。ぱっと考えるに、Fluentdインスタンス内でのエラーについてはメッセージの改変があるから、その前後での正当性保証はたぶん非常に難しいと思う、というか無理なんじゃないかな。
現実的には out_forward から in_forward に渡るときにメッセージの件数もしくはチェックサムをとって伝達の確実性保証をするしかないんじゃないかなと思う。バッファチャンク単位とかで。そうなると forward じゃないプロトコルになりそうなので、専用のプラグインを作るしかないか。(あんまり真面目に考えてない)

個人的に欲しいもの: プラグイン向けイベントスケジューラ的な何か

プラグイン側で一定時間ごとにリアルタイム集計処理をしていたりする場合などに、現状だと別スレッドを立てるなりCool::Io あたりを自分で使うなりしないといけなくて、実装負荷も実行時負荷もあんまり馬鹿にならない気がする。
一定時間毎や一定の条件(emitされたメッセージ数など?)でプラグイン側の処理をkickしてくれる機構があるといいような気がする。そうすれば非同期処理エンジンはFluent::Engine側にひとつだけあればいい。kick条件とkickされるメソッドはプラグインの initialize (かstart)でEngineに対して登録するような形にする。

以上

こんなもんかな?

2012-04-12

俺が本当の新人向け記事の書きかたを教えてやる

  1. 突然連絡が来る
  2. あたふたしながら受けることにする
  3. 担当する特集の他の章を書く人がビッグネームでひたすらびびる
  4. おろおろしながら着手する
  5. (途中省略)
  6. なんか校了を迎える
  7. 実物を眺めるまでは油断しない
あわせて読みたい

WEB+DB PRESS Vol.68
WEB+DB PRESS Vol.68
posted with amazlet at 12.04.12
WEB+DB PRESS編集部 編
技術評論社
売り上げランキング: 12542

著者の2番目! ……なのは単に掲載順だからですね。

2012-02-14

#fluentd な今だからこそふりかえる scribed のすべて

最近 fluentd というツールのことがたいへんよく話題に上がっており、かく言う自分もささやかながら使用している身なのだが、それはそれとして比較対象に上がってくるツールに scribed というものがある。これがどういうものなのか、話には聞いていてもよくは知らないという人が多いようなので、これもささやかながら触ってみている自分としてはここらで一度まとめておかねばなるまい、と思った次第である。
日本全国に10人くらいはいるかもしれない scribed のヘビーユーザ各位に捧げる。

なお記憶と経験だけを頼りに書き殴るので、意思決定の重要な局面とかで「これこれこういうブログにたごもりすなる者がこのようなことを書き残しており」などと引用するのはくれぐれも避けていただきたい。
また途中から思いっきりビール飲みながら書いたので文章自体の品質にも問題のある可能性がある。

そも scribed とは何かというと、みんな大好き facebook が2008年だかに公開したオープンソースソフトウェア*1で、主にログの配送と収集・集約を目的とする。同じく facebook が開発した Thrift*2 というRPCプロトコルをベースにしたもので、C++で書かれている。Thrift自体が依存関係にけっこううるさい上に scribed 自体の依存関係もだいぶ激しい状態なのでインストールが極めて面倒くさいのが特徴。詳しいソフトウェアとしての特性は後に譲る。

なお上述リンクのgithubプロジェクトが公開場所なのは間違いないのだが、Commit履歴を見ればわかるとおり、現時点でもう一年半、コードはほぼ放置状態。事実上、開発は停止しているといってもいい。動くのは動くんだけど。*3
このため、既に Boost ライブラリのバージョンについて問題が起きており boost 1.45 以下と組合せてビルドしないと動作しない。なお現時点(2012年2月14日)での boost 最新版は 1.48。詳細については こちら を参照のこと。またThriftが boost 1.36 以降を要求するため、boost はこの間のどれかのものを使う必要がある。これが scribed の面倒な依存関係の一部、ということになる。
その他に Hadoop HDFS への書き込みを使う場合についても、Hadoop側は新しくなり続けているが scribed 側がついていかない、ということになると今後どこかで動作しなくなることが考えられるので注意したい。

なお scribed について知っている限りでは、ドキュメントの類は 公式の設定ファイルの解説 くらいしかない。日本語で書かれた記事となると本当にさわりの使ってみた系のものしか読んだ覚えがないので壊滅的といっていいと思う。

scribedの機能

scribed はログを配送するためのデーモンである。ここでログといったが、実際にはこれは「カテゴリ」「メッセージ」というふたつの文字列からなる。カテゴリはscribedによる配送中にその経路や最終的な書き込み先を決定するためのラベルで、メッセージは通常であればログの行そのものとなる。もちろんscribeプロトコルに乗せてログ以外のものを流すことも、やろうと思えばできる。

scribeプロトコルおよび scribed そのものの作りとしては同期的な処理をするものになっている。クライアントからメッセージの送信がある scribed に対して行われた場合、受け取った側の scribed は設定ファイルの内容に従って手元のディスクへの書き込みなり他の scribed へのネットワーク転送なりを行うが、これらが完了した時点ではじめてクライアントに対して OK というレスポンスを返す。通信は全て TCP で行われ、またネットワーク送信時のエラー制御・再送処理などもしっかり作られているため、これらのメッセージ伝達の確実性においては実はかなり秀でている。*4

受信機能

ログの受信については完全にネットワーク経由のみである。port 1463*5を listen し、そこに送られてきたメッセージはどこからのものであってもすべて平等に(混ぜて)扱う。

libeventを用いた非同期I/Oおよび設定で指定可能なマルチスレッドにより、多数のコネクションを扱うことができる。手元では150程度は1プロセスで全く問題なく捌けることを確認している。

送信機能

特定の scribed にネットワーク経由で送信できる。きわめて安定している上に速くてまったく問題なし。

書き込み機能

ローカルディスク、もしくはHDFSに書き込む機能がある。HDFSに書き込む場合はビルド時に libhdfs を有効にする必要がある。
書き込み先は設定で指定したパス(例: /path/to)の下にカテゴリ名でディレクトリを作り、その下にファイルを作成する。 /path/to/category/file_access_log_00000 のような感じ。00000の部分は連番でファイルのローテート毎に数字が増えていく。
ファイルのローテーションをサポートしており、設定した1ファイルのサイズ上限(デフォルト1GB)を超えるか、もしくは設定した期間を過ぎるか(時間毎、もしくは日毎が指定可能)で次のファイルに書き込みが切り替わる。また最新のファイルに対してシンボリックリンクを張る機能もついていてたいへん便利。

これも極めて安定しつつ高速に動作する機能で、特にファイルのローテーション回りは実運用を考えて作られている、という印象がある。複数のサーバから送られたログが単一のディスクに整然とローテートされて格納されるのは後から調査する側からすると極めて扱いやすい。

バッファリング、分散

scribed はバッファリングの機能を持っている。これは性能特性を向上させるためのものではなく、ある特定のログの配送先(例えばネットワーク送信対象の別ホストの scribed)に対する書き込みが連続的に失敗する場合、一時的にそのログを確保しておき、正規の配送先が復活したときに再送するための一時的な置き場のことだ。詳しくは このあたり を読んでいただきたい。
これは実際にうまく動作する。何度も試したから間違いない。再送パフォーマンスもよく、設定を間違えさえしなければ新しくやってくるログの受け入れに負担をかけないまま障害中の再送を完了させてまたストリーム処理モードに戻ることができる。配送経路の途中のノードを割と気軽に停止できるのは運用の手間が格段に向上するので、なくてはならない機能と言っていいと思う。

またメッセージを複数の宛先に分散する機能もある。multi と bucket だ。multi はメッセージを複数の宛先にコピーして渡す。bucket は設定した複数の宛先にどれかひとつに渡す。

multi を使うことで「ローカルディスクに書き込みつつ次の scribed にネットワーク送信する」ということが可能になる。自分の手元ではこれを用いて複数のサーバのローカルディスクにログを書き込ませることでバックアップの代わりとしている。

bucket はメッセージの先頭にユーザ名などが書かれていた場合に、それをキーに使って配送先を選ぶことができる。したがってログ全体を複数の宛先に分割しつつ、特定のユーザのログは特定の宛先にのみ行くようにする、といった機能だ。またランダムで宛先を選ぶこともできるため、単純なロードバランスにも使うことができる。(……と思っていたんだけどなあ。)

scribedの性能特性

scribed は C++ で書かれており、また前述のように libevent をベースにした非同期I/Oライブラリを使用している。これらの要素と、おそらく真剣にパフォーマンスを考えてコードが書かれているのだろう、性能が良い。きわめて良い。
最終的にピーク時で150Mbpsを超える量のログデータを1プロセスで扱っているが、これでも性能的にはまったく問題ない。4Core HTのCPUの論理CPUひとつ分(12.5%)を使いきるところに全く届いていない。その半分も使ってない感じ。まだまだ行けるだろう。先に disk I/O wait の方がきびしくなると思う。*6

scribedの問題

さて機能も豊富、性能特性もきわめて良い、というscribedだが、問題はある。いっぱいある。順にいこう。

なおインストールが面倒くさいという問題もあるにはあるが、そこはまあアレですよ、要は慣れというか。自分はもう15分あれば1台いけるね。きっとみんなも10回くらいやればそうなるよ。あと他の言語はともかく、日本語に限って言えば自分がだいぶ細かくblogエントリ書いたので、それを読めばなんとかなるんじゃないかなー。そうだといいなー。

で、問題。

枯れてない機能が(いまだに)ある

たとえば先述の bucket だが、ランダムで送り先を指定できる、とあるのでこれがロードバランサに使えるかと思ったら、もう全然使えない。ひと桁メッセージでの各ノードへの配送が100回くらい連続したかと思ったら、その次に突然40000メッセージくらいがどかっと特定ノードに送られたりする。あのな。それじゃロードバランスになってねえだろ。おまえランダムって言葉の意味知ってんのか。いいかげんにしろ。

そのほか、各出力設定の細かいオプションなど、これ本当に動くのか? と言いたいところが随所にある。設定例の最後に書いてある Thriftfile 出力とか用途もわからないし試す気にもなれない。

ドキュメントがない

先述した通り公式の設定例解説みたいなものが唯一あるだけで、それも実際にはどう書いていいものかよくわからない説明だったりすることもある。ソースコードツリーの中の example を頼りになんとか推定し、それでもわからないものは実際に書いて動かし試してみるしかない。ひどく不便。
またコードが C++ なので、いや C++ を言い訳にするのは俺の C++ が劣っているせいなので良くないのだが、ともかくどう設定すればいいのかコードを読んでもぱっとは理解できないのがつらい。自分の感覚だと試行錯誤して確認した方がまだ早くて確実だった。

その上、唯一の頼りの設定例にすら 「(may not be in open-source yet)」と書かれた設定項目があったりして憤死しそうになる。あのな。いいかげんにしろ。俺はその設定項目が欲しいんだよ。外に出して釣りしてんじゃねえ。*7

拡張性がない

プラグインとかまったく考えていない構造になっており、自分でちょっとコード書いて動作追加して動かしたいなー、と思ってもそのようなことをする余地は一切ない。いや scribed そのものにがっつり手を入れる気になればやれないことはないんだろうが、そもそもコード全体が fb303 という facebook 内部のフレームワークべったりなためかなりクセがある。fb303べったりになる覚悟があるならやりきれるかもしれないが、自分には無理である。

周辺ツールがない

scribed はあくまでデーモンとして動作する単独のソフトウェアであり、普通なら他に必要となるような周辺ツールがほとんど無い。たとえばデーモンの起動・停止・再起動・状態報告をするための init script であったり、たとえばネットワーク経由で scribed にログを送りつけるためのエージェントツールであったり。
ログを送信するためのツールとしては scribe_cat というものが申し訳程度にコードに含まれているが、これは stdin からの入力をすべて読み込み、全部一括してサーバに送る、というたいへん難儀なつくりになっており、マトモに運用に耐えられるようなものではない。

これらについて、自分は エージェントinit script も自作することでどうにか運用まで持っていったが、このような時間/手間を払える場所ばかりでもないだろうことを考えると、世間一般で使われるには難易度が高過ぎるだろう、という気がする。

根拠もほとんどない妄想みたいな想像だけど、facebookはきっとWebサーバも自分たちで書いてて、そこから直接 Thrift のコード経由でログを scribed に送り付けてるんだろうなあ。あんだけ大きけりゃそれはアリかもしれないけど、俺達にはそれはちょっと無理だし。

先がない

さっきも書いたが、開発が止まって久しい。boostもHadoopも着々とバージョンが上がっている中で scribed のコードがいつまでも対応しないと、いずれはこれらのバージョンアップに置いていかれる可能性がある。
boostは最悪いちどビルドしてしまえばそのまま置いておけるが、Hadoopについてはクラスタのバージョンを一切上げないことなどできるわけもないので、HDFS書き込みを有効にして使用している場合は特にこの制約がいつか現実的なものになるだろうと思う。

まとめ

性能的には極めて優れている。いっぽうであれこれやったりするには拡張性・メンテナンス性のうえで難がある。

以下のようなケースでは scribed を使うのは現実的で、コストパフォーマンスに優れているだろう。

  • 大規模なWebサーバ群を抱えており、それらのログを集約してディスクに書き込んでおくことでサーバ障害によるログの紛失から守り、また有事のログ検索性を上げたい

以下のようなケースにはあまり向いていないと思う。

  • そもそもサービス規模が小さく高パフォーマンスを要求されない
  • ログに対して多種・多様な処理を行ったり、細かい配送先制御を行ったりしたい
  • 最新のストレージシステムへの書き込みなどに柔軟に対応したい

特にscribedがいかんと言うわけでない。自分が面倒を見ているシステムではついに稼動中の scribed 台数は2台にまで減ってしまったが、その2台、つまりアーカイブ目的での最終的な集約とディスクへの書き込みについては scribed を使い続けるだろうと思われる。
何事も目的とメリット・デメリットである。

Flume? 誰かがそのうちこの記事と似たようなのを書いてくれるんじゃないかなー。期待したいなー。

2011-11-29

Webサーバログ転送・ストリーム処理系私案

HTTPアクセスログをHiveが読める書式への変換やその他必要なデータ変換などストリーム処理で行いつつ転送して最終的にHDFSに時間ごとに書き込むぜー、というシステムを作ってる途中なんだけど、だいたい部品が揃いつつあるところでいったんまとめて書き出してみて見落としがないかどうか考えてみるテスト。

実在のシステムとは異なる可能性があるので(特に後日これを読む人は)あまり真に受けないほうがよいです。あと解析処理自体はHadoop上でHiveでやるのが大前提で、そこにデータをもっていくまでがここに書く話です。

(12/1 考えた末、構成を変えることにしたのでエントリ後半に追記した。)

前提システム

既にscribeを使用したログ収集・配送・保管系がある。各Webサーバは scribeline を使用してログをストリーム転送する。

scribelineのprimaryサーバとして配送用サーバ、secondaryサーバとしてログ保管サーバAが指定されている。配送用サーバは受け取ったものをすべてそのままログ保管サーバAに転送するだけの設定で、ログ保管サーバAは受け取ったログデータを (1) ローカルディスクに保存する (2) ログ保管サーバBに転送する。
ログ保管サーバBは受け取ったログデータをローカルディスクに保存する。これにより常に同じログデータが2ヶ所に保管されるため、これをもってバックアップ代わりとする。

配送用サーバの障害時にはscribelineは自動的にログ保管サーバAに接続する。障害から回復したら配送用サーバに接続し直すのでここの障害は単一障害点にはならない。

ログ保管サーバAもしくはBのどちらかが障害で停止した場合、その前段のscribeサーバは転送できなかったログデータをバッファリングし、転送先が回復したときに再送する。このため短時間で復旧できる場合は単純に立ち上げなおせばよいし、バッファされたデータがローカルディスクを圧迫する程度に長時間を必要とする復旧作業が見込まれる場合、他のサーバを使用してscribedを立ち上げ、そちらに転送するよう設定を変更すればよい。

従来のHDFSへの書き込みおよびログ変換

ログ保管サーバBは受け取ったログをローカルディスクに書くのと同時にHDFSに書き込む。これはscribedの type=hdfs 設定によるもので、libhdfsを用いて行われる。この時点でログデータは何の処理も行われておらず、Webサーバが出力した生データがそのまま書き込まれる。

ファイル分割もscribedによって行われる。時間単位での分割が可能。ただしこれはscribed上における時刻に従って行われるので、ログに記録されている時刻とは数秒から最大で10秒程度のズレを生じる。つまり、ある1時間のうち最後の数秒分は次の1時間分のログファイルに書かれる。
したがって「ある1時間に関するログの解析」を行う場合はファイル単位で分割されたログを見るだけでは不十分で、対象の時間範囲のログが含まれている可能性のある他のファイル(次の1時間の分など)から必要なだけのものを拾ってくる必要がある。これは比較的コストの高い処理を必要とする。

またHDFSには生ログが書かれているためHive解析の前処理として、生ログをパースおよび変換してHiveへ入力可能な形式(タブ区切りテキスト)で出力するMapReduceジョブを実行し、その結果をHiveテーブルのデータとしてLOADする。
これは全ログデータを処理対象とする極めて重いジョブで、1時間分のログを処理するのにだいたい20分から40分程度の時間を必要としていた。Hadoopクラスタの台数が比較的少ないのが原因でもある。ここは 100% Map のみの処理であるため、クラスタ台数を増やせば線形に所要時間が減る類のものではある。

fluentdによるログのストリーム処理を含んだ構成

fluentd により期待できる機能として以下のものがある。

  • TimeSlicedOutputを継承した出力プラグインによる、ログメッセージに記録された時刻を用いての出力ファイル分け
  • out_exec_filter プラグイン(もしくはその改造版)による、ログ転送途中での変換処理の実行

前者により、ログ本文に記録された時刻を用いて、正確に時刻によるファイル分けを行うことができる。この時刻をファイル名(もしくは同時刻範囲のデータを格納するディレクトリ名)に埋めておくことで、ファイル内容を見ることなく解析対象時刻から対象データの決定を行うことが可能となる。データの移動や集積(24時間分をまとめて1日分として再編成する)などがすべてHDFS上でのファイル操作によって行えるようになるので極めて高い効率化が達成できる。

また後者の機能により、ログの変換処理をバッチ処理としてではなくストリーム処理として実現できる。ストリーム処理の出力としてHiveクエリが受け取れる形のデータを直接生成できれば、0分を迎えた瞬間にはもうその前の1時間分のデータがHiveでクエリ可能な書式でHDFS上に存在することになる。Webアクセス状況の解析をする上で現在起きていることが解析可能になるまでのギャップを縮めることには大きな意味があるため非常に価値の高い機構と言える。

今回はfluentdに頼らない機能としては以下のものがある。

  • 各Webサーバからのログの送出 (scribeline)
  • 全Webサーバからのログの受け取りおよび各所への再転送 (scribed)
  • 生ログのままのファイルとしての長期保存 (scribed)

各Webサーバからのログ送出については、単に現状サーバのほとんどがCentOS5のもので、rubyをセットアップするのが高コストだから、という理由に尽きる。あとは単純なログデータ送出に限る場合はfluentdはソフトウェアとして大き過ぎるということもある(これは特にクリティカルな理由ではないけど)。

送られてきたログの受け取りと再転送については、単純にパフォーマンスが理由。100台を超えるサーバから合計秒間40,000メッセージを超えるログが送られてきたとき、現状のfluentdでは残念ながら捌ききれない。やりようによってはやれないことはないかもしれない(し、実はそのためのオプションを最近scribelineに加えてある)が、あえてfluentdを使いたいほどの柔軟性が要求される場所でもないため、強行する利点がない。よって現状の scribed による構成のままにしてある。

長期保存用のファイルへの書き出しについても同様。scribedが現状でうまく処理できている上、アーカイブデータであればローテーションによる時間区切りのズレも特に問題にならない。なぜならそのファイルは、人間が何かあったときに見るときのためのものなので。あるいはHDFSが吹っ飛んだか何かでデータを再投入するときにも使うが、そのときも基本的には日単位での一括処理などになるため、細かいログ区切りのズレはほとんど気にする必要がない。1日分のログファイルに加えて翌日の最初のファイルを突っ込んでおくだけで済む。

fluentdノードを含む全体の構成

f:id:tagomoris:20111129190324p:image

全体像としてはこんな感じ。scribedサーバはこれまで使っていたものをそのまま使い、新しくfluentdクラスタを置く。
scribedは複数のfluentdノード(balancer)に対してscribeプロトコルでログデータを分配する。ここのノード数は現状だと性能上の問題で2ノード、障害時を考えて3ノードとするつもり。
balancerは in_scribe プラグインで受け取ったメッセージをworkerのノード群に対して out_forward プラグイン経由で送る。各balancerから全workerに対してロードバランスするような設定となり、workerはそれをin_forwardプラグインで受け取る。プロトコルは MessagePack RPC になる。

workerは in_forward から受け取ったメッセージを out_exec_filter に流し、外部のログ変換スクリプトに食わせる*1。その出力を out_hoop プラグインに渡し、時間ごとにファイルに分けつつHDFSに書き込む。

なおfluentdのout_exec_filterは1プロセスしかforkしない(よね?)ので、workerサーバのCPUコア数にあわせてfluentd+外部プロセスの組を複数立ち上げることになると思う。4Core HTのサーバだったらfluentdを4プロセスかな。まだ詳しく見てみてないのでやってみながら確認。

workerのノード数がどれくらいになるか、実はまだよくわかっていない。ただしHadoop Streamingでバッチ実行していたときに較べ、そこまでは効率は悪くならないと思う*2ので、Hadoop Streaming 9ノードで30分かかっていた処理なら4ノード程度で処理しきれるかも……。うーん、思ってたより必要だな。もうちょっと少なくなると嬉しいんだけど。まあそのへんは試しながら確認する。balancerノード上でworker用のfluentdプロセスも上げようかなあ。

Hoopサーバは1台あれば十分なのはベンチマークとったときにわかっている。NameNodeと共存でいいかな。

性能の話

balancerの性能が足りなくなった場合、1台足してscribedの配送サーバのバランス先に加えてやればいい。ただしscribedの設定変更は再起動を伴うので、実施したときに逃げていくscribelineからのコネクションをどうするかは考えてやる必要がある。配送サーバの再起動が終わってからscribelineのsecondaryに指定してあるログ集積サーバAの側もscribedを再起動してやれば配送サーバ側に接続が戻るんで、そこまで大きな問題でもないんだけど。
scribedの配送サーバも2台構成にしてやれば実はこのへんが超ラクになるんだけど、もちろん1台分コストが増えるのでなんだかなー、という感じ。そこまでやるかな的な。

workerの性能が足りなくなった場合、やはりworkerを足した上で、balancer用のfluentdの設定にバランス先を足して再起動、となる。これはbalancerの各ノードをひとつずつ順番にやればいい。ノード再起動に伴う転送データの再送などはscribedがやってくれる。

耐障害性の話

scribedノードはすべて再送機構を備えている。fluentdのin/out_forwardはそういった機構を備えていないが、Webアクセスログ解析はその特性上シビアなデータ保持が求められることはあまりないので、ノード障害時にそのノードが持っていたデータは失われるものの、大した量でもないから問題でもない。*3

ノード欠損自体は、scribed, fluentdともに障害ノードを転送先から外してくれるので大きな問題ではないし、復旧時の復帰も問題ないはず。1台壊れたときにキャパシティが足りなくなってカタストロフが起きないようにだけしておけばいい。

さて

このエントリをあらためて眺めて問題がないかどうか考える。なかったら実際に組み上げてみてテストだ。

やっぱり変えることにした(12/1追記)

変更後の構成はこんな感じ。
f:id:tagomoris:20111201153339p:image

変更点となぜそうすることにしたかを列挙する。

fluentdによるbalancerをやめてscribed deliverにバランスさせることにした

fluentdをbalancerにしてたのはオンラインでのバランス先変更をやる(やれる)と思ったときにそうしようと考えてたんだけど、現状ではそれはできない。……ことがわかったあとも構成をひきずってた。考えなおしてみたらあまり意味がなかった。

scribedでバランスする場合はバランス先の変更時にscribedを再起動すると scribeline の接続先がいちど変わってしまうのが問題だったんだけど、それは balancer がなくなって浮いた分のマシンを回して deliver を2台構成にすることで回避する。
全く同じ設定で deliver を2台上げておく(scribelineの接続先をその2台にしておく)ことで、設定変更・再起動を順番にやればログの取りこぼしも混乱もなくバランス先の変更ができる。実はこっちの方がバランス先変更時の手間も問題も少なく、しかもシンプルな構成にできた。
SPOFもなくなり、どこか1台のマシンが死んだからといってストリーム処理系にデータが流れなくなることも無くなるので、障害に対するデータ再生成バッチの活躍回数もぐっと減るから運用の負担も減る。ぐっど。

scribed関連で気にしないといけないのは SENDING_BUFFER 時のデータ大量転送モード*4で、このときは超巨大なThriftメッセージが飛び交う。普通にやるとこれがひとつのfluentd workerに行くので効率悪くて危険、分散したければ fluentd balancer を入れるしかない。
んだけど、変更後の構成だとバランサになる scribed の前にはもうscribedがいないので、SENDING_BUFFER状態を受ける側にはなりようがない。このため今回の構成ではこれを考えなくて済む。(逆に言うとバランサになるscribedの前にscribedがいる場合はこの問題を考える必要がある。)

ボトルネックはどこだ

workerはいくらでも増やせる。Hoopは前にとったベンチによるとよっぽどのことがない限り大丈夫。となるとあとは scribed deliver が問題になる。

ただし、scribed は起動時にスレッド数を指定できる。ディスクI/Oが問題になる場所(アーカイブサーバ)はストリーム処理側とは切り離されているから、そっち側で多少書き込みが待たされることがあっても解析側には問題がない。……とはいえそれでもボトルネックになる可能性は、あるにはある。
基本的には、scribelineから接続する先を分散させればいい。1サーバに複数プロセス立てたり複数のサーバを並列で使ったりする*5。scribelineを改造して、設定ファイルに並べて書いておいた接続先からランダムにひとつ選んで繋ぐようにしてやればいい。

……が、現状ではまずいらなそうなので、とりあえず scribed は master - slave でいいや。必要になったら考えようかなというところ。