たごもりすメモ

コードとかその他の話とか。

HiveServerを使用してnode.jsからHiveQLを実行する

HiveServerはThriftプロトコルをしゃべるので、おなじくThriftで接続すると任意のクエリを発行できたりして大変便利。ということで node.js からもやった。以前からやってる内容の続き。

node.jsから接続する

node.jsでnpm install thrift で取得できる thrift ライブラリは2011/04/08現在ではBufferedTransportに対応していない。v0.7.0相当のものが出てきたら多分対応版になるんじゃないかと思うが、それまでは tagomoris/node-thrift · GitHub から持ってきてもらう必要がある。

$ cd tmp
$ git clone git://github.com/tagomoris/node-thrift.git
$ cd ../yourproject
$ mkdir lib; cd lib
$ cp -r ../../tmp/node-thrift/lib/thrift ./

続けて、thrift定義ファイルから js:node を指定してスクリプトを生成する。

$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen js:node ~/tmp/hive-0.6.0/src/service/include/thrift/fb303/if/fb303.thrift
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen js:node ~/tmp/hive-0.6.0/src/metastore/if/hive_metastore.thrift
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen js:node ~/tmp/hive-0.6.0/src/service/if/hive_service.thrift
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen js:node ~/tmp/hive-0.6.0/src/ql/if/queryplan.thrift

実行したら lib の中に thrift と gen-nodejs の両ディレクトリができている。あとはこんな感じのスクリプトを作って走らせればいい。

var thrift = require('thrift'),
    ttransport = require('thrift/transport'),
    ThriftHive = require('gen-nodejs/ThriftHive');

var connection = thrift.createConnection("localhost", 10000, {transport: ttransport.TBufferedTransport, timeout: 600*1000}),
    client = thrift.createClient(ThriftHive, connection);

connection.on('error', function(err) {
    console.error(err);
});

connection.addListener("connect", function() {
    client.execute('select count(*) from p', function(err){
        console.error("pos");
        if (err) { console.error("error on execute(): " + err); process.exit(1); }
        
        client.fetchAll(function(err, data){
            if (err){ console.error("error on fetchAll(): " + err); process.exit(1); }
            console.error(data);
            connection.end();
            process.exit(0);
        });
    });
});

上記スクリプトを以下のようにして実行。

$ NODE_PATH=lib node hive_sample.js

できたできた!

複数クライアントからHiveServerへの同時接続

Hadoopクラスタは(ノードの処理能力が許すかぎり)複数のジョブを並行に起動できるが、その一方でHiveServerはシングルスレッドで動作し、複数のクライアントを(厳密な意味で)同時に相手にはできない。そのあたりはWikiに書いてある。

The HiveServer is currently single threaded, which could present serious use limitations. See JIRA: HIVE-80.

HiveServer - Apache Hive - Apache Software Foundation

結論から先に書くと「Hiveクエリの結果が出てから、そのクエリを発行したクライアントが結果を全部フェッチし終わるまで」がシングルスレッド動作の対象、ということになるらしい。あるクライアントが実行したクエリの処理がHadoopクラスタ上で終わっても、そのとき別のクライアントが結果をフェッチ中であれば、終わるまで待たされる。

実験

複数のクライアントそれぞれから見たときに実際にどこで待たされるのかは興味がある。ので、実験してみた。スクリプトこちら
node.jsのスクリプトから2セッションを並行に張って、それぞれからクエリを実行している。結果はこんな。

tagomoris$ NODE_PATH=lib node hive_multi_client.js 
conn2: connected.
conn1: connected.
conn2: executing.
conn1: executing.
conn2: executed.
conn2: fetched.
conn2: result: [ 'README.md\t1\thoge',
  'bin\t1\thoge',
  'derby.log\t1\thoge',
  'extlib\t1\thoge',
  'hql\t1\thoge',
  'js\t1\thoge',
  'lib\t1\thoge',
  'metastore_db\t1\thoge',
  't\t1\thoge' ]
conn1: executed.
conn1: fetched.
conn1: result: [ 'README.md\t1',
  'bin\t1',
  'derby.log\t1',
  'extlib\t1',
  'hql\t1',
  'js\t1',
  'lib\t1',
  'metastore_db\t1',
  't\t1' ]
tagomoris$ 

時刻を出してないからアレだけど、MapReduce自体はHiveServerが execute メッセージを受け取ったら裏側で走っているので、ふたつめのクライアント(ここでは conn1)もそこまで待たされるわけではない。時系列順だと以下のようになる。

  1. HiveClientがHiveServerに接続する
  2. HiveClientが execute メッセージを送る
    • HiveClientは待たされる (node.jsだとコールバックが呼ばれるのが待たされる、他の言語だとブロックする)
      • MapReduceが走って結果が出るまで待たされる
    • もし他のHiveClientが同時に接続している場合、待たされる時間が長くなる可能性がある
      • 自分より前にクエリを実行しているクライアントがいる場合、そのクライアントが結果をフェッチし終えるまでは自分も待たされる
  3. HiveServerが execute メッセージのレスポンスを返す
    • ここでクライアントの execute コールバックが呼ばれる(もしくはブロックが解けて次に処理が進む)
  4. HiveClientがfetchOne/fetchN/fetchAllメッセージを送ったりする
    • 結果を全部読み出した時点で、そのクエリに関する処理が終わったと見做される
    • こうなって初めて、待たされていた他のクライアントが execute メッセージのレスポンスを受け取れる

MapReduceが裏で並行に走ってくれるのは嬉しい。結果の読み出しによっぽど時間をかけない限りはあんまり問題にならなさそうだ。

クエリがエラーの場合は?

Hiveクエリが明らかにエラーの場合はMapReduceが走らないはず。が、他のクライアントのMapReduce終了を待たいとエラーすら分からないとかだと厳しいなあ……と思って、試した。こちら

  • conn1
    • コネクション確立後、5秒後に execute
    • parse errorになる(HiveServerが判断可能な)エラーになるクエリを実行: select x, count(*) from q group by x (テーブル名が違う)
  • conn2
    • コネクション確立後、1秒後に execute
    • 普通のクエリ

結果、クエリのエラーが出たぶんは先に実行されたMapReduceが実行中であってもクライアントにすぐ返される。このことから、クライアントへのレスポンスの順序は、executeメッセージを送った順ではなく、結果が先に出た順、だと思われる。

tagomoris$ NODE_PATH=lib node hive_multi_client.js 
conn2: connected.
conn1: connected.
conn2: executing.
conn1: executing.
conn1: executed.
conn1: error on execute(): { name: 'HiveServerException',
  message: 'Query returned non-zero code: 10, cause: FAILED: Error in semantic analysis: line 1:24 Table not found q',
  errorCode: 10,
  SQLState: '42S02' }
conn1: fetched.
conn1: result: []
conn2: executed.
conn2: fetched.
conn2: result: [ 'README.md\t1\thoge',
  'bin\t1\thoge',
  'derby.log\t1\thoge',
  'extlib\t1\thoge',
  'hql\t1\thoge',
  'js\t1\thoge',
  'lib\t1\thoge',
  'metastore_db\t1\thoge',
  't\t1\thoge' ]
tagomoris$ 
クエリの処理所要時間が違うケース

もう少し試した。わざと複雑なクエリを一方で発行して、後でexecuteしたクエリが先に終わるようにしてみた。これ

  • conn1
    • コネクション確立後、5秒後に execute
    • 普通のクエリ: select x, count(*) from p group by x
  • conn2
    • コネクション確立後、1秒後に execute
    • 割と複雑なクエリ: select x, count(*) as cnt, "hoge" from p group by x sort by cnt desc limit (3段のmapreduceに展開される)

結果はこんな。

tagomoris$ NODE_PATH=lib node hive_multi_client.js 
conn2: connected.
conn1: connected.
conn2: executing.
conn1: executing.
conn1: executed.
conn1: fetched.
conn1: result: [ 'README.md\t1',
  'bin\t1',
  'derby.log\t1',
  'extlib\t1',
  'hql\t1',
  'js\t1',
  'lib\t1',
  'metastore_db\t1',
  't\t1' ]
conn2: executed.
conn2: fetched.
conn2: result: [ 'README.md\t1\thoge',
  'bin\t1\thoge',
  'derby.log\t1\thoge',
  'extlib\t1\thoge',
  'hql\t1\thoge',
  'js\t1\thoge',
  'lib\t1\thoge',
  'metastore_db\t1\thoge',
  't\t1\thoge' ]
tagomoris$

クエリ自体はconn2側のの方が先にHadoopクラスタに渡されてるんだけど、先に結果が返ってきた conn1 の方が execute のレスポンスを受け取ってる。これで確認できた。

Hiveに対するThriftの各RPCレスポンス

メモ。HiveServer用に定義されている各メソッドをThrift RPCで呼んだときに何が返ってくるのよ、と。順序的には、基本的には execute をまずクエリつきで呼んで、それに対してクエリプラン・結果データのスキーマ・結果データそのものを取得する、というかたち。"getClusterStatus"だけはクライアントからいつでも発行できる。(のかな?)

コードはこんな感じ。

var thrift = require('thrift'),
    ttransport = require('thrift/transport'),
    ThriftHive = require('gen-nodejs/ThriftHive');

var connection = thrift.createConnection("localhost", 10000, {transport: ttransport.TBufferedTransport, timeout: 600*1000}),
    client = thrift.createClient(ThriftHive, connection);

connection.on('error', function(err) {
  console.error(err);
});

connection.addListener("connect", function() {
  client.getClusterStatus(function(err, data){
    console.log("getClusterStatus:", data);
    client.execute('select x, count(*) as cnt from p group by x sort by cnt limit 10', function(err){
      if (err) { console.error("error on execute(): " + err); process.exit(1); }

      client.getQueryPlan(function(err, data){
        console.log("getQueryPlan:", data);
        console.log("queryplan queryAttributes:", data.queries[0].queryAttributes);
        console.log("queryplan stageGraph:", data.queries[0].stageGraph);
        console.log("queryplan stageGraph adjacencyList children:", data.queries[0].stageGraph.adjacencyList[0].children);
        console.log("queryplan stageGraph adjacencyList children:", data.queries[0].stageGraph.adjacencyList[1].children);
        console.log("queryplan stageList:", data.queries[0].stageList);
        console.log("queryplan stageList taskList:", data.queries[0].stageList[0].taskList[0]);
        console.log("queryplan stageList taskList operatorGraph adjacencyList:", data.queries[0].stageList[0].taskList[0].operatorGraph.adjacencyList);
        console.log("queryplan stageList taskList:", data.queries[0].stageList[0].taskList[1]);
        console.log("queryplan stageList taskList:", data.queries[0].stageList[1].taskList[0]);
        console.log("queryplan stageList taskList:", data.queries[0].stageList[1].taskList[1]);
        console.log("queryplan stageList taskList:", data.queries[0].stageList[2].taskList[0]);
        console.log("queryplan stageList taskList:", data.queries[0].stageList[2].taskList[1]);

        client.getSchema(function(err, data){
          console.log("getSchema:", data);
          client.getThriftSchema(function(err,data){
            console.log("getThriftSchema:", data);
            client.fetchAll(function(err, data){
              if (err){ console.error("error on fetchAll(): " + err); process.exit(1); }
              console.log("fetchAll:", data);
              connection.end();
              process.exit(0);
            });
          });
        });
      });
    });
  });
});

結果。

getClusterStatus: { taskTrackers: 1,
  mapTasks: 0,
  reduceTasks: 0,
  maxMapTasks: 2,
  maxReduceTasks: 2,
  state: 2 }
getQueryPlan: { queries: 
   [ { queryId: 'hadoop_20110408164747_f3ab8cdc-4064-40b9-81c5-928d81f4b1ce',
       queryType: null,
       queryAttributes: [Object],
       queryCounters: null,
       stageGraph: [Object],
       stageList: [Object],
       done: true,
       started: true } ],
  done: false,
  started: false }
queryplan queryAttributes: { queryString: 'select x, count(*) as cnt from p group by x sort by cnt limit 10' }
queryplan stageGraph: { nodeType: 1,
  roots: null,
  adjacencyList: 
   [ { node: 'Stage-1',
       children: [Object],
       adjacencyType: 0 },
     { node: 'Stage-2',
       children: [Object],
       adjacencyType: 0 } ] }
queryplan stageGraph adjacencyList children: [ 'Stage-2' ]
queryplan stageGraph adjacencyList children: [ 'Stage-3' ]
queryplan stageList: [ { stageId: 'Stage-1',
    stageType: 3,
    stageAttributes: null,
    stageCounters: 
     { 'CNTR_NAME_Stage-1_REDUCE_PROGRESS': 100,
       'CNTR_NAME_Stage-1_MAP_PROGRESS': 100 },
    taskList: [ [Object], [Object] ],
    done: true,
    started: true },
  { stageId: 'Stage-2',
    stageType: 3,
    stageAttributes: null,
    stageCounters: 
     { 'CNTR_NAME_Stage-2_MAP_PROGRESS': 100,
       'CNTR_NAME_Stage-2_REDUCE_PROGRESS': 100 },
    taskList: [ [Object], [Object] ],
    done: true,
    started: true },
  { stageId: 'Stage-3',
    stageType: 3,
    stageAttributes: null,
    stageCounters: 
     { 'CNTR_NAME_Stage-3_REDUCE_PROGRESS': 100,
       'CNTR_NAME_Stage-3_MAP_PROGRESS': 100 },
    taskList: [ [Object], [Object] ],
    done: true,
    started: true } ]
queryplan stageList taskList: { taskId: 'Stage-1_MAP',
  taskType: 0,
  taskAttributes: null,
  taskCounters: null,
  operatorGraph: 
   { nodeType: 0,
     roots: null,
     adjacencyList: [ [Object], [Object], [Object] ] },
  operatorList: 
   [ { operatorId: 'TS_124',
       operatorType: 9,
       operatorAttributes: null,
       operatorCounters: {},
       done: true,
       started: true },
     { operatorId: 'SEL_125',
       operatorType: 8,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true },
     { operatorId: 'GBY_126',
       operatorType: 5,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true },
     { operatorId: 'RS_127',
       operatorType: 11,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true } ],
  done: true,
  started: true }
queryplan stageList taskList operatorGraph adjacencyList: [ { node: 'TS_158',
    children: [ 'SEL_159' ],
    adjacencyType: 0 },
  { node: 'SEL_159',
    children: [ 'GBY_160' ],
    adjacencyType: 0 },
  { node: 'GBY_160',
    children: [ 'RS_161' ],
    adjacencyType: 0 } ]
queryplan stageList taskList: { taskId: 'Stage-1_REDUCE',
  taskType: 1,
  taskAttributes: null,
  taskCounters: null,
  operatorGraph: 
   { nodeType: 0,
     roots: null,
     adjacencyList: [ [Object], [Object] ] },
  operatorList: 
   [ { operatorId: 'GBY_128',
       operatorType: 5,
       operatorAttributes: null,
       operatorCounters: {},
       done: true,
       started: true },
     { operatorId: 'SEL_129',
       operatorType: 8,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true },
     { operatorId: 'FS_137',
       operatorType: 10,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true } ],
  done: true,
  started: true }
queryplan stageList taskList: { taskId: 'Stage-2_MAP',
  taskType: 0,
  taskAttributes: null,
  taskCounters: null,
  operatorGraph: 
   { nodeType: 0,
     roots: null,
     adjacencyList: [ [Object] ] },
  operatorList: 
   [ { operatorId: 'TS_138',
       operatorType: 9,
       operatorAttributes: null,
       operatorCounters: {},
       done: true,
       started: true },
     { operatorId: 'RS_130',
       operatorType: 11,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true } ],
  done: true,
  started: true }
queryplan stageList taskList: { taskId: 'Stage-2_REDUCE',
  taskType: 1,
  taskAttributes: null,
  taskCounters: null,
  operatorGraph: 
   { nodeType: 0,
     roots: null,
     adjacencyList: [ [Object], [Object] ] },
  operatorList: 
   [ { operatorId: 'OP_131',
       operatorType: 2,
       operatorAttributes: null,
       operatorCounters: {},
       done: true,
       started: true },
     { operatorId: 'LIM_132',
       operatorType: 6,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true },
     { operatorId: 'FS_139',
       operatorType: 10,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true } ],
  done: true,
  started: true }
queryplan stageList taskList: { taskId: 'Stage-3_MAP',
  taskType: 0,
  taskAttributes: null,
  taskCounters: null,
  operatorGraph: 
   { nodeType: 0,
     roots: null,
     adjacencyList: [ [Object] ] },
  operatorList: 
   [ { operatorId: 'TS_140',
       operatorType: 9,
       operatorAttributes: null,
       operatorCounters: {},
       done: true,
       started: true },
     { operatorId: 'RS_133',
       operatorType: 11,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true } ],
  done: true,
  started: true }
queryplan stageList taskList: { taskId: 'Stage-3_REDUCE',
  taskType: 1,
  taskAttributes: null,
  taskCounters: null,
  operatorGraph: 
   { nodeType: 0,
     roots: null,
     adjacencyList: [ [Object], [Object] ] },
  operatorList: 
   [ { operatorId: 'OP_134',
       operatorType: 2,
       operatorAttributes: null,
       operatorCounters: {},
       done: true,
       started: true },
     { operatorId: 'LIM_135',
       operatorType: 6,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true },
     { operatorId: 'FS_136',
       operatorType: 10,
       operatorAttributes: null,
       operatorCounters: null,
       done: true,
       started: true } ],
  done: true,
  started: true }
getSchema: { fieldSchemas: 
   [ { name: 'x', type: 'string', comment: null },
     { name: 'cnt', type: 'bigint', comment: null } ],
  properties: null }
getThriftSchema: { fieldSchemas: 
   [ { name: 'x', type: 'string', comment: null },
     { name: 'cnt', type: 'i64', comment: null } ],
  properties: null }
fetchAll: [ 'README.md\t1',
  'bin\t1',
  'derby.log\t1',
  'extlib\t1',
  'hql\t1',
  'js\t1',
  'lib\t1',
  'metastore_db\t1',
  't\t1' ]

しかし node.js の console.log() や console.err() はオブジェクトを渡すと適当にシリアライズしてくれるので実にラクでいいね!