Webサーバログ転送・ストリーム処理系私案
HTTPアクセスログをHiveが読める書式への変換やその他必要なデータ変換などストリーム処理で行いつつ転送して最終的にHDFSに時間ごとに書き込むぜー、というシステムを作ってる途中なんだけど、だいたい部品が揃いつつあるところでいったんまとめて書き出してみて見落としがないかどうか考えてみるテスト。
実在のシステムとは異なる可能性があるので(特に後日これを読む人は)あまり真に受けないほうがよいです。あと解析処理自体はHadoop上でHiveでやるのが大前提で、そこにデータをもっていくまでがここに書く話です。
(12/1 考えた末、構成を変えることにしたのでエントリ後半に追記した。)
前提システム
既にscribeを使用したログ収集・配送・保管系がある。各Webサーバは scribeline を使用してログをストリーム転送する。
scribelineのprimaryサーバとして配送用サーバ、secondaryサーバとしてログ保管サーバAが指定されている。配送用サーバは受け取ったものをすべてそのままログ保管サーバAに転送するだけの設定で、ログ保管サーバAは受け取ったログデータを (1) ローカルディスクに保存する (2) ログ保管サーバBに転送する。
ログ保管サーバBは受け取ったログデータをローカルディスクに保存する。これにより常に同じログデータが2ヶ所に保管されるため、これをもってバックアップ代わりとする。
配送用サーバの障害時にはscribelineは自動的にログ保管サーバAに接続する。障害から回復したら配送用サーバに接続し直すのでここの障害は単一障害点にはならない。
ログ保管サーバAもしくはBのどちらかが障害で停止した場合、その前段のscribeサーバは転送できなかったログデータをバッファリングし、転送先が回復したときに再送する。このため短時間で復旧できる場合は単純に立ち上げなおせばよいし、バッファされたデータがローカルディスクを圧迫する程度に長時間を必要とする復旧作業が見込まれる場合、他のサーバを使用してscribedを立ち上げ、そちらに転送するよう設定を変更すればよい。
従来のHDFSへの書き込みおよびログ変換
ログ保管サーバBは受け取ったログをローカルディスクに書くのと同時にHDFSに書き込む。これはscribedの type=hdfs 設定によるもので、libhdfsを用いて行われる。この時点でログデータは何の処理も行われておらず、Webサーバが出力した生データがそのまま書き込まれる。
ファイル分割もscribedによって行われる。時間単位での分割が可能。ただしこれはscribed上における時刻に従って行われるので、ログに記録されている時刻とは数秒から最大で10秒程度のズレを生じる。つまり、ある1時間のうち最後の数秒分は次の1時間分のログファイルに書かれる。
したがって「ある1時間に関するログの解析」を行う場合はファイル単位で分割されたログを見るだけでは不十分で、対象の時間範囲のログが含まれている可能性のある他のファイル(次の1時間の分など)から必要なだけのものを拾ってくる必要がある。これは比較的コストの高い処理を必要とする。
またHDFSには生ログが書かれているためHive解析の前処理として、生ログをパースおよび変換してHiveへ入力可能な形式(タブ区切りテキスト)で出力するMapReduceジョブを実行し、その結果をHiveテーブルのデータとしてLOADする。
これは全ログデータを処理対象とする極めて重いジョブで、1時間分のログを処理するのにだいたい20分から40分程度の時間を必要としていた。Hadoopクラスタの台数が比較的少ないのが原因でもある。ここは 100% Map のみの処理であるため、クラスタ台数を増やせば線形に所要時間が減る類のものではある。
fluentdによるログのストリーム処理を含んだ構成
fluentd により期待できる機能として以下のものがある。
- TimeSlicedOutputを継承した出力プラグインによる、ログメッセージに記録された時刻を用いての出力ファイル分け
- out_exec_filter プラグイン(もしくはその改造版)による、ログ転送途中での変換処理の実行
前者により、ログ本文に記録された時刻を用いて、正確に時刻によるファイル分けを行うことができる。この時刻をファイル名(もしくは同時刻範囲のデータを格納するディレクトリ名)に埋めておくことで、ファイル内容を見ることなく解析対象時刻から対象データの決定を行うことが可能となる。データの移動や集積(24時間分をまとめて1日分として再編成する)などがすべてHDFS上でのファイル操作によって行えるようになるので極めて高い効率化が達成できる。
また後者の機能により、ログの変換処理をバッチ処理としてではなくストリーム処理として実現できる。ストリーム処理の出力としてHiveクエリが受け取れる形のデータを直接生成できれば、0分を迎えた瞬間にはもうその前の1時間分のデータがHiveでクエリ可能な書式でHDFS上に存在することになる。Webアクセス状況の解析をする上で現在起きていることが解析可能になるまでのギャップを縮めることには大きな意味があるため非常に価値の高い機構と言える。
今回はfluentdに頼らない機能としては以下のものがある。
- 各Webサーバからのログの送出 (scribeline)
- 全Webサーバからのログの受け取りおよび各所への再転送 (scribed)
- 生ログのままのファイルとしての長期保存 (scribed)
各Webサーバからのログ送出については、単に現状サーバのほとんどがCentOS5のもので、rubyをセットアップするのが高コストだから、という理由に尽きる。あとは単純なログデータ送出に限る場合はfluentdはソフトウェアとして大き過ぎるということもある(これは特にクリティカルな理由ではないけど)。
送られてきたログの受け取りと再転送については、単純にパフォーマンスが理由。100台を超えるサーバから合計秒間40,000メッセージを超えるログが送られてきたとき、現状のfluentdでは残念ながら捌ききれない。やりようによってはやれないことはないかもしれない(し、実はそのためのオプションを最近scribelineに加えてある)が、あえてfluentdを使いたいほどの柔軟性が要求される場所でもないため、強行する利点がない。よって現状の scribed による構成のままにしてある。
長期保存用のファイルへの書き出しについても同様。scribedが現状でうまく処理できている上、アーカイブデータであればローテーションによる時間区切りのズレも特に問題にならない。なぜならそのファイルは、人間が何かあったときに見るときのためのものなので。あるいはHDFSが吹っ飛んだか何かでデータを再投入するときにも使うが、そのときも基本的には日単位での一括処理などになるため、細かいログ区切りのズレはほとんど気にする必要がない。1日分のログファイルに加えて翌日の最初のファイルを突っ込んでおくだけで済む。
fluentdノードを含む全体の構成
全体像としてはこんな感じ。scribedサーバはこれまで使っていたものをそのまま使い、新しくfluentdクラスタを置く。
scribedは複数のfluentdノード(balancer)に対してscribeプロトコルでログデータを分配する。ここのノード数は現状だと性能上の問題で2ノード、障害時を考えて3ノードとするつもり。
balancerは in_scribe プラグインで受け取ったメッセージをworkerのノード群に対して out_forward プラグイン経由で送る。各balancerから全workerに対してロードバランスするような設定となり、workerはそれをin_forwardプラグインで受け取る。プロトコルは MessagePack RPC になる。
workerは in_forward から受け取ったメッセージを out_exec_filter に流し、外部のログ変換スクリプトに食わせる*1。その出力を out_hoop プラグインに渡し、時間ごとにファイルに分けつつHDFSに書き込む。
なおfluentdのout_exec_filterは1プロセスしかforkしない(よね?)ので、workerサーバのCPUコア数にあわせてfluentd+外部プロセスの組を複数立ち上げることになると思う。4Core HTのサーバだったらfluentdを4プロセスかな。まだ詳しく見てみてないのでやってみながら確認。
workerのノード数がどれくらいになるか、実はまだよくわかっていない。ただしHadoop Streamingでバッチ実行していたときに較べ、そこまでは効率は悪くならないと思う*2ので、Hadoop Streaming 9ノードで30分かかっていた処理なら4ノード程度で処理しきれるかも……。うーん、思ってたより必要だな。もうちょっと少なくなると嬉しいんだけど。まあそのへんは試しながら確認する。balancerノード上でworker用のfluentdプロセスも上げようかなあ。
Hoopサーバは1台あれば十分なのはベンチマークとったときにわかっている。NameNodeと共存でいいかな。
性能の話
balancerの性能が足りなくなった場合、1台足してscribedの配送サーバのバランス先に加えてやればいい。ただしscribedの設定変更は再起動を伴うので、実施したときに逃げていくscribelineからのコネクションをどうするかは考えてやる必要がある。配送サーバの再起動が終わってからscribelineのsecondaryに指定してあるログ集積サーバAの側もscribedを再起動してやれば配送サーバ側に接続が戻るんで、そこまで大きな問題でもないんだけど。
scribedの配送サーバも2台構成にしてやれば実はこのへんが超ラクになるんだけど、もちろん1台分コストが増えるのでなんだかなー、という感じ。そこまでやるかな的な。
workerの性能が足りなくなった場合、やはりworkerを足した上で、balancer用のfluentdの設定にバランス先を足して再起動、となる。これはbalancerの各ノードをひとつずつ順番にやればいい。ノード再起動に伴う転送データの再送などはscribedがやってくれる。
耐障害性の話
scribedノードはすべて再送機構を備えている。fluentdのin/out_forwardはそういった機構を備えていないが、Webアクセスログ解析はその特性上シビアなデータ保持が求められることはあまりないので、ノード障害時にそのノードが持っていたデータは失われるものの、大した量でもないから問題でもない。*3
ノード欠損自体は、scribed, fluentdともに障害ノードを転送先から外してくれるので大きな問題ではないし、復旧時の復帰も問題ないはず。1台壊れたときにキャパシティが足りなくなってカタストロフが起きないようにだけしておけばいい。
さて
このエントリをあらためて眺めて問題がないかどうか考える。なかったら実際に組み上げてみてテストだ。
やっぱり変えることにした(12/1追記)
変更点となぜそうすることにしたかを列挙する。
fluentdによるbalancerをやめてscribed deliverにバランスさせることにした
fluentdをbalancerにしてたのはオンラインでのバランス先変更をやる(やれる)と思ったときにそうしようと考えてたんだけど、現状ではそれはできない。……ことがわかったあとも構成をひきずってた。考えなおしてみたらあまり意味がなかった。
scribedでバランスする場合はバランス先の変更時にscribedを再起動すると scribeline の接続先がいちど変わってしまうのが問題だったんだけど、それは balancer がなくなって浮いた分のマシンを回して deliver を2台構成にすることで回避する。
全く同じ設定で deliver を2台上げておく(scribelineの接続先をその2台にしておく)ことで、設定変更・再起動を順番にやればログの取りこぼしも混乱もなくバランス先の変更ができる。実はこっちの方がバランス先変更時の手間も問題も少なく、しかもシンプルな構成にできた。
SPOFもなくなり、どこか1台のマシンが死んだからといってストリーム処理系にデータが流れなくなることも無くなるので、障害に対するデータ再生成バッチの活躍回数もぐっと減るから運用の負担も減る。ぐっど。
scribed関連で気にしないといけないのは SENDING_BUFFER 時のデータ大量転送モード*4で、このときは超巨大なThriftメッセージが飛び交う。普通にやるとこれがひとつのfluentd workerに行くので効率悪くて危険、分散したければ fluentd balancer を入れるしかない。
んだけど、変更後の構成だとバランサになる scribed の前にはもうscribedがいないので、SENDING_BUFFER状態を受ける側にはなりようがない。このため今回の構成ではこれを考えなくて済む。(逆に言うとバランサになるscribedの前にscribedがいる場合はこの問題を考える必要がある。)
ボトルネックはどこだ
workerはいくらでも増やせる。Hoopは前にとったベンチによるとよっぽどのことがない限り大丈夫。となるとあとは scribed deliver が問題になる。
ただし、scribed は起動時にスレッド数を指定できる。ディスクI/Oが問題になる場所(アーカイブサーバ)はストリーム処理側とは切り離されているから、そっち側で多少書き込みが待たされることがあっても解析側には問題がない。……とはいえそれでもボトルネックになる可能性は、あるにはある。
基本的には、scribelineから接続する先を分散させればいい。1サーバに複数プロセス立てたり複数のサーバを並列で使ったりする*5。scribelineを改造して、設定ファイルに並べて書いておいた接続先からランダムにひとつ選んで繋ぐようにしてやればいい。
……が、現状ではまずいらなそうなので、とりあえず scribed は master - slave でいいや。必要になったら考えようかなというところ。