たごもりすメモ

コードとかその他の話とか。

#fluentd meetup in Japan に行ってきた&しゃべってきた

Fluentd meetup in Japanなるイベントをやるけどしゃべらない? というお誘いがあったのでありがたくお受けして参加し、1セッションしゃべってきた。
まだ世に出て半年足らずのミドルウェアのイベントなのに集いも集ったり120人*1、まるまる半日間ひたすら高濃度な時間だった。話してみると、みんなfluentdがフォーカスしてるあたりにやっぱり問題意識をもっていて、ああやっぱりこれは出るべくして出たのだな、という印象だった。
あとからtogetterのまとめページも見たけど異様に長い。どんだけ盛況だったかがわかる。開催時間中、Twitterの日本のトレンドに #fluentd が出てたしな。

会場がほんとにすばらしく、運営もUstream無線LAN解放、電源の確保から飲み物提供まで極めて良い状態だった。主催や運営協力の方々およびフューチャーアーキテクト様、ほんとうにありがとうございました。

しゃべってきた

スライドはこれ。節目に入れていた猫画像は kbysmnr さんからクルトンさんの画像をご提供いただきました。感謝感謝。すごい評判よかった。w

あとUstreamで録画されてます。テキトー英語でわかりにくいところとか、記載を省略した詳細などもあるので、スライド眺めて気になるところがある人は動画で見てみるといいかもしれません。

内容としてはログ集計のための前処理を準リアルタイムにストリーム処理としてやりたい、そのための分散ストリーム処理基盤をfluentdでつくったよ、という話です。設計上どうすればよいのかといったあたりからfluentdのプラグイン構成・設定内容までをひととおり話しました。対象データがWebアクセスログですが、基本的にはどのようなデータに対しても同じように適用できると思います。汎用的なストリーム処理基盤として使う機会があればそれはそれでやりたいですね。
またいきなり極端に大きな規模のものを見せてしまったので尻込みする人が出るとマズいなーと後から思いました。基本的には同じ構成の2サーバ構成とかで簡単にストリーム処理の仕組みは作れるので、小さいデータ規模に対してそういうところから始めるのはアリだと思います。しかもけっこう簡単にスケールアウトできますし。これは今度、別途エントリを起こそうかなと思っています。

内容について疑問をもったことなどがあれば、いつでもTwitter等で質問してもらえればと思います。

雑感

実際にWebサービスでロギングの裏側として使っている、という話から、参加者みんなとの問題意識の共有、fluentd開発者 frsyuki やコミッタ repeatedly とのいくつかの議論などなどまで、たいへん充実してた。
参加者から次々と「プラグインを書きたい」という反応が出てくるのが何よりすごい。プラグインのバリエーションはこのソフトウェアにとって何より巨大な共有財産になると思うので、あの空気ができていたのは本当にすばらしい結果だったと思う。(なんともう実際にプラグイン書かれている!)

自分も会場でさくっとひとつ書いてた。サービスに投入してみてからリリースすると思うけど、やっぱりプラグインが書きやすいのはいいよね。まだまだ書きたいものが山ほどある。コードを書く気をかきたてられるミドルウェアなんて本当にいつ以来だろうね。まだまだしばらくはこの熱気が業界を席巻するような気がする。

おつかれさまでした!

*1:キャンセルでの入れ替えもけっこうあった割に驚異的に高い出席率だったらしい

DeNA x livedoor 合同勉強会でしゃべった

なんか新宿を離れる前にやっとこうぜ! という話があったらしくて昨年末に企画が立った*1合同勉強会に参加してた。

聞いた内容と懇親会の内容については社内イベント的なものなので詳細は省くとしてスゲー楽しかった。またやりたいですね!

しゃべってきた

で、そこでしゃべるがよいと水を向けられたので、ここのところやってきたことについて話してきた。

いま絶賛稼動中(でいろいろ問題あって調整中)のfluentdクラスタの話、というよりは、そこに至るまでの試行錯誤について。こういう話はもう外でする機会もあんまりなさそうだったので簡単にまとめた。峠の話は社内システム(自分が作ったものには峠の名前がついている)の由来についての前振り!
Fluentdクラスタの話は今週末のFluentd meetup in Japanで詳細な実装を中心にすると思うけど、これまでの経緯とかはそっちでは省略すると思うので、興味がある人はこのスライドを簡単に眺めるとよいかもしれません。

*1:ので名称がlivedoorのまま

UserAgent判定器 Project Woothee はじめました

UserAgent判定ライブラリはCPANに数多くあるし他の言語でも似たようなものだと思うが、ライブラリや言語をまたがって一致した結果を返してくれるようなものは存在しない(と思う)。が、特にHadoopを使うようになってJavaの事情をある程度無視できなくなってくると、これがたいへん問題に思えてきた。Javaで書かれたUserAgent判定ロジックが欲しいが、普段書くコードはJavaではない*1ので、他の言語でも全く同じように判定してくれるライブラリが欲しい。結果が食い違っていたり、新しいUserAgentを判定したいときに片方だけ対応されて片方は置き去りになったりすると大変困る。

ということで、作った。v0.1.0。現状ではJavaPerlの実装がある*2

https://github.com/tagomoris/woothee https://github.com/tagomoris/woothee
(移動しました: 最近の多言語対応User-Agentパーサライブラリ woothee について - tagomorisのメモ置き場)

(v0.2.0 でruby/pythonが追加された。see: http://d.hatena.ne.jp/tagomoris/20120613/1339583174)

もちろんこのテのライブラリは作れば終わりではなく、むしろその後に延々と続くもぐら叩き*3が本番とも言えるので、作ったといって喜んでいるわけにはいかない。

にしてもいちおうひととおりAPIは整備して livedoor bloglivedoor news のログをぶちこんでも目立った誤検知はなさそうに見えたので、まあそこそこ使えるんではないかなと思う。

機能

UserAgent文字列をぶちこむと、判定結果を保持したハッシュを返す。基本的にはそれだけ。

// import is.tagomor.woothee.Classifier;
// import is.tagomor.woothee.DataSet;
Map<String,String> r = Classifier.parse("user agent string");
r.get("name");        // => name of browser (or string like name of user-agent
r.get("category");    // => "pc", "smartphone", "mobilephone", "appliance", "crawler", "misc", "unknown"
r.get("os");          // => os from user-agent, or carrier name of mobile phones
r.get("version");     // => version of browser, or terminal type name of mobile phones

Javaならこうだし、Perlだとこっち。

use Woothee;

Woothee::parse("Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)");
# => {'name'=>"Internet Explorer", 'category'=>"pc", 'os'=>"Windows 7", 'version'=>"8.0"}

Woothee::parse("Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)");
# => {'name'=>"Googlebot", 'category'=>"crawler", 'os'=>"UNKNOWN", 'version'=>"UNKNOWN"}

Woothee::parse("Opera/9.80 (Android; Opera Mini/6.5.27452/26.1305; U; ja) Presto/2.8.119 Version/10.54");
# => {'name'=>"Opera", 'category'=>"smartphone", 'os'=>"Android", 'version'=>"9.80"}

たいへんかんたんですね!

既知のメジャーなクローラのみ判定するAPIもいちおう作ってある。これだけ高速にやりたいケースが手元であったので。

use Woothee;
Woothee::is_crawler("Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)"); # => 0
Woothee::is_crawler("Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"); # => 1

もう2012年でもあることなので、古いブラウザやケータイなどの判定はだいぶいいかげん。あとLinuxBSDなどもかなりいいかげん(Androidを除いて)だし、ThunderbirdIceweaselでのアクセスも見付けたけど、ブラウザ名称としてはUNKNOWN扱いで放置したりしてある。マイナーなものに対応するよりは速度をそこそこ高目に保っておきたい傾向。

hive udf

これは自分で使うので簡単に書いてみた。

add jar woothee.jar;

create temporary function parse_agent as 'is.tagomor.woothee.hive.ParseAgent';
create temporary function is_pc as 'is.tagomor.woothee.hive.IsPC';
create temporary function is_smartphone as 'is.tagomor.woothee.hive.IsSmartPhone';
create temporary function is_mobilephone as 'is.tagomor.woothee.hive.IsMobilePhone';
create temporary function is_appliance as 'is.tagomor.woothee.hive.IsAppliance';
create temporary function is_crawler as 'is.tagomor.woothee.hive.IsCrawler';
create temporary function is_misc as 'is.tagomor.woothee.hive.IsMisc';
create temporary function is_unknown as 'is.tagomor.woothee.hive.IsUnknown';
create temporary function is_in as 'is.tagomor.woothee.hive.IsIn';

SELECT
 count(is_pc(parsed_agent)) as pc_pageviews,
 count(is_in(parsed_agent, array('pc', 'mobilephone', 'smartphone', 'appliance'))) as total_pageviews
FROM (
    SELECT parse_agent(useragent) as parsed_agent FROM access_log WHERE date='today'
) x

こんなふうに一発でクエリできる*4。超便利。

woothee.Classifier.parse() Iを2〜3行のwrapperでくるんであるだけなので、pigとかでも必要ならすぐ書けるんじゃないかな。

なんで作ったの

先に書いた通り、JavaPerlで完全に一致する挙動のUserAgent判定機が欲しかった、ということ。どうやって作るのか、作ったあとのメンテナンスが本当に継続できるのか、といった問題は少し前に知恵熱が出るほど考えたんだけど、結局押し切った。
で、このへんの一番の問題である「同じように見えるんだけど微妙に違う」答えを返すとかそういうのに悩まされるのは絶対に避けたかったので、まず返り値に使うデータを言語をまたいだひとつのデータセットとして(yamlで)作った。同じようにテストケースについても、UserAgent文字列とその判定結果、というセットを言語をまたいで(yaml)で作った。
Perl/Javaのテストケースはこれらのyamlを読み込んで走るだけなので、余程おかしい書き方をしない限りは言語間での挙動のミスマッチは避けられるはず、程度のことは考えている。
実装方法についても、言語をまたいで実装の共有ができないかと、正規表現の集合で構成するとか何らかのDSLを発明して各言語のコードを生成するとかあれこれ考えたけど、結局はそれぞれに実装している。やってみてわかったが部分文字列探索と簡単な正規表現の集合にしかならないので、別言語で実装し直すことのコスト自体はそんなに高くなかった*5

で、このライブラリでは複数の言語の実装をまとめて収容しているけど、結局大事なのはテストケースの共有だけなのかな、と思う。もちろん、たとえば南アフリカで一大勢力を誇っている携帯キャリアの判定を日本でしたいかと言われるとそんなことはないので、docomo/au/SoftBankの判定を行ってしまうこのライブラリが世界各国で必要かというとそんなことはないだろう。なので部分的にテストケースの実行や判定そのものをon/offできた方がいいんだろうなとは思う。万が一広く使われるようになったら、configure的な処理をビルド前に入れて、各国向けのコードを生成する、とかやる日が来るかもしれない。

が、それでも Windows/OSX/MSIE/Chrome/Safari/Firefox/iOS/Andorid/Googlebot といった、アクセス元の6割以上を占めるようなものの判定が世界中で共有されうるのは確かだと思う。そういった試みをそろそろ始めてもいいんじゃないかなということを頭に置きながらこのコードを書いていたし、いま改めてそう思ってこのエントリを書いている。

patches welcome!

パッチを大募集中です! 特にテストケースについて、「こんな端末がこんなUAだよ! これから増えるからサポートしてよ!」的なものを募集しております。UAの例の文字列だけでもよいのでお寄せいただけますとたいへん助かります。
またPerl/Java以外の言語の実装についてですが、Javaを見ながらPerlを書いたら7時間くらいで書けたので、他の言語でもそれなりに素早く書けそうな気がします。書いてしまったから入れろ! という心強いご連絡をお待ちしております。

*1:し、Javaで書きたいとも思わない

*2:あとhive udfの実装も

*3:新しいブラウザへの対応、既存ブラウザのバージョンアップによるUserAgentパターンの変化への追従、新OSや新クローラへの対応、などなどなどなど

*4:countで使えるように is_xxx 系のは全部 TRUE or NULL を返す

*5:Perlだと強力な拡張正規表現を使って云々とかもちょっと考えたが、簡単に速度計ってみたら遅くて使えなさそうだった。

Hive UDFを自分で追加するときの注意点メモ

メモ。CDH3u2 (hive-0.7.1-cdh3u2) での話。

(1/23 HiveServerについていくつか追記した)

Hiveで自分でつくった関数(User Defined Function: UDF)を使いたい! と思い艱難辛苦を乗り越えJavaのコードを書きjarにまとめたとする。書くまでの話はWikiの該当ページなどを熟読するのがよろしい。

で、じゃあどうやってHive起動時に読み込めばいいの、という話。

add jarコマンド

hiveコマンドを起動するマシンの適当なディレクトリにjarファイルを置き、そのディレクトリをカレントディレクトリとしてhiveを起動して以下のコマンドを実行する。

hive> add jar udfclass.jar;
hive> create temporary function myfunc as 'my.package.udf.Class';
hive> select myfunc(col1) from x_table where ...

これが存外うまくいくのでヨッシャヨッシャと思う。罠である。

(1/23追記) このjarのパスはhiveを実行しているマシン上のローカルファイルシステムのパスらしい。 /home/username/tmp/hoge.jar みたいに書けば読み込める。HiveServerに接続して実行するときはHiveServerを実行しているマシン上のjarの絶対パスで指定するのがよい。

hive-default.xml の hive.aux.jars.path を設定する

カレントディレクトリがどうとかに依存していると運用がうまくいくわけがないので、特定の場所にjarファイルを置いてそこを見るように設定したい。どうするか。これもWikiから設定項目リストを眺めると、ぴったりなもの hive.aux.jars.path というのがあるので、これを使えばいい、らしい。
とはいえ「パスを書け」としか書かれてなくてなにを書けばいいのかよくわからん。どうすればいいのかは機能追加時のチケットにあった。

  • ローカルファイルシステムを指定したい場合は file:///home/username/tmp/lib みたいに指定する(jarのあるディレクトリを指定)
  • HDFS上に置いたjarを指定したい場合は /hdfs/dir/path/tmp/lib のように書く

普通に / から始めるとHDFS上のパスを指定したことになるので注意。ディレクトリを指定することにも注意。末尾の / はあってもなくても大丈夫っぽい。

ということで、以下のように書く。

<!-- <configuration> ... </configuration> 内だよ -->
<property>
  <name>hive.aux.jars.path</name>
  <value>file:///home/username/tmp/lib</value>
</property>

その上で create temporary function などを実行。

hhive> create temporary function myfunc as 'my.package.udf.Class';
hive> select myfunc(col1) from x_table where ...

これでmapreduceが起動して万歳! だったらよかったんだけど。 うまくいかなかった。以下実例。

hive> create temporary function parse_agent as 'is.tagomor.woothee.hive.ParseAgent';                       
OK
Time taken: 0.25 seconds
hive> select parse_agent(agent),agent from access_log where service='top' and yyyymmdd='20111201' limit 30;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201110281852_0528, Tracking URL = http://namenode.analysis.livedoor:50030/jobdetails.jsp?jobid=job_201110281852_0528
Kill Command = /usr/local/hadoop-0.20.2-cdh3u2/bin/hadoop job  -Dmapred.job.tracker=master101.analysis.livedoor:50031 -kill job_201110281852_0528
2012-01-19 19:45:50,204 Stage-1 map = 0%,  reduce = 0%
2012-01-19 19:46:08,302 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201110281852_0528 with errors
java.lang.RuntimeException: Error while reading from task log url
        at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getErrors(TaskLogProcessor.java:130)
        at org.apache.hadoop.hive.ql.exec.ExecDriver.showJobFailDebugInfo(ExecDriver.java:903)
        at org.apache.hadoop.hive.ql.exec.ExecDriver.execute(ExecDriver.java:694)
        at org.apache.hadoop.hive.ql.exec.MapRedTask.execute(MapRedTask.java:123)
        at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:130)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.run(TaskRunner.java:47)
Caused by: java.io.IOException: Server returned HTTP response code: 400 for URL: http://10.xxx.xx.xxx:50060/tasklog?taskid=attempt_201110281852_0528_m_000008_0&all=true
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1436)
        at java.net.URL.openStream(URL.java:1010)
        at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getErrors(TaskLogProcessor.java:120)
        ... 6 more
Ended Job = job_201110281852_0528 with exception 'java.lang.RuntimeException(Error while reading from task log url)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MapRedTask

なんかこんなエラーになって全く上手く動かない。どうやってもダメ。jarをlocal filesystemに置いてもhdfsに置いてもダメ。もうね……。

環境変数 HIVE_AUX_JARS_PATH をセットする

jarのパスはもうひとつセットする方法があって、環境変数で指定する。これは local filesystem のみ指定できる。

export HIVE_AUX_JARS_PATH=/home/username/tmp/lib
hive

その上でクエリを実行。

hive> create temporary function parse_agent as 'is.tagomor.woothee.hive.ParseAgent';
OK
Time taken: 0.26 seconds
hive> select parse_agent(agent),agent from access_log where service='top' and yyyymmdd='20111201' limit 3;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201110281852_0529, Tracking URL = http://namenode.analysis:50030/jobdetails.jsp?jobid=job_2
01110281852_0529
Kill Command = /usr/local/hadoop-0.20.2-cdh3u2/bin/hadoop job  -Dmapred.job.tracker=namenode.analysis:50031 -k
ill job_201110281852_0529
2012-01-19 19:53:33,768 Stage-1 map = 0%,  reduce = 0%
2012-01-19 19:53:34,779 Stage-1 map = 56%,  reduce = 0%
2012-01-19 19:53:35,786 Stage-1 map = 100%,  reduce = 0%
2012-01-19 19:53:36,794 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201110281852_0529
OK
DATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATA
DATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATA
DATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATADATA

できたできた!!!!

create temporary function を自動化したい

毎回 create temporary function を打つのは嫌だし HiveServer 使う場合どうするんだよというのもあるので、こいつをhive起動時に自動的に実行するようにしたい。
hiveコマンドにはオプション -i filename というのがあって、これを指定すると filename の中に書いたクエリを自動的に実行してくれるらしい。

ということで、やってみた。

$ cat hive/hiverc.hql 
create temporary function parse_agent as 'is.tagomor.woothee.hive.ParseAgent'
$ hive -i hive/hiverc.hql 
Hive history file=/tmp/edge-dev/hive_job_log_edge-dev_201201211526_471516605.txt
hive>

できたできた! 起動したhive CLIで普通に指定の関数が使えたよ! *1

(1/23追記) この -i オプション(やその他 hive コマンドのオプションとして一般に見られているもの)は --service hiveserver を指定したときには使えない。注意。

auxpathオプション (1/23追記)

jarを読み込むディレクトリの指定方法がもうひとつあった。HiveServerまわりのためhiveコマンドのコード読んでたら見付けた。

$ hive --auxpath /path/to/lib/dir # CLI
$ hive --auxpath /path/to/lib/dir --service hiveserver # HiveServer

これを指定すると HIVE_AUX_JARS_PATH にセットするのと同じ効果が得られる。

まとめ

Hive UDF用のjarを追加する方法はいくつかあるけど、運用を考えると環境変数 HIVE_AUX_JARS_PATH をセットしておくのがよいでしょう。動くし。
create temporary function はめんどくさいので初期化用クエリファイルを用意しといて hive -i で読み込むとよいでしょう。以下のようにしてもいいレベル。

alias hive="hive -i hive/hiverc.hql"

わーい、いろいろ楽になったぞー!

HiveServerの場合 (1/23追記)

HiveServerを使う場合はちょっと事情が異なる。まず -i オプションが使えない(起動時にセットアップクエリを走らせることができない)ので、外部から接続したときに初期化処理としてこれらのクエリを実行してやる必要がある。
また create function の結果はそのセッション内でしか有効ではないので、繋ぎなおすたびに初期化処理は実行する必要がある。jarの読み込みについても同じ。

jarの読み込み元(AUX_JARS_PATH)についてはHiveServerの起動時に環境変数か --auxpath で指定する。*2
ただしこれらを指定したHiveServerに対しても jar ファイルの読み込みについては、どうせ add jar しないといけないっぽいようだ。しないと失敗する。起動時のコマンドでは読み込んでいるようなんだけど……Javaのコードまで追ってないのでよくわからん。

ということで結局 add jar を実行することになるので、もうあれこれ考えるのも面倒くさいし、自分は HiveServer 上の絶対パスで指定することにした。

Hive CLIをHiveServerに接続して使う場合

普通に add jar と create temporary function してからクエリを実行すればいい。

$ hive -h localhost -p 10000
[localhost:10000] hive> add jar /path/of/hiveserver/localfs/lib/hoge.jar;
[localhost:10000] hive> create temporary function hoge as 'package.of.hoge.Hoge';
[localhost:10000] hive> select hoge(col1) from ...;

そういえば hive cli のHiveServerに接続する機能がCDH3u2使ってみたらいつの間にか取り込まれてましたね。ぐっど。

HiveServerにThrift接続して使う場合

thrift client でHiveServerに接続したら、その接続上で execute() を何回か発行して add jar や create temporary function を実行する。
そのとき以下の点に注意する。

  • add jar の引数の与えかた(jarのパス指定)は Hive CLI 経由でHiveServerに繋ぐ場合と同じ、つまりHiveServer上のローカルパスで指定する
  • add jar / create function はそれぞれ一度のexecute()に1行ずつ実行する
    • セミコロンで繋げてえいやと行けないかと思ったけど無理だった、エラーになる
    • というかセミコロンあるとエラーになるので注意な

まあだいぶめんどくさい、けど、コードからクエリが実行される分には、実行対象クエリの前に client オブジェクトをセットアップする(初期化用クエリを実行したあとのclientインスタンスを渡してくれる)メソッドをひとつ書けばいいので、まあ許容範囲内だった。

ということでだいたい解決しました!

*1:当初うまくいかないと書いておりましたがクラス名をtypoっておりまして大変申し訳ございませんでした。

*2:hive.aux.jars.pathはやっぱり動かなかった。原因がわからん。

Hoop(httpfs)とwebhdfsの違い

Hadoop 1.0.0リリースされた。まあ中身のほとんどはただの 0.20.x 安定板リリースなので特別に言うことはないんだけど。詳しくは以下のblogを読むのがよろしい。

hadoopのバージョン表記について - 科学と非科学の迷宮

ただしひとつだけびっくりしたのは、webhdfsなる機能が入ってきたこと。(このblogでよく話題にしている)Hoopと並んでそんなようなものがあること自体は知ってたけどあんまり興味なかったのだが、Apache Hadoopのパッケージに(Hoopより先に)入ったとなるとちょっと注目せざるをえない。
が、httpfs(Hoop)とwebhdfsじゃ名前も似てて超まぎらわしい。いったい何がなんなの。

なお自分はWebHDFSはAPIリファレンスを読んだだけで、実際にはカケラも触っていない。その状況での理解による内容なので、注意して読んでください。

先に結論だけ書く

どちらもクライアントからはHTTP REST APIで叩く。可能なオペレーションの幅も同じ。認証もユーザ名ベースの仮認証(pseudo auth)かKerberosのどちらか。ただしサーバ側をどのノードが担当するかという点、およびHTTPの通信パターンが異なる。
ぶっちゃけて言うと、これまでJava実装のHDFSClientがやっていた通信をHTTP REST APIで置き換え可能にする、というものがWebHDFSだと言っていい。

このため、機能的にはほぼ同じだが、性能特性はいくらか異なることが考えられる。どのようなアクセスパターンでどちらが良いかという確定的なことはまだ言えない。ぶっちゃけだいぶ実装による。ので、実際に試すしかないと思う。クライアントの実装がたぶん重要。
あと安定性にも違いが出るかもしれない。webhdfsのHTTPサーバ部分の実装がhdfsコアと同じ完成度なのかどうか、という点が重要。ここの完成度が高いならwebhdfsも安定するだろう。Hoop(httpfs)は今のところ安定性の面で問題を見付けたことはない。

Hoop(httpfs)の構造

Hoop Serverを立てる場合の構造および通信パターンとプロトコルをざっくり図示すると、こんな感じ。

クライアントは全ての操作について、Hoop Serverに対してHTTP REST APIでリクエストを投げる。Hoop Serverはリクエストを受け取ったら、それに対応するHDFSアクセス操作をJava実装のHDFSClient経由でHadoopクラスタに対して行い、結果をクライアントに戻す。

構造は単純だ。Hoop ServerはただのプロキシサーバでHDFS側から見ると普通のクライアントアプリケーションなので、ぶっちゃけ我々でも書こうと思えば書ける。

WebHDFSの構造

対してWebHDFSの場合、ノード間の関係および通信パターンはこんな感じ。

つまり、全てのNameNodeとDataNodeがHTTPをしゃべる。いっぽうプロキシサーバ的なものは存在せず、HDFSClientが各ノードと行っていたような通信を、そのかわりにHTTPを使って行うこととなる。HDFSClientの実装に依存するしかなかったクラスタとの直接通信をHTTPで代替できるようになったということだ。

いっぽう面倒くさい部分がある。周知の通りHDFSはディレクトリ構造およびファイルのメタデータをNameNodeが、ファイル実体をDataNodeが管理している。このため、ディレクトリ構造やファイルのメタデータに関する操作ならNameNodeにリクエストを投げてレスポンスを見ればいいのだが、ファイルの内容に関する操作(ファイル作成・追記、ファイルの内容取得)については、以下のような手順を踏む必要がある。

  1. NameNodeのリクエストを送信
  2. NameNodeのレスポンスによりDataNodeのどれかにリダイレクトされる
  3. DataNodeにリクエストを送信
  4. DataNodeから結果が返ってくる

上記のように一回のHTTPリクエストで処理が完結しない。これはクライアントの実装を行う上ですこし面倒だし、性能面でも影響を与える可能性がある。*1
いっぽう間にプロキシを挟まないためDataNodeと直接通信をする部分においてはスループットがかなり出ることも予想され、その面ではパフォーマンスに関する期待は持てる。つまりは結局アクセスパターン次第ですね、ということになるかもしれない。

なおHTTPリクエストのパターンが面倒なのは、将来的には解決される可能性がある。本来ならレスポンスとして 'Expect: 100 Continue' を使って直接DataNodeに処理を引き継ぐようにしたいらしい。現状でNameNodeのレスポンスがDataNodeへのリダイレクトになっているのはJettyのバグのせいであるとREST APIに明言されている。
これが解決されたら、クライアント側で2回のHTTPリクエストをハンドリングする必要がなくなるかもしれない*2。ただし、性能特性としては変わらないままだろう。

雑感

NameNodeおよびDataNodeへの直接の通信は、これまでJava実装によるHDFSClientに頼るしかない状況だった。これがHTTP REST APIという平易なもので置き換え可能になったという点で、WebHDFSの存在意義は大きいと思う。つまり誰でも各言語ネイティブのHDFSクライアントライブラリが書けるようになったということだ。
WebHDFSのREST APIリファレンスはだいぶしっかりした作り*3になっていて、仕様的にちゃんと考えられているんだなあ、という気もする。レスポンスJSONスキーマなんかもちゃんと書かれているのでだいぶ実装にあたっての心理的障壁は低い*4。このAPIの互換性がしっかりと守られるなら、もしかしてJava実装のHDFSClientベースでクライアントライブラリを使うよりもWebHDFSベースでクライアントライブラリを書いてしまった方がバージョン互換性が保たれるということにもなりそうだ。

とはいえ、全てのクライアントアプリケーションがNameNodeおよびDataNodeのすべてと相互に通信するというのはなかなか面倒だし、トラブル時に何をどう考えればいいのかも複雑になる。なので、通信パターンの単純化、ということだけ考えてもhttpfs(Hoop)の存在意義はある。実際の運用を考えるとこの面はあまり軽視したものでもない。

ということで最後にはいつもの通りだが、用途とアクセスパターン、運用状況にあわせて適宜選択するのがよろしいでしょう。ああ、誰かHadoop 1.0.0のWebHDFS実装でベンチマークをとってくれないか・・・!

おまけ

ふと考えたんだが、HTTP REST APIでクライアントのリクエストを受け取ってWebHDFSでHadoopクラスタに処理を投げるプロキシサーバがあると、実は一番良いんじゃなかろうか。本当かな。

*1:前半のNameNodeからのレスポンスをキャッシュできるのかどうか、できるとしたらどの条件下でキャッシュできるのか、が明らかになれば少し違うかもしれない。

*2:HTTPクライアントライブラリが処理を裏側でやってくれるなら。

*3:ただしPDFのみ。なぜ……。

*4:この点Hoopはだいぶダメダメ。。。

2011年を振り返ってみる

思えば、年頭から年末までずっとプログラマとして生活してた。世間的にいろいろあったし自分が受けた影響も決して小さくはなかったけど、プログラマとしてのみ見ればだいぶ充実した年だった。

2010あとしまつ期、ログ収集・解析初期 1月 2月 3月

あるいはscribed/hadoop/hive導入期。この頃、次の仕事はログ収集だー、ということであれこれ始める。実は当初は解析まわりを自分でやることになるとは思ってもみなかったのだが、あれこれあってHadoopを触りはじめて今に至っている。

ホスト管理アプリケーション yabitz (ヤビツ)のコードを公開した - tagomorisのメモ置き場 ホスト管理アプリケーション yabitz (ヤビツ)のコードを公開した - tagomorisのメモ置き場

社内で使うために書いて、いまも元気に動いているツール。ちょこちょこメンテナンスや機能追加などしてたりする。最初はあまり大きなツールにならないだろうと思ってsinatraで書きはじめたんだけど、これは明らかに失敗だった。Railsにしときゃよかった。まあ修正はしやすいんだけどね……。

Apacheログパーサを書いた - tagomorisのメモ置き場 Apacheログパーサを書いた - tagomorisのメモ置き場

これは今思うとなんでこの時期に書いたんだっけという代物だけど、今でももちろん使ってる。このときCPANに公開しそこねたせいでいまだにCPAN authorになれてません。やっとこうかなーどうしようかなーということはその時にやっとけ、という教訓ですね。

ほかにもHiveServerいじったりnode.jsいじったり、あれこれやってた。そして3.11をきっかけに、仕事も何もかも手元のMacBookAir1台でどうにかなる環境を整えた。

釣りの季節Hadoop/node.js期 4月 5月 6月 7月

hadoopジョブの投入方法をあれこれ考えたりして継続しつつ、shibと名付けたHiveクライアントWebアプリケーションをこの時期にずっと作ってたかな。node.jsまわりの細かなあれこれをblogエントリにすることが多かった。

node.jsの非同期I/Oにおけるデータ受信のパターンのバリエーション - tagomorisのメモ置き場 node.jsの非同期I/Oにおけるデータ受信のパターンのバリエーション - tagomorisのメモ置き場
node.jsで複数の処理を並列に実行して全部完了したらコールバックを呼び出したい - tagomorisのメモ置き場 node.jsで複数の処理を並列に実行して全部完了したらコールバックを呼び出したい - tagomorisのメモ置き場

また、Hadoopまわりで本格的にあっちこっちクビを突っ込みはじめて知り合いが増えた。これのおかげで、秋から冬にかけていろいろ面白いことになったような気もする。

"Hbase at Facebook" に行ってきた - tagomorisのメモ置き場 "Hbase at Facebook" に行ってきた - tagomorisのメモ置き場

が、その一方でなんとなーく書いたblogエントリが大ヒットしてしまってどういうことなのかうろたえることが多かったのもこの時期。なんであんなことになったのか。特にみっつめのエントリはコメント欄が面白くてウオッチせずにいられないと友人知人に大評判でした。おまえら仕事しろ。

RAIDレベルの話: 1+0と6はどっちが安全か? - tagomorisのメモ置き場 RAIDレベルの話: 1+0と6はどっちが安全か? - tagomorisのメモ置き場
xargs を使ってカジュアルに並列処理 - tagomorisのメモ置き場 xargs を使ってカジュアルに並列処理 - tagomorisのメモ置き場
いっしょに仕事をしたいプログラマ 5つの特徴 - tagomorisのメモ置き場 いっしょに仕事をしたいプログラマ 5つの特徴 - tagomorisのメモ置き場

しかし実際、今でも考えは変わってない。最近 こんな記事 が話題になったりもしてましたね。あるわー、それおれ半年前に書いたわー(そして炎上したわー)とか。

イベント期 8月 9月 10月

アウトプット期。8月はISUCON、9月はHadoop Conference Japan 2011 Fall、10月はYAPC::Asia 2011とものすごいイベント続きでだいぶやばかった。何がやばいってだいたい直前までコード書いてたことか。

isucon終了に寄せて - tagomorisのメモ置き場 isucon終了に寄せて - tagomorisのメモ置き場

ISUCON「つくりかた」も。8月はずっとこれやってた。まあ、なんか、面白かったなーって。イベントそのものが面白かったのと、副作用的に色んな人と知り合いになれた、知ってもらえた、というのは大きかった。

Node.jsなWebアプリでJobQueueなしにラクラク巨大処理を実行 - tagomorisのメモ置き場 Node.jsなWebアプリでJobQueueなしにラクラク巨大処理を実行 - tagomorisのメモ置き場
#isucon を支えた技術: ベンチマークmaster/agentの構造とnode.jsの話 - tagomorisのメモ置き場 #isucon を支えた技術: ベンチマークmaster/agentの構造とnode.jsの話 - tagomorisのメモ置き場

ISUCONはnode.jsのコードでできているといっても過言ではなくて、そんなあたりで考えたことがいくつかこのあたりに。いまだにリアルタイム処理やりたいと思ったことはないし大規模Webアプリ開発をnode.jsでやるとかないわーって感じだけど、それでもnode.jsわりといいよって思ってる。

Hive Client Webアプリケーション shib をつくった - tagomorisのメモ置き場 Hive Client Webアプリケーション shib をつくった - tagomorisのメモ置き場
Hadoop Conference Japan 2011 Fall に行ってきた&しゃべってきた - tagomorisのメモ置き場 Hadoop Conference Japan 2011 Fall に行ってきた&しゃべってきた - tagomorisのメモ置き場

こっちはHadoop系でやってることをひとまとめ的にしゃべってきた時の話。10分だったのでだいぶざっくりとしたまとめだったけど、何やってるかは明確にできたのかなー。その後またあれこれやってます。まとめてアウトプットする機会があるかなあ。

YAPC::ASIAに行ってきた&しゃべってきた #yapcasia - tagomorisのメモ置き場 YAPC::ASIAに行ってきた&しゃべってきた #yapcasia - tagomorisのメモ置き場
サラリーマン人生における希望と絶望 - tagomorisのメモ置き場 サラリーマン人生における希望と絶望 - tagomorisのメモ置き場
YAPC::ASIAで、あるいは他の勉強会で、しゃべりたいこと、聞きたいこと - tagomorisのメモ置き場 YAPC::ASIAで、あるいは他の勉強会で、しゃべりたいこと、聞きたいこと - tagomorisのメモ置き場

YAPC::Asia 2011でしゃべってきた。……のは小粒な話だったけど、まあ。年頭にいちおう目標にしてたので実現できてよかった。もう少しPerlがっつりなことをやってると面白いのかなーという気もするけど、来年どうしようかな。
で、YAPC前にtokuhiromという人がYAPC開催の1週間くらい前にホッテントリになるような面白い一発ネタエントリを上げておくって言っていたのを読んだのでその通りにしてみたんだけど、いやー、なんかねー、あんま変わらなかったような気がするなあ。色んな人に「よく釣れましたか?(ニヤニヤ」みたいなことは言われたけど。ちょっと反省した。まあ、ええ、よく釣れました。今にして思うと「面白い(Perlのコードの)一発ネタエントリ」ってことだよなきっと。

ふたたびログ解析期 11月 12月

ふたたびHadoop、そしてfluentd期。現在進行形。Hadoop Conference Japan 2011 Fallのときに「ストリーム処理やりたい」と言ってたのを、ついにfluentdで開始した。あれこれコード書いたりベンチマークとったりしてる。

Hoopの性能を確認してみたらもうlibhdfsとかオワコンでHoop使えって結果になった - tagomorisのメモ置き場 Hoopの性能を確認してみたらもうlibhdfsとかオワコンでHoop使えって結果になった - tagomorisのメモ置き場
fluentd のベンチマークとってみたよ! - tagomorisのメモ置き場 fluentd のベンチマークとってみたよ! - tagomorisのメモ置き場
Webサーバログ転送・ストリーム処理系私案 - たごもりすメモ Webサーバログ転送・ストリーム処理系私案 - たごもりすメモ
fluent-plugin-hoop v0.1.0 released - tagomorisのメモ置き場 fluent-plugin-hoop v0.1.0 released - tagomorisのメモ置き場

このへんはまだようやく形になってきたところ。とりあえず使うコードをひととおり書き上げたので、あとはこれから、実際に運用に乗せながら手直ししていく。まだまだやることが山積みでだいぶ楽しいです。

まとめ

こうして見るとblogエントリはそのときやっていたことを見事に反映している。うん、コード書きまくってた1年でしたね。来年もこうありたい。毎年具体的な目標は2月に温泉でぐったりしながら考えることにしているので、多分来年もそうする。
2011年2月に考えたことは以下みっつだった。おお、すげえ、達成してるぞ。

  • コード書きまくる
  • YAPC::Asiaでしゃべりたいのと、あと他にもどこかのイベントに出られたらいいなあ
  • クルマ買ってあちこちクルマと自転車でふらふらしよう

2012年はどうしようかなあ。会社が親会社に統合されたとかであれこれ職場環境が変わったりするのは見えてる。ただ、問題意識そのものがいきなり変わるとも思えないので、当面見える問題をコード書いて解決していきたい、という方向は変わらないと思う。

2011年、みなさまに大変お世話になりました。お蔭様で楽しいエンジニアリングの1年でした。2012年もよろしくおねがいいたします。

fluent-plugin-hoop v0.1.0 released

みんな大好き fluentd は素敵だと思うんだけどHDFSへの書き出しをするプラグインが無い! なんで! という全世界100万人のエンジニアの怨嗟の声が聞こえてきそうだったので、とりあえずどうにかするべく書いた fluent-plugin-hoop がだいたい動くようになったのでリリースした!

rubygems.org経由でリリースしてあるので gem install fluent-plugin-hoop で入ります。あとはなんだっけ、システム全体の環境に影響を与えたくなければ fluent-gem install fluent-plugin-hoop かな。

Hoop(HDFS http-fs)はHDFSへのオペレーションをover HTTPで提供するサーバソフトウェア。詳しくはぐぐれ。ぐぐってください。*1
書き込みパフォーマンスが十分に出そうだったので、これを経由してHDFSに書き込むということにすればRubyでさくっとfluentdのプラグインが書けそうだったので書いてみた。さくっと書けたという割にはあれこれ試行錯誤があったけどいちおうできて、そこそこのデータ流量にしてみても問題ない程度にはパフォーマンスが出ているので公開してる。自分の手元の用途に対して本能に大丈夫かどうかはまだ最終確認できてない……。

なお、現状の動作確認はすべて Cloudera版の Hoop 0.1.0-SNAPSHOT で行っています。0.23系クラスタを手元で作れる誰かに是非試してみてほしい。

特徴

ログに書かれている時間を見て書き込み先の(HDFS上の)パスを確定するようになっている*2。このため、普通のログローテーションにありがちな「23:59:59のログが翌日分のファイルに泣き別れ」などの事態を避けられる。
またTAB区切りのプレーンテキストファイルで各値を出力できるようにしており、これはHiveが直接読める形式なので個人的にはとても嬉しい。0分0秒*3にはLOAD一発走らせることででHiveクエリが発行可能になる。グレイト。

残念なのが、書き込み先のHDFSパス指定でタグを使えない(パス指定でタグを展開するような設定ができない)こと。これは fluentd TimeSlicedOutput の仕様によるので、ちょっと簡単にはどうにもできない。このためタグごとに出力パスを分けたい場合は設定ファイルにおいて分けたいタグの数だけmatch節を並べる必要がある。
正直に言ってこれはだいぶ残念なので、時間ができたらどうにかできないか考えてみたい。fluentdにTimeSlicedOutputより更に高機能なOutputプラグインベースクラス*4を作ることになるのかなあ。

ベンチマークをとっていて明らかになったことだが、HoopServerは同一パスへの追記が複数のHTTPセッションから混在して連続するとどうも怪しい挙動を示すことがある。バグ報告したいし確認用のコードも書いたけどまだしてない*5。これを回避するため割とヒューリスティックなwaitつきretryの処理を入れてあって、トラフィックを見てるとちょっと不審な挙動を示すのが残念なところ。

ベンチマーク

とりあえずベンチマーク

サーバは前のfluentdベンチマークのときと同機種のものを使った。構成としてはベンチマークスクリプトが scribe プロトコルでデータを送り、fluentdは in_scribe でそれを受信、out_hoop で HoopServer に送信する。HoopServerはHoopベンチマーク時のNameNodeに立てたのをそのまま使っている。
fluentdはサーバ上に4プロセス起動、ベンチマークツールも(別サーバ上に)4プロセス起動し、ベンチマークツールとfluentdは1対1に対応している。fluentd 4プロセスはそれぞれ同時にHoopServerと通信し、出力はHDFS上のひとつのファイルに4プロセス分混ざって出力される。

fluentdの設定ファイルは以下の通り。gistにも置いといた。ただしin_scribeのポート番号のみ 14631, 14632, 14633, 14634 とひとつずつずらしてある。

<source>
    type scribe
    # 14631, 14632, 14633, 14634
    port 14631
    add_prefix scribe
</source>

<match scribe.*>
    type hoop
    hoop_server hoop-server.local:14000
    path /hoop/%Y%m%d/scribe-%Y%m%d-%H.log
    username edge-dev
    time_slice_wait 30s
    flush_interval 5s
    output_include_time false
    output_include_tag  true
    output_data_type attr:message
    add_newline false
    remove_prefix scribe
    default_tag   unknown
</match>

出力先ファイルは1時間ごとに切り替え、最大30秒は遅延ログの到着を待つ、という設定。scribeプロトコルで受け取ったログのmessageデータは末尾に改行を含むので out_hoop 側での改行付与はオフにしてある。
また後々他のログデータのルーティング設定と併存させたときに混ざらないよう in_scribe 側でタグのprefixとして 'scribe' をつける*6。in_scribe は受け取ったデータのタグは category を見るため、例えば category が 'foo' だった場合、fluentd内部でのルーティング時にはタグは scribe.foo となる。out_hoopの設定では remove_prefix scribe があるので、fleuntd内部のルーティング用につけた文字列はここで除去されてファイルに書かれる。

ベンチマークスクリプトgithub.com/tagomoris/fluentd-tester/ に置いてあるコレ。保存ログから欠損を見付けるためにタグ(scribeにおけるカテゴリ)に通番を埋められるようにしたりしてある*7。こんな感じで起動する。

 $ ruby scribe_stream.rb hoop-server.local 14631 first 300 250 40 720 &
 $ ruby scribe_stream.rb hoop-server.local 14632 second 300 250 40 720 &
 $ ruby scribe_stream.rb hoop-server.local 14633 third 300 250 40 720 &
 $ ruby scribe_stream.rb hoop-server.local 14634 forth 300 250 40 720 &

メッセージ長300bytesは現実のApacheのログ長の平均がこのくらいかな、というのに合わせた*8。いちおうベンチマークの目標としてはプロセスあたり 10,000 msg/sec を4並列の 40,000 msg/sec となる。実際には7/8の 35,000 msg/sec をいくらか下回る程度流れたようだ。
12時間で 1,493,340,000 行(約14億9千万行)。HDFS上のファイルをカウントしたところ1行の欠損もなく保存されていた。

以下グラフ。左の方の山は無視していただいて、問題はいちばん右の12時間分あるやつです。
まずトラフィックで、Hoopサーバ(役のNameNode)、fluentdサーバ、の順。緑がinputで青線がoutput。

fluentdサーバのinputは安定していて、これはベンチマークツールの出力を受けたもの。だいたい100Mbps。狙ってたわけじゃないけど偶然ですな。HoopサーバのoutputはHDFS DataNodeへの書き込みで、これも約100Mbpsで安定してる。
問題はfluentd(out_hoop)とHoopサーバの間の通信で、だいぶ上下してる。ちょっとアレな感じ。

  • Hoopへの通信は根本的に HTTP のオーバーヘッドを伴うので、実データに較べてちょっと増える
    • これは1プロセス時に確認したもので、多分バッファリングのサイズをもっと大きくすれば目立たなくなる
  • データのAppend時にHoop Serverが 500 (Internal Server Error) を返すことがあり、こうなると0.5秒くらいはエラーを返し続ける症状を示す
    • このため out_hoop で怪しげな500エラーの場合0.3秒のwaitを入れてからリトライしている

後者のリトライが重なったりするんだろうなーという気がする。Hoop Server の500応答は2並列30秒走行でも明確に発生が確認できる程度の頻度で起きるので、4並列だとさぞ頻発してるんだろうと思われる。確認してないけど。
とはいえ、まあ4並列でこのくらい速度が出るならいちおう及第点。ほんとはこの倍くらい出ることを確認しときたいんだけど、後で。

お次はCPU使用率ロードアベレージ。同じく左側がHoopサーバで右側がfluentdサーバです。基本的には問題ないでしょう。複数プロセス使えばCPUがそこそこ使えるようになりますねってだけですね。メモリは完全に全く問題なかったのでパス。

まあ、だいたいの用途には問題ない出来になっているのではないでしょうか!

まとめ

out_hoop を書いたことでfluentdからHDFSへの投入の道が開けました。みんなこれでログをどんどんHDFSに投入してガンガンHadoopタスクを回すと良いんじゃないでしょうか。
タグをパスに埋められるようにはどうにかしたい! match節いっぱい書くとか嫌だ!

*1:Hoopだけでぐぐったらなんか全然違うのが出てきたけど hoop hdfs でぐぐったらこのblogがトップに! 俺つおい!

*2:正確にはログに書かれている時間は input plugin 側でparseしてやる必要があって in_scribe には現在その機能はない、のであんまり正確な説明ではない。各言語向けのloggerなどはログを出力した時刻をとるようになってると思うのであんまり気にする必要はないだろう。Apache等のログ時刻のparseをどうするかは自分の場合ちょっと違うアプローチをとるので in_scribe への機能追加は後回し……。たぶんそのうちやる。

*3:の数十秒後

*4:時間とタグの2値でバッファのキーにする、それにあわせてAPIを少し変える?

*5:そもそもCloudera版Hoopのコードで動作確認してるんだけど現状の開発はApache Hadoopのツリー上でHDFS httpfsとして行われてるからどうしていいのやら。

*6:この変更は自分でパッチ書いた、けどまだリリースされてないかな? githubからcloneしてfluentd起動時に -p オプションでプラグインを個別指定する。

*7:実は通番桁数を8桁にしちゃったため、12時間で億の桁にいってしまってあんまり役に立たなくなってしまったけど。

*8:最近おまえらUser-AgentとかRefererとか長過ぎだろ!!!