たごもりすメモ

コードとかその他の話とか。

fluentd v1 configのチェッカを作った

tagomoris/fluentd-v1-checker · GitHub
fluentd-v1-checker | RubyGems.org | your community gem host

Fluentdの設定ファイルについてはv1 config という新書式がある。v0.14からはデフォルトでこちらの書式でパースが行われる予定。
ここに @repeatedly さんが書いてるけど、今のうちから v1 config で書いて --use-v1-config しておくと将来的に困らなくてよいと思う。書式的にも綺麗になっているはず。

ところで v1 config といってもいろいろな経緯から、これまでの書式の設定ファイルをぶちこんでもなんとなくパースできるようになっている。値に # とかを含めたいとかでなければ設定を変更しなくても通る場合も多い。

「場合も多い」これが困る。起動してみて動かないとか、思ったのと違う動きかたをしている、とかだとたいへん困る。確認も面倒。
面倒だったので、両方の書式でパースしてみて結果が一致するかどうかを確認してくれるチェッカを作りましたよ、という話でした。

$ gem install fluentd-v1-checker
$ fluentd-v1-checker /path/to/fluentd/config/file/fluentd.conf

これで従来書式と v1 との両方でパースし、結果に差分がなければそのまま終了する。差分があるとこんな感じでdiffを出してくれる。赤が従来書式としてパースした結果、緑がv1。(これはv1書式で書いたので、従来書式としてパースすると当然差分が出る。)

差分が出るようだと --use-v1-config でFluentdを起動したときにこれまでとは異なる設定で動いてしまう、ということだ。

じゃあ v1 書式で新しく設定を書いたときに、従来の書式の設定ファイルと同じ指定になっているかどうかをチェックしたい場合はというと、そのためのオプションがある。

$ fluentd-v1-checker --with-v1-conf fluentd.v1.conf fluentd.v0.conf

これでOK。ちゃんと書式の移行ができたかがわかる。べんり! これで安心して --use-v1-config つきの起動に移行できますね!

HadoopのXML設定のdiffをとるスクリプトを雑に書いた

Hadoopの設定は name, value, final, description の4要素しか持たない上に /configuration/property/attr の階層しかないくせにXMLになっていて、これがもう、ひっじょーにdiffがとりにくい。普通のテキストdiffでは無理といっていい。
おかげで新しいディストリビューションやバージョンが出たとき、そのオススメ設定がこれまで使っていた設定とどの程度違うのかがまったくわからず、毎回死にそうな目にあう。つらい。

ので、この単純な構造のXMLに特化した、雑にdiffをとるためのスクリプトを書いた。これを使うとdiffの結果がこんな感じになる。

超便利でしょ?

もちろん差分のないプロパティについては表示されていない。あと色付けは colordiff*1 および less -R にまかせた。
このスクリプト hadoop_xml_diff.rb は以下のように雑に使う。

$ ruby hadoop_xml_diff.rb path/to/xml/before.xml path/to/xml/after.xml | colordiff | less -R

https://gist.github.com/tagomoris/8f1bd08a7e9c363b9752#file-hadoop_xml_diff-rb

Enjoy!

*1:OSXなら brew install colordiff

Ansible playbookで1台でも失敗したら即座に実行を止める方法

毎回あれー?ってなってドキュメント読んだりググったりするんだけどうまく見付けられないので書いておく。

Ansibleはデフォルトだとtaskの実行が全ホストで失敗しない限り続く。言いかえると、あるタスクがあるホストで失敗したらそのホストについては以降実行されないが、成功した他のホストについてはplaybookの実行は継続される。
これは特に分散ストレージのセットアップなどにおいて、大変よくない。

よくないので1台でもtaskの実行に失敗した時点で止めたいというタスクには以下のように書いておく。

- hosts: ippai
  max_fail_percentage: 0
  tasks:
    - name: ....

failしているホストのパーセンテージが max_fail_percentage を「超えた」場合にplaybookが停止するので、1台でも fail したら即座に止めたい場合は 0 を指定する。

なおここに書いてある。なんでこれ毎回見付けられないんだろうな自分。
http://docs.ansible.com/playbooks_delegation.html#maximum-failure-percentage

RubyConf 2014にいってきた&しゃべってきた

出していたproposalが通ったので Rubyconf2014 に行ってきていた。旅費および会期中の宿泊費は現勤務先のLINE株式会社に出してもらいました。いつもいつもありがたいことです。

サンディエゴの会場付近はとにかくリゾート地っぽい感じで、あちこちに背の高いヤシがぽんぽん立っており、空も海も青いし、なるほどこれは国民性も変わろうというものだ、という感じ。初のアメリカ行きがこれだったので、USに対してだいぶ変なバイアスがかかった可能性がある。

そこでまたNorikraの話をした。何回目だと言われるかもだけど、英語だとまだ2回目だったし、英語圏でのカンファレンスでは初めてだったので……。

人によっては*1多少ウケたっぽいのでよかったよかった、ということにする。が、これ以上無闇に話しにいってもしょうがない気もするので、この活動は当面ではここまでかなー。

RubyConfに参加しての感想とか印象に残ったセッションとかは、ちょうどSFにいったときにゲストに呼んでもらった rebuild.fm 68 でしゃべったので、そっちを聞いていただくのが書く面倒がなくていいです。
ぶっちゃけて言ってしまうと、技術の話を聞くなら日本国内のカンファレンスのほうが面白いなあ、という印象。だからたぶん、ああいうところには、そもそも異なる文化についての見聞を広めるつもりで行くのが正しいんだろうと思う。

サンディエゴは海軍に縁の深い場所ということで、退役した空母ミッドウェーが丸ごとそのまま博物館として公開されてたりして、これがまた超面白かった。日本では絶対に見られない感じで非常によい。

食事は……まあ、選べばおいしいところはある、かなあ、というくらい。写真のメキシコ料理屋はすごくおいしかったが、この前日に行ったメキシコ料理屋はいまいちだった。あとスパゲティ専門店に行ったら、噂に漏れ聞いた「アルデンテという言葉を理解しているのはイタリアの外には日本人しかない」という言葉の信憑性が大幅に増した。

今回はせっかく太平洋を渡ったので、休暇をくっつけて日程を延ばし、ベイエリアにもちょっと行ってきた。写真はMountain Viewでフラグ立てとばかりに行ってきたコンピュータ歴史博物館。超面白かった。あれは一度行くといいが、閉館まで2時間半という時刻に行ったのがちょっと残念だった。半日丸々確保しておくべきだった。

そのほか、サンフランシスコ市内の知り合いに連れていってもらってステーキ食べた*2り、別の人に案内を頼んで市内観光したり、前述のとおりrebuild.fmに出ることになって録音したり。なんかいろいろだった。皆様ほんとうにお世話になりました。おかげさまで短いながらも濃くて楽しい滞在でした。

ただまあ、あそこに自分が住むことはないだろうなあ、と思う。出掛けると東京は自分に合っているということを再認識できるなあ。

あとはアレだ、アメリカ西海岸といえばクラフトビールですよ。

ビールビールビールビールビールビール。計8泊の滞在で30杯(20数種類)くらいは飲んだと思うが記憶が定かではない。

鞄を亡くすこともコーヒーをかぶることもなく無事帰ってこられました。よかったよかった。

*1:既にストリーム処理をなにがしかやっている人とかだと

*2:これがまためっちゃうまかった

最近のHadoop distcpについて

Hadoopクラスタ間でデータを移動するdistcpについては実はHadoop2系で新しくなっており*1、いろいろ機能が増えている。
たとえば以下のようなコマンドが実行できる。

hadoop distcp -i -m 20 -pb -bandwidth 2 webhdfs://cluster.old.local/path/of/data/dir hdfs://cluster.new.local/path/of/data/dir

このコマンドラインには、従来のdistcpでは不可能だったふたつの変更が含まれている。

bandwidth

前は指定できなかったが、今は mapper あたりが使う帯域をMB/sec単位で指定できる。べんり。

webhdfs://

前はバージョンの異なるHadoopクラスタ間でdistcpする場合、新しいクラスタでdistcpを実行し、古いクラスタ(source側)のパスは hftp:// で指定しろ、というのが普通であった。
Hadoop2だとwebhdfsが使えるようになっているはずなので、これを使うがよい、ということになっているようだ。

なおいつものようにoza先生に教えてもらった。

source側のデータ総量が大きくなっていると以下のような例外が出て失敗する状況になっており、大変困っていた。oza先生によるとhftpのglobのバグのように見える、とのことだった。

14/11/27 15:59:44 ERROR tools.DistCp: Exception encountered 
java.lang.NumberFormatException: For input string: "p"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Short.parseShort(Short.java:118)
	at java.lang.Short.valueOf(Short.java:174)
	at java.lang.Short.valueOf(Short.java:200)
	at org.apache.hadoop.hdfs.web.HftpFileSystem$LsParser.startElement(HftpFileSystem.java:437)
	at com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.startElement(AbstractSAXParser.java:509)
	at com.sun.org.apache.xerces.internal.parsers.AbstractXMLDocumentParser.emptyElement(AbstractXMLDocumentParser.java:182)

これが webhdfs:// を使うと何の問題もなく動いた。すばらしい。グレート。

*1:なおHadoop1系でも distcp2 と指定すれば使えるケースがあるようだ

MRv2/Tezで簡単にクエリのベンチをとった

Hiveしか使ってないので以下のオプションを設定するだけで使える。楽。

SET hive.execution.engine=tez;

なお HDP 2.1 with Hive 0.10, Tez 0.4 での話です。クラスタの概要は以下の通り。

  • master x3
  • slave x20
    • Xeon(R) CPU E5-2630L v2 (6core 12Threads) x2
    • RAM 64GB
    • HDD x12

シナリオ

データの流れとしては以下のようなシナリオを想定する。

  1. 外部から非圧縮plain text tsvでHDFS内のファイルにデータが書かれる
  2. LOADで hourly テーブルに読み込む
  3. dailyで INSERT により daily テーブルに書き込む
    • このときファイルフォーマット変換や圧縮を同時に行う
    • hourly テーブルの変換済みパーティションおよび元の生データは削除する
  4. 普段のクエリは daily テーブルに対して行う

テーブルのスキーマは以下のような感じで、まあ普通にあるWebサーバのアクセスログが入っていると思おう。あとパース時にいくつかフラグを立ててBOOLEANを追加してある感じ。

-- hourly
CREATE TABLE hourly (
       hhmmss STRING,
       vhost STRING,
       path STRING     COMMENT 'with query',
       method STRING,
       status SMALLINT,
       bytes BIGINT    COMMENT 'response body size',
       duration BIGINT COMMENT 'micro sec',
       referer STRING,
       rhost STRING,
       userlabel STRING,
       agent STRING,
       flag BOOLEAN,
       f_redirection BOOLEAN,
       f_errors BOOLEAN,
       f_internal BOOLEAN,
       f_miscfile BOOLEAN,
       f_imagefile BOOLEAN,
       f_bot BOOLEAN,
       query STRING    COMMENT 'request query param json',
       opts STRING     COMMENT 'log optional values json'
)
PARTITIONED BY (service STRING, yyyymmddhh STRING, loadseq STRING)
ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

-- daily
CREATE TABLE daily (
       hhmmss STRING,
       vhost STRING,
       path STRING     COMMENT 'with query',
       method STRING,
       status SMALLINT,
       bytes BIGINT    COMMENT 'response body size',
       duration BIGINT COMMENT 'micro sec',
       referer STRING,
       rhost STRING,
       userlabel STRING,
       agent STRING,
       flag BOOLEAN,
       f_redirection BOOLEAN,
       f_errors BOOLEAN,
       f_internal BOOLEAN,
       f_miscfile BOOLEAN,
       f_imagefile BOOLEAN,
       f_bot BOOLEAN,
       query STRING    COMMENT 'request query param json',
       opts STRING     COMMENT 'log optional values json'
)
PARTITIONED BY (service STRING, yyyymmdd STRING)
STORED AS RCFILE;

評価

INSERT

hourlyからdailyへのINSERTにかかる所要時間を計測する。
以下のクエリを MRv2 と Tez で共に用いる。MRv2用のオプションも含めてすべて共通で設定する*1。日時については適当な範囲で指定。

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.optimize.sort.dynamic.partition=false;
SET hive.exec.reducers.max=4096;
SET mapreduce.job.reducer=2048;
 
FROM hourly
INSERT OVERWRITE TABLE daily PARTITION (service,yyyymmdd)
  SELECT hhmmss,vhost,path,method,status,bytes,duration,referer,rhost,userlabel,agent,
         flag,f_redirection,f_errors,f_internal,f_miscfile,f_imagefile,f_bot,
         query,opts,
         service, 'yyyymmdd'
  WHERE service LIKE '%' AND yyyymmddhh LIKE 'yyyymmdd%'

非圧縮で約650GB程度のデータを対象にこのクエリを実行したところ、以下のような結果だった。

  • MRv2
    • 17分48秒 (1068秒)
  • Tez
    • 11分53秒 (713秒)

713/1068 = 0.667.. なので、まさに1.5倍ちょうど速くなっている。

SELECT

続けてこのようにして生成したdailyテーブルに以下のような集計クエリをMRv2、Tezでそれぞれ実行し、結果が出るまでの時間を計測した。このときは特に mapreduce.job.reduce などのオプションは指定していない。

SELECT
  hhmmss, COUNT(*) AS c
FROM access_log1
WHERE service LIKE '%' AND yyyymmdd='20141114'
GROUP BY hhmmss
ORDER BY c DESC LIMIT 10

結果は以下の通り。

  • MRv2
    • 436秒
  • Tez
    • 179秒

179/436 = 0.410.. なので2.5倍速い。このクエリはMapReduceだと2段のmapreduceに分解されて実行されることはHive大好きなみなさんならクエリを見た瞬間にわかると思うが、やはりそのようなクエリにおいては1ジョブで効率的に実行できるTezが更に有利になる。

そしてこのような単純な集計クエリは非常に多く実行されている。

まとめ

Tezいいんじゃないですかね! たまに落ちるという話も聞きますが!

ただResourceManagerからHistory URLたどっていっても何も見えないとか、いろいろ実行状況わかんないのは自分の設定のせいなのかなあ。これはトラブル時にだいぶ困る感じがしますね。

*1:当然だが hive.execution.engine=tez は除く

Hive dynamic partition insertsにまつわるいくつかの問題と対処について

だいぶ前のHiveの機能準拠で作ってたクラスタを大幅に作り直したので、ついでにETL処理をdynamic partition inserts一発でやればMapReduce 1ジョブで済んで超効率的に! やった! と思ったらいくつかハマったのでメモ。

なおdynamic partition insertsについては説明が面倒なので公式Wikiの該当ページを読むとよい、が、簡単に言うとHiveでパーティションにINSERTするときにINSERT先のパーティション指定をSELECTクエリの出力により行う、というもの。

なお断りがない限りは HDP2.1 with Hive 0.13 の環境で試したものとする。(移行元はCDH4)

クエリの書き方

単純に言うと、パーティションとして指定したいカラムは SELECT 句の最後に置かなければならない。

簡単に言うと year=INT/month=INT/day=INT というパーティション階層になっているテーブルにINSERTするとき、month と day をSELECT句から取り出したい場合には、以下のようにクエリを書かなければならない。

INSERT OVERWRITE TABLE tbl PARTITION (year=2014, month, day)
SELECT
  data1, data2, data3, data4,
  month, day
FROM source_table WHERE ...

dynamicに指定したいカラムだけを書いておく。

ハマった

いちばんわかりにくくハマったのは INSERT クエリを流したとき、なぜか毎回いくつかのReducerだけが他のReducerよりもはるかに長く動き続けており、全体の処理の規模が大きいときはそれこそ永遠にお待ちください状態になる、という挙動。

最初は MRv2 on YARN 移行に伴うものかと思ったが、違うっぽい。よく見てみると毎回 shuffle で莫大な時間を消費しているようだった。*1設定をあれこれ見ていたらこんなのが見付かった。

hive.optimize.sort.dynamic.partition

  • Default Value: true in Hive 0.13.0 and 0.13.1; false in Hive 0.14.0 and later (HIVE-8151)
  • Added In: Hive 0.13.0 with HIVE-6455

When enabled, dynamic partitioning column will be globally sorted. This way we can keep only one record writer open for each partition value in the reducer thereby reducing the memory pressure on reducers.

Configuration Properties - Apache Hive - Apache Software Foundation

実は最初にCDH4のクラスタでほぼ同じことをやったときは問題なく終わってて、HDP 2.1 のクラスタでやったときにはまった。このせいっぽい。Hive 0.13 においてはこのオプションがデフォルトで有効になっており、dynamic partition insertsを使うときに自動的に出力をsortにかける。しかもその record writer は必ずひとつのreducerとなる。
このため、出力先のパーティションごとに考えたときにこのサイズが巨大なものがあると、sortがいつまでも終わらず非常に長い時間がかかる、ということらしい。

気付いてこれを false にしたらパーティション全体に対するsortが行われなくなり、reducerが走らずそのまま出力がパーティションに書かれるようになったので、妙に時間がかかるようなこともなく終わるようになった。めでたしめでたし。

なお Hive 0.14 ではデフォルト false になっている。やっぱみんなハマったんじゃないかな。

他にもいくつか

Hive on MRv2 on YARNで気になったところがあったので書いておく。

hive.exec.dynamic.partition
  • Default Value: false
  • Added In: Hive 0.6.0
  • Whether or not to allow dynamic partitions in DML/DDL.

有効にしましょう。

hive.exec.dynamic.partition.mode
  • Default Value: strict
  • Added In: Hive 0.6.0
  • In strict mode, the user must specify at least one static partition in case the user accidentally overwrites all partitions. In nonstrict mode all partitions are allowed to be dynamic.

簡単に言うと、dynamic partition inserts時、strict になっていれば、少なくとも1段のパーティションについてはクエリ内で明示的に指定されていなければならない(SELECT結果を使わずに指定しなければならない)。

これのパーティション指定用カラムをトップレベルからすべて動的に指定したい場合、このモードを nonstrict として指定してやる必要がある。

SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE tbl PARTITION (year, month, day)
SELECT
  data1, data2, data3, data4,
  year, month, day
FROM source_table WHERE ...

トップレベルのパーティションを無尽蔵に増やしてしまうと管理がものすごく大変になるから、親切心でデフォルトを strict にしてあるんだと思う。DROP PARTITIONも面倒になるし。覚悟がある人だけ nonstrict にしましょう。

hive.exec.max.dynamic.partitions
  • Default Value: 1000
  • Added In: Hive 0.6.0
  • Maximum number of dynamic partitions allowed to be created in total.

1回のdynamic partition insertsクエリで生成されるパーティション数の上限。日付とかでpartitionを作っている分には超えないが、何かしらの商品コード等を用いてパーティション作成する場合にはけっこうひっかかるかもしれない。注意。
ひっかかったらクエリ失敗時に例外にメッセージ出てるはずだから、増やそう。

hive.exec.max.dynamic.partitions.pernode
  • Default Value: 100
  • Added In: Hive 0.6.0
  • Maximum number of dynamic partitions allowed to be created in each mapper/reducer node.

1回のdynamic partition insertsクエリにおいて、ひとつのmapper/reducerで作られるパーティション数の上限。前項と同じでひっかかるケースはあると思う。注意。ひっかかったら上げる、でいいと思う。

hive.exec.reducers.max と mapreduce.job.reducer

hive.exec.reducers.max はデフォルトが999で、この数字は mapreduce.job.reducer よりも優先される。これがわかってなかったので、なぜか毎回 reducers が999になり、あれー? と思っていた。注意。 oza先生に教えておもらいました。多謝。

が、これは前述の hive.optimize.sort.dynamic.partition=false により意味がなくなった。

結論

みんなで hive.execution.engine=tez を指定しよう!

*1:これMRv1 JobTrackerの画面に較べてMRv2の画面に出てこなくなったので、だいぶ長いこと気付かなかった