たごもりすメモ

コードとかその他の話とか。

fluent-plugin-esper構想概略

Fluentd Casual Talks #2 で聞いた fluentd + esper によるCEP実現の話が非常に素敵な感じで、自分の手元でもああいう感じで書きやすく投入しやすいストリームクエリがやりたくてしょうがないお年頃なので、以下のようなものを作ろうかと考えている。勢いあまって(まだ1バイトも書いてないのに) the RubyKaigi 2013 のCFPにsubmitしてしまった。通ったら何がなんでも2ヶ月くらいで書いて動かさないといけない。

ということで以下のようなものを作ろうかと考えている。この内容は Fluentd Casual Talks #2 の懇親会でFluentdコミッタ陣と @angostura11 さんとで頭をつきあわせてゴニョゴニョ話していた内容で、100%自分で考えたものでないことは最初に記しておく。だいたいそもそものアイデアは @angostura11 さんのtalkから来ている。別に彼らも剽窃だといって怒ったりするまい。……たぶん。

まず前提としてEsperというのはこういうもの。*1

  • Javaで書かれたストリーム処理用ライブラリ
  • SQL的なクエリ StreamSQL をサポート(ウィンドウ関数の追加など)
  • 投入するデータの型を(データ投入前に)定義しておく必要がある
  • ライブラリなのでサーバ部分を別途記述して用意する必要がある

http://www.igvita.com/2011/05/27/streamsql-event-processing-with-esper/

あまり他にストリーム処理用ライブラリの類について聞いたことがないので、これをどうにか使うことを考える。それなりに大きいライブラリらしいので再実装するとかは考えない。そのまま使いたい。更新のお世話にもなれるし。
ただし普通にJavaのコードを書いてEsperサーバみたいなのを使うと別プロセスを管理しないといけなくなり、カジュアルさが思いっきり下がる。別ソフトウェアという心理的な障壁は長期運用に対しても悪影響を及ぼすので良くない。やはり gem install && fluentd だけでインストールして起動したい。

ということで、以下のような構成を考える。

  • fluent-plugin-esper を作る
    • これだけで esper および esper を含むプログラム(javaなやつ)が入るようにする
    • forkして起動し、fluentd本体プロセスとのやりとりはpipeをつかって行う

javaなアレをgemにして配布・インストールとか問題ないんだっけとかそういうことはとりあえず考えない。なんとかなるさ。最悪インストール時にどっかからダウンロードしてjavacする。
javaプロセス部分を何で書くかはだいぶ悩みどころだが、javaでゴリゴリ書くかjrubyで書くかみたいな感じだろうからまあそれも後で考える。

さてデータの入力と出力はこれでいいとしても、クエリ投入などの管理系の口が必要になる。これは子プロセス側で適当にネットワーク通信なポートをlistenすればいいんだけど、それに関連して以下のようなことを考えないといけない。

  • 複数のEsperノードへの負荷分散と調整
    • Esperで実際どの程度の量が処理可能かどうかは不明だがまあ無限ではあるまい、ということで分けることを考えないといけない
    • 1タグのデータが複数のプロセスに分かれると問題があるのでタグ単位でルーティングを頑張る
    • どのタグのデータをどこに流したかを誰かが覚えてる必要がある
    • およびルーティングそのもののためのプラグインが必要
  • 複数のEsperノードへのクエリ投入と一貫性の保証
    • 特定のクエリを登録したとき、それが全てのEsperノードに漏れなく配られる必要がある
    • クエリ定義以外にもデータ型なども同じように管理する
  • 投入したクエリ・データ定義などの揮発への対応
    • そもそもどんなタグがどこにどのように流れたかを覚えておく必要がある
    • どこかのEsperノードが再起動したときにちゃんと状況を再現する
    • 全体が再起動したときにアホの子にならないよう全体を再現する
    • そういう設定を各Esperノードでシリアライズ・保存するのは危険があぶないので中央管理したい
    • 管理ノードプラグインがあればいいんじゃね?

もはやプラグインて何だっけ状態になってきたが、そんな感じで複数のプラグインインスタンスが協調して動くようにする。データのやりとりはpipeも管理側もmsgpackでよかろう。管理側はJSON APIでもいいかな。
問題は管理側がどこでデータを保存するかで、これはもう in_tail のposファイルみたいにどこかに置かせていただく以外にはなさそうな気がする。あとはDBを選択できるようにするとか。気合いをいれて実装するしかないかな。

ということで大まかには以下のような感じになるでしょう。

  • Esper管理プラグイン
    • NW経由でEsperプラグイン各インスタンス、およびEsperルーティングプラグイン各インスタンスに対して設定情報を提供する
    • NW経由でユーザからデータ定義の追加/削除やクエリの追加/削除、現在の設定情報の提供を行う
    • ユーザから追加されたデータ定義/クエリ(メタデータ)をEsperプラグイン各インスタンスにpushする*2
    • ユーザ定義/クエリなどの情報を保存・再生する
    • クエリ結果を集約してFluentdネットワークに流す
  • Esperルーティングプラグイン
    • Esper管理プラグインの位置を指定して起動する
    • どのタグをどのEsperプラグインに流すかを管理・再生する(Esper管理プラグインに問合せる)
    • 基本的にはforwardなんだけど、ルーティングを自由に変更可能である必要がある
    • 1インスタンスで試すとき*3はわざわざ流し直さず内部ルーティングだけで解決したい
  • Esperプラグイン
    • Esper管理プラグインの位置を指定して起動する
    • 子プロセスをfork&execしてEsperプロセスを起動する
    • メタデータをEsper管理プラグインに問合せてEsperプロセスに流し込む
    • 実データをEsperプロセスに流し込み、クエリ結果を受け取ってEsper管理サーバに流す

ざっくり図にするとこんな感じかな。
f:id:tagomoris:20130219140510p:plain

要検討なのは以下のようなあたり

  • fluentdノードをまたぐときに必然的にforwardまわりの記述が必須になるが、それをどうするか
    • まあしょうがないよね、書いてよ、とするか
    • 何か独自のNW転送をごにょごにょやるか……やりたくない
      • forwardを継承して勝手にインスタンスを作って登録するという手はある
  • ノードの特定をどうするか
    • 再起動したりしたとき、前に繋ぎにきたノードと同一ノードかどうかをどのように判定する?
    • uuidをごにょごにょやるとかhostnameでがんばるとか?
    • 設定になんかキーみたいなのを書かせる?
  • 結果データを Esper admin に集約する必要が本当にあるかどうか
    • うーん? でも結果をまとめてどこかに流すならそのほうがいい気がするなあ

さー楽しくなってきやがったぜー。

*1:ほぼ聞いた話だけなので間違いがある可能性が高い

*2:もしくはEsperプラグイン側から高頻度でpullする?

*3:あるいは同じfluentdへ流すとき