たごもりすメモ

コードとかその他の話とか。

HiveでLOAD粒度を自由に変更可能にするパーティション構成

ひとことで言うと「通常必要なパーティション階層の更に下にもうひとつパーティションを掘っておけ」だけ。

普通のパーティション設計

仮にWebサーバのアクセスログを投入するためのテーブル accesslog があるとする。このテーブルを、まずサービス名 service カラムでパーティショニングし、さらに日毎にパーティショニングするとしよう。データは外部でタブ区切りにparseしておき、それを毎日LOADする。

このようなテーブルになる。

CREATE TABLE accesslog (
  rhost STRING,
  time STRING,
  method STRING,
  path STRING,
  status SMALLINT,
  bytes BIGINT,
  referer STRING,
  useragent STRING
) 
PARTITIONED BY (service STRING, yyyymmdd STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE

このテーブルに一日一回、外部で準備したファイルを持ってきてLOADする。やった、クエリ可能になった!
まあ特に問題ないよね。

あのー、前日のログしか見られないのつらいんですけど、毎時ロードとかできないの?

いやちょっと待って、毎時? なら時間単位のパーティションを増やして……パーティション階層の追加はテーブル作り直しだ! 詰んだ! いやでもやるしかないか……

(長期間の苦闘の結果) どうだ時間単位のパーティション階層を増やしたぞ! 毎時LOADできるぜ!これでどうだ!

あのー、5分毎ロードとかにできね?

ぎゃー!

HiveのLOADの問題

HiveのLOADは、LOAD先パーティションが既に存在する場合は以下のように動作する。

  • OVERWRITEをつけた場合
  • OVERWRITEをつけない場合
    • 既存パーティションのディレクトリはそのまま、LOAD対象のファイルがそのディレクトリに mv され追加される
    • このとき重複するファイル名のデータが既に存在する場合、それは上書きされる

たとえば libhdfs や WebHDFS などを用いてHDFS上のファイルに直接追記しているような場合、このような挙動はたいへん困る。一例は以下のような感じ。

  1. 外部から /path/of/log/serivceX-2013-01-24.log に追記で書き込む
  2. 半日に1度LOADすることに変更したい!
    • 12時に LOAD DATA inpath '/path/of/log/serviceX-2013-01-24.log' INTO TABLE accesslog PARTITION service='serviceX', yyyymmdd='20130124' する
    • これで service=serviceX/yyyymmdd=20130124 に serviceX-2013-01-24.log ができる
  3. LOAD後にはまた追記されたデータが /path/of/log/serviceX-2013-01-24.log に書き込まれる
    • 24時に LOAD ……ああっ、半日分しか無いデータで serviceX-2013-01-24.log が上書きされた!

LOADを自由に制御可能にするための方法はふたつある。順に解説するが、後者がオススメ。

ファイル名を頑張ってコントロールする

要するに LOAD したときにファイル名がカブらなきゃいいんである。ということでLOAD前にHDFS上で以下のような処理を行う。

  1. 直接LOADせず、いったんtmp用ディレクトリに mv する
  2. ファイル名をチェックし、既存パーティションに存在するものとかぶらないファイル名に変更する
    • 前のmvで一緒にやってもいいが
  3. LOADする

読み込みタイミングからキー的なものを生成してそれを使うなどの方法もある。実行時秒(epoch) + 同時に読み込まれるファイル数のシーケンス、みたいな感じ。そうすればHDFS上に既にあるかどうかのチェックを省略できる。
が、複数箇所からのLOADで重複するケースなどが少し怖い。知らずに上書きされちゃうような状況になったとき、その検出が非常に困難であると同時に、気付いたときには既に対処不可能になっているから。

またそもそもHDFS上での move やファイル名チェックはけっこうめんどくさい。Hiveクライアント経由だけでできない。WebHDFSを使うなどで全体の処理の複雑度が上がる。もうちょっと単純にやりたいですね。

パーティション階層を増やす

前項でファイル名でやっていたようなことをパーティションでやる、と言えばわかりやすい。具体的にはロードタイミングによって値が決まるパーティション階層をひとつ増やしておく。

CREATE TABLE accesslog (
  rhost STRING,
  time STRING,
  method STRING,
  path STRING,
  status SMALLINT,
  bytes BIGINT,
  referer STRING,
  useragent STRING
) 
PARTITIONED BY (service STRING, yyyymmdd STRING, loadseq STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE

パーティション階層 loadseq が増えただけ。このテーブルにLOADでデータを追加するときは以下のようにする。

  • LOAD 時に epoch + 実行ホスト名 + PID とかでキーを作り loadseq にセットする

これだけ。

既存パーティションに存在するかどうかはHiveクライアント経由の操作のみで確認できる。別ホストや同一ホストからの同時実行での重複もキーをちゃんと作っておけば避けられるので基本的には安全。またこの方法であればいくらでもLOADの頻度を上げる(粒度を細かくする)ことが可能になる。

通常クエリを実行する場合は service と yyyymmdd にだけ注目しておき、loadseqは無視すればいい。クエリ実行時の時点で指定した yyyymmdd 以下にあるパーティションは自動的にすべてクエリ対象となる。

利点と注意点

現在あるテーブルを変更してもクエリの互換性があるというのが大きな利点。loadseqカラムは利用者*1には単に知らせなければいい。
また前にも書いたがHiveクライアント以外のクライアント経由での操作を必要としないのも大きい。シンプルさ重要。

注意点はパーティション数が増えることで、NameNodeのヒープ容量に影響を与える可能性がある。まあそのあたりはファイルであってもあまり変わらない*2

もっと言うと、HDFSに直接外部から追記しているときは当然だが圧縮できないので、手元では以下のような構成にしている。

  1. WebHDFS経由で追記
  2. 時間単位でパーティションを切っているテーブルにLOAD
    • ここに loadseq を導入
  3. 24時間経過したら日単位でパーティションを切っているテーブルにINSERT
    • このときに同時に圧縮する
    • 読み込み対象のパーティションは処理完了後にDROPしてしまう

こうしておけば loadseq つきのパーティションは日毎の再編成処理が終わったら削除できるので、HDFSメタデータが増える量を24時間分に限定できる。まったく問題ない。

結論

わーい、何分おきだろうと自由にLOADできるようになったぜ!

追記 (2/5)

「時間単位テーブルと日単位テーブルでデータが分断されて統一的にクエリできないよねそれ、いいの?」という反応が何件かあった。んー、とりあえず手元ではそれでいい、んだけど、それを避けようと思うと何か方法があるかな、と考えてた。
以下のような方法がある。

  • パーティション構成を「余分に2段階深く掘る」
    • service/hourly_partition/format_partition/loadseq のようにする(format_partitionが新しく現れたもの)
  • 作業用の一時挿入用テーブルを用意する
    • service/hourly_partition
    • 行およびファイルフォーマットは本テーブルにあわせておくこと

これらを用いて以下のようにする。

  1. 通常は(追記 + LOADで) service=blog/yyyymmddhh=2013020523/format=raw/loadseq=..... のような階層に取り込んでいく
    • これは非圧縮なので、どこかで圧縮する作業が必要
  2. 適当なタイミング*3で以下の処理を行う
    • raw なパーティション service=blog/yyyymmddhh=2013020523/format=raw から読み込み、圧縮しつつ、一時挿入用テーブルの service=blog/yyyymmddhh=2013020523 へlNSERTする
    • 完了後に以下の2クエリを連続して行う
      • 一時挿入テーブルの該当パーティションから、本テーブルの service=blog/yyyymmddhh=2013020523/format=gz/loadseq=X にLOADする
      • 本テーブルの service=blog/yyyymmddhh=2013020523/format=raw を DROP PARTITION する
    • 上記クエリ完了後に一時挿入テーブルのパーティションはDROPしておく

こうすれば通常の集計クエリを実行する側は各パーティションの圧縮/非圧縮を気にする必要なく、かつ長期的な保存のため圧縮とパーティション統合も行った状態で格納することができる。テーブルの分断もない。

ただしHiveにはトランザクションが無いため、一時挿入テーブルから本テーブルへの LOAD 操作と raw なパーティションの DROP の間に集計クエリが走ってしまうとデータが倍に重複して見えてしまう*4。いつどのようなタイミングでクエリが投げられるのかを全くコントロールできない場合はこれもきついかも。
何かこれを回避する方法があるかな……。 LOAD OVERWRITE をうまく使えば方法がありそうな気もしなくもない、が、元が複数で書き込みデータがひとつだとなあ。

*1:……がHiveテーブルの裏側を知らないとすれば

*2:正確にはディレクトリの増分があるので最悪2倍の影響がある

*3:この時間パーティション 2013020523 へのロードがもう発生しないと思われるタイミング

*4:あるいはDROPとLOADの順序を逆にすれば、データが無いように見える