node.jsの非同期I/Oにおけるデータ受信のパターンのバリエーション
そもそもなんでnode.jsのThriftライブラリではBufferedTransportがサポートされず、FramedTransportのみが使える状態だったのか。Thriftの歴史的にはBufferedTransportの方が先行して存在しており、また仕様自体も単純のようだ。*1
実装を開始してみてわかったが、node.jsが採用する非同期I/OアーキテクチャのAPIと実に相性が悪い。Thriftが定義ファイルから各言語用のコードを自動生成する仕組みであることも関係している気がする。いざnode.jsの都合に合わないからといって、カジュアルに生成結果のコードを修正するわけにはいかない。また受信データ(を持っているはずのI/Oストリーム)からデータを読み出すところまでがThriftによる自動生成の範囲に含まれる。
(Twitterで言及を読んで追記) 普通にアプリケーション側のコードをコールバックベースで記述できるなら、その方が自然です。そうした方がいいと思います。ここで扱う例は「アプリケーション側のコードを変更できない事情がある」「アプリケーション側のコードが巨大になることが予想されるためコールバックベースの記述を積み上げることを容認できない」などの状況の場合に、アプリケーション側のコードを非コールバックベースのものにしつつnode.jsでどうにかするための方法です。
FramedTransportとBufferedTransport
違いを簡単に。
- FramedTransport
- 全てのデータはフレームと呼ばれる範囲に含まれて送受信が行われる
- 先頭に32bitバイナリ(big endian)でフレーム長が表され、その後にフレーム長のぶんだけバイトストリームが続く
- 1回のThrift RPCについてのデータ(メッセージ)はすべてひとつのフレームの中に収まる
- BufferedTransport
問題は何か
極めて簡潔に言うと4バイト欲しいときに、読み込み済みデータとして与えられたものが4バイト以上あるとは限らず、次が来ることを明示的に待つことはできない。したがって、すべてのデータ受信処理が以下の機能を備える必要がある。
- データ不足による読み込み処理の中断
- 中断した受信処理の再開
以下、ちょっと具体的に例。
node.jsによるストリームからのデータ受信処理は、以下のようにデータ受信イベントに対するコールバックとして実装する。Thriftはちょっと離れて、適当なRPC用のclientオブジェクトを作ったと仮定してコード例を書いてみよう。
var client = new AnyRpcService.Client(); var stream = net.createConnection(port, host); stream.addEventListener("data", function(err, data) { /* 受け取ったデータをどう処理するかをコールバックとして記述 */ client.read_and_exec(data); });
上述のコールバック中に処理を記述するが、さて、client.read_and_exec に以下のような前提を置いてみよう。
- 実際にどのようなRPC呼び出しなのかは、データのヘッダ(先頭1バイト)を読むと分かる
- メソッドAの場合、ヘッダに加えて4バイトのデータを読み込む
- メソッドBの場合、ヘッダに加えて10バイトのデータを読み込む
- メソッドCの場合、改行コードを見付けるまでデータを読み込む
Blocking I/Oな脳だと「読めばいいじゃん」だが、しかしnode.js(Non-blocking I/Oな世界)では読めるデータは data として与えられており、これ以上のデータを「待つ」ことはできない。それをやるとblockしてしまうので。従ってメソッドA、BおよびCを実装するには、clientとして以下のような動作を行う必要がある。
client.processing = undefined; client.buffer = ''; client.read_and_exec = function(data) { // data は ストリームが setEncoding() さている場合は文字列、それ以外はBuffer // この例では文字列として扱っている if (! this.processing) { if (data.length < 1){ return; } this.processing = read_rpc_method(data[0]); } switch (this.processing) { case a: if (this.buffer.length + data.length < 4) { this.buffer += data; break; } var value = data.slice(0, 5); this.buffer = data.slice(5); // and process... return; case b: if (this.buffer.length + data.length < 10) { this.buffer += data; break; } var value = data.slice(0, 11); this.buffer = data.slice(11); // and process... return; case c: var newlinepos = data.indexOf("\n"); if (newlinepos < 0) { this.buffer += data; break; } var value = data.slice(0, newlinepos); if (newlinepos < data.length - 1) { this.buffer = data.slice(newlinepos + 1); }else{ this.buffer = ''; } // and process... return; } // a/b/c どのケースにも該当しない場合 this.buffer += data; };
必要なデータを持っているか data を検査する部分とかバッファからのデータ読み出しとかはもうちょっとまとめたりして綺麗にできるだろうけど、基本的にはこんな感じ。これを外部からの入力データを読む部分でぜんぶやらないといけない。はっきり言ってかなり面倒。
フレーム化されたデータ受信の例
ThriftでいうFramedTransportのように、処理に関係なく何バイト読めばいいということが受信データ自身から分かる場合、状況はかなり簡単になる。clientクラスの側ではバッファの扱いを気にしないよう、データ受信イベントの定義ごとにひとつのクロージャを割り当てるようにすればいい。以下のようなコードだ。*2
var frameReader = function(client){ var buffer = ''; var frameLength = 0; return function(data) { buffer += data; while(1) { if (frameLength == 0) { if (buffer.length < 1) { return; } frameLength = read_binary_frame_length(buffer[0]); buffer = buffer.slice(1); } if (buffer.length < frameLength) { return; } var frame = buffer.slice(0, frameLength); buffer = frameLength > buffer.length ? buffer.slice(frameLength) : ''; frameLength = 0; client.read_and_exec(frame); } }; }; var client = new AnyRpcService.Client(); var stream = net.createConnection(port, host); stream.addEventListener("data", frameReader(client));
ちょっとわかりにくいが frameReader() が呼ばれるたび(つまり stream のdataイベントに対応する処理を登録しようとするたび)に、独自のバッファをもったクロージャを生成して返している。
このクロージャは呼ばれるたび(つまり stream のdataイベントが発生するたび)に buffer にdataを追記し、フレーム長を読み出す。フレーム長に足りる量のデータがその時点でbufferに無ければ、client側の処理は全く呼び出さずに return し、次のdataイベントを待つ。フレーム長に足りるデータがあればその長さのデータをバッファから切り出し、clientの read_and_exec() に渡す。ここでフレームを規定しているプロトコルの仕様上、client.read_and_exec() に渡されたフレームには(どのようなRPCメソッドであろうと)必要なデータがすべて収まっていることが保証されるので、clientクラスを実装するときにバッファリングや受信の中断/再開などを考える必要はない。
同じような仕組みは、フレーム長が無いプロトコルでも「データは1行のテキストでやりとりする。改行がデータの区切り」などと決まっていれば使える。非同期readに関連する処理をひとつのクロージャにすべて隠蔽できるので、かなりオススメのパターンだ。*3
非フレームデータの受信で非同期readを隠蔽する
非フレームデータの場合、単純に考えると個別のread処理ごとにバッファリングをやらないといけないというのは前述のとおり。しかし世の中には client クラスに手を入れられない場合がある。既存のライブラリやクラスを使い回したい場合や、自動生成されたクラスをそのまま使用する必要がある場合(Thriftだ)などなど。そもそも各クラスにバッファリングなどをいちいち実装していくのは大変だしバグの温床になりそうなので、できるならそういうことはやりたくない。どこか下のレイヤで隠蔽するのが正しい実装だと言える気がする。
ではどうするか。アプリケーションクラス(ここで言うClient)は、データを読み出すために data.read(bytes) というメソッドを経由する、ということにする。dataは中間で非同期処理を隠蔽するレイヤが提供するバッファリング用のオブジェクト。data.read(1); data.read(1); というように呼べば、最初の1バイト、次の1バイト、と返ってくることにしよう。Clientからはこのインターフェイスのみ用いる。*4
また Client は data からデータを読み出すためのメソッド read_method を持ち、データが正常に読み出されたときには exec_method を呼び出してその後の処理を実行する、ということにしよう。言葉で書くと面倒だな。こんな感じ。
AnyRpcService.Client = function(){ }; Client.prototype = { read_rpc_method: function(data){ var onebyte = data.read(1); switch(onebyte) { case 'a': return 'a'; case 'b': return 'b'; case 'c': return 'c'; } return undefined; }, read_method: function(data){ var method = this.read_rpc_method(data); switch(method) { case 'a': return this.exec_method('a', data.read(4)); case 'b': return this.exec_method('b', data.read(10)); case 'c': var line = ''; var char = undefined; while(char = data.read(1)) { line += char; if (char == "\n") break; } return this.exec_method('c', data.read(line)); } return undefined; }, exec_method: function(method_name, data){ return this[method_name](data); }, a: function(data){ /* 実際の処理あれこれ */ }, b: function(data){ /* 実際の処理あれこれ */ }, c: function(data){ /* 実際の処理あれこれ */ } };
こんな感じ。read_method/exec_methodでデータ読み込みとディスパッチャをやっていると思えばいい。べたに書くとちょっと微妙に見える気がしないでもないな、と今思った。*5
上記のコードは data.read() というインターフェイスを使うこと、および副作用のないコードにすること、という以外の注意点はない。またa/b/cのメソッド内の実処理においては、data として引数に渡されてくるときには必要なバイト数が揃っているものと仮定したコードが書ける。
では上記のClientクラスをどう非同期readに組込むか。言葉で言うと「読み込みのcommit/rollbackを実装したバッファと例外を用いる」。ストリーム生成時に以下のようなイベントリスナを登録することで実現する。
var DataNotEnoughError = function(){}; var ReReadableBuffer = function(){ var buffer = ''; var readPos = ''; }; ReReadableBuffer.prototype = { read: function(len){ if (this.buffer.length < len) throw new DataNotEnoughError(); var data = this.buffer.slice(0,len); this.readPos = len; return data; }, commit: function(){ this.buffer = this.buffer.slice(this.readPos); this.readPos = 0; }, rollback: function(){ this.readPos = 0; } }; var dataReader = function(client){ var buf = new ReReadableBuffer(); return function(data){ buf.buffer += data; while(buf.buffer.length > 0){ try { client.read_method(buf); buf.commit(); } catch(e){ if (e instanceof DataNotEnoughError){ buf.rollback(); } else{ throw e; } } } }; }; var client = new AnyRpcService.Client(); var stream = net.createConnection(port, host); stream.addEventListener("data", dataReader(client));
ざっと書くとこんな感じ。*6
この処理のキモは上に書いた通り、読み込みのcommit/rollbackが可能なバッファだ。Clientクラスにおけるデータの読み込みは副作用が無い処理にしておいて、途中でデータが足りなくて失敗した場合には read() が例外を投げる。例外が発生した場合はバッファの読み込みカーソルを先頭に戻しておいて、次にデータを受信したときにリトライできるようになっている。例外が発生せずに実処理メソッドのディスパッチまで到達した場合には必要なデータはすべて読み出せているので、その時点で失敗を考える必要はない。この場合は読み込みバッファ内でカーソルが進んだ部分は正常に読み出されたあとなので、バッファに対して commit を呼んで次の読み込みに備える。
なお、ここでも dataReader はdataイベントへの登録ごとに ReReadableBuffer のインスタンスを生成して保持するクロージャを返すので、別のI/OイベントにそれぞれaddEventListenerすれば、別々のバッファが自動的に割り当てられることになる。このあたりに配慮しなくていいのは便利だし、言語機能の強力さが嬉しいあたりだ。
まとめ
node.jsが非同期で楽しいぜヒャッホーといっても、実際にはすべての処理で非同期を前提にしたコーディングを行うのはさすがにめんどいし、バグりそうで怖い。なのでデータに対する実処理を記述する時点では必要な量のデータが揃っていることを保証したい。そのためのパターンとして、以下のような方法がある。
- 読み込みデータをフレーム化する
- フレーム単位で後処理に渡すことをデータ受信レイヤで保証する
- 読み込みデータがフレーム化できない場合
- 読み込み処理を副作用がない形で記述することを強制する
- それを利用したバッファを間に挟むことで後処理に渡った時点では必要な量のデータが揃っていることを保証する
このあたりを記述する必要がある人が世の中にどれだけいるかわからないけど、非同期I/Oプログラミングのパターンのひとつの紹介ということで。
で
世の中に既にこういうパターンは知られているに違いなくて名前がついていると思うんだけどなんて言うのか誰か知りませんか?
*1:のようだ、というのは、バイナリプロトコルの仕様についてどこにも明確に定義した文書を発見できなかったため。各言語用ライブラリが適当に互換性をとりあって成立してるのかなあ。危険だ。
*2:node-thriftのconnection.jsでは最初FramedTransportに特化した、このような処理内容のコードになってた。
*3:なにかもう名前のついた有名なパターンだ、という事実がどこかにありそうな気がする。そういうあたりにはとても弱くて……。
*5:Thriftの場合、read_methodにあたる部分がプロトコル(protocol.js)および自動生成のClientクラスとして分離されているので、あんまりこのへんをまとめて目にすることがない。
*6:node-thriftの場合はデータ受信と処理メソッドのディスパッチがちょっと変わった構造をしているので、これよりもうちょっと怪しげな処理になってる。