たごもりすメモ

コードとかその他の話とか。

HiveServerを使用してPythonやPerlからHiveQLを実行する

HiveServerはThriftプロトコルをしゃべる*1ので、おなじくThriftで接続すると任意のクエリを発行できたりして大変便利。ということで、やったので、そのメモ。

準備

まずHiveServerだけど、hiveコマンドを叩いてクエリが実行できる状態なら以下のコマンドで起動する。*2

 $ hive --service hiveserver

ただしあらゆるデータ投入などをThrift経由でやるのは正直めんどくさい。hiveと叩けば起動するHive CLIでもデータ操作はしたい。なので conf/hive-default.xml にMetastoreの設定をする。今回は手元のMySQLに hive データベースを作り、それを使うようにした。このあたりが大変参考になった。
HiveのmetastoreをMySQLを使ってLocal Metastore形式で利用する - blog.katsuma.tv
ついでにデータ出力時にはデフォルトで圧縮するようにしてある。(変更点およびその関連のみ抜粋)

<!-- これは無変更 -->
<property>
  <name>hive.metastore.local</name>
  <value>true</value>
  <description>controls whether to connect to remove metastore server or open a new metastore server in Hive Client JVM</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/hive?createDatabaseIfNotExist=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>

<!-- 対応するユーザのprivileges設定をMySQLでやっておく -->
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>yourpassword</value>
  <description>password to use against metastore database</description>
</property>

<!-- shib というアプリケーションで使うので HDFS上のパスはデフォルトのものから変更 -->
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/shib/hive-warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

<!-- 圧縮の指定、仕様するアルゴリズムはHadoop側の指定任せってのがちょっと悲しい -->
<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
  <description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>

こうして設定しておけば hive コマンドでCLIを起動した場合と hive --service hiveserver でThrift経由でアクセスした場合に、正常に同じテーブルにアクセスできるようになる。
なお、別のサーバでやはりMySQLをmetastoreに使おうとしたとき "com.mysql.jdbc.exceptions.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes" というようなエラーが出た。http://wiki.apache.org/hadoop/Hive/FAQの最後にこの通りのことが載ってたので、DB作成時に "CREATE DATABASE hive DEFAULT CHARACTER SET 'latin1';" で解決。

1カラムだけ持っているテスト用のテーブルを作ってロードするには、以下みたいな感じで。

$ ls -1 /path/to/dokoka > hoge.txt
$ hive
hive> CREATE TABLE p (x STRING);
hive> show tables;
OK
p
Time taken: 1.772 seconds
hive> LOAD DATA LOCAL INPATH '/Users/hadoop/hoge.txt' OVERWRITE INTO TABLE p;
Copying data from file:/Users/hadoop/hoge.txt
Loading data to table p
OK
Time taken: 0.421 seconds
hive> SELECT count(*) FROM p;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_201103071155_0052, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201103071155_0052
Kill Command = /Users/hadoop/hadoop/bin/hadoop job  -Dmapred.job.tracker=localhost:50030 -kill job_201103071155_0052
2011-03-10 15:09:03,635 Stage-1 map = 0%,  reduce = 0%
2011-03-10 15:09:06,707 Stage-1 map = 100%,  reduce = 0%
2011-03-10 15:09:15,767 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201103071155_0052
OK
113
Time taken: 20.967 seconds
hive> 

きちんとMapReduceまで起動することが確認できたらOK。HiveServerを起動しておこう。

$ hive --service hiveserver
Pythonから接続する

Thriftは定義ファイルがあれば、そこから各言語用のライブラリコードを自動生成できるようになっている。が、Python(とPHP)は生成済みのものがパッケージに同梱されているので、とりあえずそれを使えばいい。公式WikiHiveClient - Apache Hive - Apache Software Foundation にだいたい正しいことが書いてあるが、間違いも含まれてるので注意。
例えば、Pythonの対話環境で以下のようにする。

# コンソールから以下のコマンドで起動 (PYTHONPATHに与えるディレクトリは適当に)
# env PYTHONPATH=/Users/tagomoris/tmp/hive-0.6.0/lib/py python
from hive_service import ThriftHive # Wikiだと from hive ... だが、今のコードだと from hive_service ... でないとダメ
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
transport = TSocket.TSocket('localhost', 10000)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = ThriftHive.Client(protocol)
transport.open()
client.execute("select count(*) from p")
client.fetchAll(); # => ['113']
transport.close()

client.execute に任意のクエリを与え、結果を client.fetchAll (もしくはfetchOne/fetchN) で受け取る。割と簡単。なおこの処理を走らせたときに HiveServer のコンソールへの出力を見ると、MapReduceが起動しているのがわかる。

Perlから接続する

ThriftはPerlでも使えるので、Hiveのパッケージから必要な定義ファイルを食わせてPerl用のモジュールを作れば、PerlからでもHiveServerには接続できる。ただしまず thrift コマンドが使える状態にする必要があるので、thrift-0.5.0 あたりを持ってきて configure && make && sudo make install しておくこと。
また、CPANからThrift用のモジュールを入れておく。*3

$ cpanm -l ./extlib/ Thrift::XS

thriftコマンドが使えるようになったら、以下のようにして必要なファイルを生成する。いくつかある。 (3/30 queryplan.thrift を追加)

$ cd /path/to/project # このディレクトリに gen-perl というディレクトリが作られ、そこに生成されたファイルが置かれる
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen perl ~/tmp/hive-0.6.0/src/service/include/thrift/fb303/if/fb303.thrift
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen perl ~/tmp/hive-0.6.0/src/metastore/if/hive_metastore.thrift
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen perl ~/tmp/hive-0.6.0/src/service/if/hive_service.thrift
$ thrift -I ~/tmp/hive-0.6.0/src -I ~/tmp/hive-0.6.0/src/service -I ~/tmp/hive-0.6.0/src/service/include -I ~/tmp/hive-0.6.0/src/metastore --gen perl ~/tmp/hive-0.6.0/src/ql/if/queryplan.thrift

これで必要なファイルは生成できた。Hiveにクエリを投げるサンプルコードはこんなのを書いてみた。gistにも置いておいた
起動時にライブラリの読み込みを指定して起動するのを忘れないこと。

$ perl -Igen-perl thrifthive_test.pl

コードはこんな感じ。

#!/usr/bin/env perl

use local::lib "./extlib/";
use Thrift;
use Thrift::Socket;
use Thrift::BufferedTransport;
use Thrift::BinaryProtocol;
use ThriftHive;

my $socket = Thrift::Socket->new("localhost", 10000);
$socket->setSendTimeout(600 * 1000); # 10min.
$socket->setRecvTimeout(600 * 1000);
my $transport = Thrift::BufferedTransport->new($socket);
my $protocol = Thrift::BinaryProtocol->new($transport);

my $client = ThriftHiveClient->new($protocol);

eval {
    $transport->open();
    $client->execute("select count(*) from p");
    use Data::Dumper;
    warn Dumper $client->fetchAll();
    $transport->close();
};
if ($@) { print $@->{message}, "\n"; };

注意点は $socket を作ったとき、タイムアウトを忘れずにセットすることかな。タイムアウトのデフォルトは10秒で、これだとどんなに高速なMapReduceでも絶対に間に合わず、接続を切られてしまう。ミリ秒指定で、とりあえず600秒(10分)にした。これでもHiveクエリの用途にはちょっと短い場合がありそうだ。

以上

おつかれさまでした!
いや、まだnode.jsでも試すんだけど、あとで……。

*1:jdbc/odbc実装などがあるが、インターフェイスが用意されてるだけで、実際の通信はThrift経由で行われている。jdbcが利用可能な何かのライブラリを繋ぎ込む、とかの事情でない限りは自分でThriftを呼ぶようにプログラミングするのが良いと思う。

*2:Hadoopクラスタのセットアップができていて HADOOP_HOME や HADOOP_CONF_DIR が適切にセットされており、hadoopコマンドが使えること、という前提やらがあれこれあるが、省略。そのうち記事にするかなあ? まあ、がんばれ!

*3:extlibの指定はアプリケーションの都合なので、わからない人は無視して外してよい。