読者です 読者をやめる 読者になる 読者になる

たごもりすメモ

コードとかその他の話とか。

複数クライアントからHiveServerへの同時接続

Hive

Hadoopクラスタは(ノードの処理能力が許すかぎり)複数のジョブを並行に起動できるが、その一方でHiveServerはシングルスレッドで動作し、複数のクライアントを(厳密な意味で)同時に相手にはできない。そのあたりはWikiに書いてある。

The HiveServer is currently single threaded, which could present serious use limitations. See JIRA: HIVE-80.

HiveServer - Apache Hive - Apache Software Foundation

結論から先に書くと「Hiveクエリの結果が出てから、そのクエリを発行したクライアントが結果を全部フェッチし終わるまで」がシングルスレッド動作の対象、ということになるらしい。あるクライアントが実行したクエリの処理がHadoopクラスタ上で終わっても、そのとき別のクライアントが結果をフェッチ中であれば、終わるまで待たされる。

実験

複数のクライアントそれぞれから見たときに実際にどこで待たされるのかは興味がある。ので、実験してみた。スクリプトこちら
node.jsのスクリプトから2セッションを並行に張って、それぞれからクエリを実行している。結果はこんな。

tagomoris$ NODE_PATH=lib node hive_multi_client.js 
conn2: connected.
conn1: connected.
conn2: executing.
conn1: executing.
conn2: executed.
conn2: fetched.
conn2: result: [ 'README.md\t1\thoge',
  'bin\t1\thoge',
  'derby.log\t1\thoge',
  'extlib\t1\thoge',
  'hql\t1\thoge',
  'js\t1\thoge',
  'lib\t1\thoge',
  'metastore_db\t1\thoge',
  't\t1\thoge' ]
conn1: executed.
conn1: fetched.
conn1: result: [ 'README.md\t1',
  'bin\t1',
  'derby.log\t1',
  'extlib\t1',
  'hql\t1',
  'js\t1',
  'lib\t1',
  'metastore_db\t1',
  't\t1' ]
tagomoris$ 

時刻を出してないからアレだけど、MapReduce自体はHiveServerが execute メッセージを受け取ったら裏側で走っているので、ふたつめのクライアント(ここでは conn1)もそこまで待たされるわけではない。時系列順だと以下のようになる。

  1. HiveClientがHiveServerに接続する
  2. HiveClientが execute メッセージを送る
    • HiveClientは待たされる (node.jsだとコールバックが呼ばれるのが待たされる、他の言語だとブロックする)
      • MapReduceが走って結果が出るまで待たされる
    • もし他のHiveClientが同時に接続している場合、待たされる時間が長くなる可能性がある
      • 自分より前にクエリを実行しているクライアントがいる場合、そのクライアントが結果をフェッチし終えるまでは自分も待たされる
  3. HiveServerが execute メッセージのレスポンスを返す
    • ここでクライアントの execute コールバックが呼ばれる(もしくはブロックが解けて次に処理が進む)
  4. HiveClientがfetchOne/fetchN/fetchAllメッセージを送ったりする
    • 結果を全部読み出した時点で、そのクエリに関する処理が終わったと見做される
    • こうなって初めて、待たされていた他のクライアントが execute メッセージのレスポンスを受け取れる

MapReduceが裏で並行に走ってくれるのは嬉しい。結果の読み出しによっぽど時間をかけない限りはあんまり問題にならなさそうだ。

クエリがエラーの場合は?

Hiveクエリが明らかにエラーの場合はMapReduceが走らないはず。が、他のクライアントのMapReduce終了を待たいとエラーすら分からないとかだと厳しいなあ……と思って、試した。こちら

  • conn1
    • コネクション確立後、5秒後に execute
    • parse errorになる(HiveServerが判断可能な)エラーになるクエリを実行: select x, count(*) from q group by x (テーブル名が違う)
  • conn2
    • コネクション確立後、1秒後に execute
    • 普通のクエリ

結果、クエリのエラーが出たぶんは先に実行されたMapReduceが実行中であってもクライアントにすぐ返される。このことから、クライアントへのレスポンスの順序は、executeメッセージを送った順ではなく、結果が先に出た順、だと思われる。

tagomoris$ NODE_PATH=lib node hive_multi_client.js 
conn2: connected.
conn1: connected.
conn2: executing.
conn1: executing.
conn1: executed.
conn1: error on execute(): { name: 'HiveServerException',
  message: 'Query returned non-zero code: 10, cause: FAILED: Error in semantic analysis: line 1:24 Table not found q',
  errorCode: 10,
  SQLState: '42S02' }
conn1: fetched.
conn1: result: []
conn2: executed.
conn2: fetched.
conn2: result: [ 'README.md\t1\thoge',
  'bin\t1\thoge',
  'derby.log\t1\thoge',
  'extlib\t1\thoge',
  'hql\t1\thoge',
  'js\t1\thoge',
  'lib\t1\thoge',
  'metastore_db\t1\thoge',
  't\t1\thoge' ]
tagomoris$ 
クエリの処理所要時間が違うケース

もう少し試した。わざと複雑なクエリを一方で発行して、後でexecuteしたクエリが先に終わるようにしてみた。これ

  • conn1
    • コネクション確立後、5秒後に execute
    • 普通のクエリ: select x, count(*) from p group by x
  • conn2
    • コネクション確立後、1秒後に execute
    • 割と複雑なクエリ: select x, count(*) as cnt, "hoge" from p group by x sort by cnt desc limit (3段のmapreduceに展開される)

結果はこんな。

tagomoris$ NODE_PATH=lib node hive_multi_client.js 
conn2: connected.
conn1: connected.
conn2: executing.
conn1: executing.
conn1: executed.
conn1: fetched.
conn1: result: [ 'README.md\t1',
  'bin\t1',
  'derby.log\t1',
  'extlib\t1',
  'hql\t1',
  'js\t1',
  'lib\t1',
  'metastore_db\t1',
  't\t1' ]
conn2: executed.
conn2: fetched.
conn2: result: [ 'README.md\t1\thoge',
  'bin\t1\thoge',
  'derby.log\t1\thoge',
  'extlib\t1\thoge',
  'hql\t1\thoge',
  'js\t1\thoge',
  'lib\t1\thoge',
  'metastore_db\t1\thoge',
  't\t1\thoge' ]
tagomoris$

クエリ自体はconn2側のの方が先にHadoopクラスタに渡されてるんだけど、先に結果が返ってきた conn1 の方が execute のレスポンスを受け取ってる。これで確認できた。