たごもりすメモ

コードとかその他の話とか。

REPLでありつつ指定のコードをappspotで動作させる flex_remote_api_shell を作った

remote_api_shell は手元で書いたコードをすぐ動作させることができて、しかも本番環境に対してデータをすぐにput()できたりするので大変便利だと思うんだけど、いかんせん遅いのが問題。複雑なデータのメンテナンスをするときに便利なんだけど、対象のエンティティ数が1000を超えたあたりから、処理するごとにかなり我慢のならない待ち時間が発生する。
で、その遅さってMakeSyncCallをHTTP RPC経由で実行してることに問題がある。ので、指定したときだけその実行がappspot上で直接動けば、手元で書いたロジックの動作を確認しつつ、重いループだけappspot上で高速に処理することができて便利なんじゃね!?

と思ったので、作りました flex_remote_apiリポジトリgithubにあります。ライセンスは Apache License v2.0。READMEはそれなりに頑張って書いたので、テキトー英語の読解が苦手でない人はこのエントリよりもREADMEの方がいいかも。

(1/18追記: flex_remote_apiあらためremote_driverとなりました - tagomorisのメモ置き場 )

なお、まだエラー処理がぜんぜんなのでちょっと微妙です。appspot側で実行されるコードが例外になったりすると永遠にお待ちください状態になったり。そのうち直すはず。

また readline モジュールに強く依存するため、Windows環境では動作しない可能性が高いです。Cygwinなどを使えばreadlineモジュールが(多分)使えるので、どうしてもWindowsから実行したい人はCygwin使いましょう。LinuxやMac OSXでは普通に使えます。多分。

使い方

  • flex_remote_apiの準備
    • flex_remote_apiリポジトリをcloneする
    • flex_remote_api.py を自分のアプリケーションのディレクトリにコピーする
    • リポジトリに含まれる queue.yamlを自分のアプリケーションにコピーするか FlexRemoteApiJob の定義を書き写す
    • app.yamlにハンドラ定義を追加する(remote_apiも必要)
builtins:
- remote_api: on

handlers:
- url: /_ex_ah/flex_remote_api/.*
  script: flex_remote_api.py
    • アプリケーションをデプロイする( appcfg.py update する)
  • flex_remote_api_shell の起動、実行
    • bin/flex_remote_api_shell.py をAppEngine SDKのパスとアプリケーションIDを引数に起動する
 $ bin/flex_remote_api_shell.py /path/to/sdk/google_appengine app-id
    • メールアドレスとパスワードは自分のアプリケーションの管理用のGoogleアカウントのものを入れておきます
  • なにか書く
    • クラスの定義や関数定義、エンティティの作成とput、Queryなどなど、普通に remote_api_shell と同じように使えます
    • 行の頭に 'remote' およびスペースを入れてから書いたコードは appspot 上で実行されます
 app-id/flex> remote hoge_function(arguments)
    • eval で処理されるので、式を書きましょう
      • 文は書けません
      • 例えば for ループなどはPythonでは文なので、そのままでは書けないため、いったん関数として定義してあげてから関数を実行する形にしましょう

その他、現状の制限などなど、詳しくはREADMEを読んでください……。もしくは Twitter で @tagomoris に質問してください。w

何が嬉しいの?

速度

大量のエンティティを操作する速度が、通常の remote_api 経由での実行にくらべて圧倒的に高速です。内容によりますが単純なデータ作成と保存でもだいたい50倍〜100倍くらい高速なので、Datastoreに既に存在するデータの更新などのケースではおそらくもっと速いでしょう。

記述のお手軽さ

手元でデータを作ったりロジックを確認したりしてから、改めて全体に対してそれを実行してやる、という使い方ができます。
例えばエンティティ単位での処理をまず関数として書いて、そのまま手元で(remote_apiで)実行して、結果を確認する。ちゃんと動作することを確認したら、その関数を全エンティティを対象に実行するように書いてリモートで実行する。
こうすることで、書く、すぐ実行する、という操作を結果を確認しながらさくさく行えます。部分部分の記述が正しくなっているかを確認しながら進行させることができるので「やってみた! 30分待って結果を確認したら例外吐いて落ちてた!」という orz な事態を避けることができます。

データのインポートなど

例えばですが、データのインポートが必要なとき、こんな使いかたもできます。

  • インポート元のデータを適当に分割して、テキトーなモデル名でDatastoreに突っ込んでおく
    • 元が100万行のCSVがインポート元のデータだとしたら、まずそれを1万行ずつのデータに分割してリストに持つ (仮に ary というリストだとする)
    • その場でインポート専用に以下みたいなモデルをさくっと定義(デプロイされたコードにこの定義が書かれてなくても全く問題なし)
class ImportData(db.Model):
  num = db.IntegerProperty()
  content = db.TextProperty()
    • このエンティティにデータを全部突っ込んで、とりあえず remote_api 経由でDatastoreに入れておく
for i in range(0, len(ary)):
  ImportData(num=i, content=ary[i]).put()
    • これはそれなりに遅いですが、残念ですが我慢しましょう……これをやっておけばあとで高速に処理できるし、一度だけのことだし
    • もしProtocolBuffer 1MBサイズ制限にひっかかる可能性が低ければ、batch putが速いです
entities = [ImportData(num=i, content=ary[i]) for i in range(0, len(ary))]
db.put(entities)
  • ImportDataをひとつ受け取って、各行から目的のエンティティに変換してputする関数を書く
    • 例えばこんな感じ
def convert(num):
  target = ImportData.all().query("num =", num).get()
  for line in target.content.split("\n"):  # か、何か適当なcsv用のモジュールをimportするとか
    values = line.split(',') # とりあえず適当にね
    HogeModel(prop1=values[0], prop2=values[1], ...).put()  # 欲しいようにエンティティを組み立てて put
    • で、まずremote_apiで一度実行してみる
app-id/flex> convert(0)
    • 完了したら HogeModel を適当に取得してみるとかDatastore Viewerで確認するとかで、ちゃんと実行されたかを見る
      • うまくいってなかったら def convert を修正・再定義してからまた試してみる
  • 1単位がちゃんと実行されるようになったら、残りの全体に対してリモートで実行
    • このとき、ループは関数でくるんであげる必要がある
app-id/flex> def convert_range(start, end):
...   for i in range(start, end):
...     convert(i)
... 
app-id/flex> remote convert_range(1, 99) # 0 は終わってるんで、残りの99グループに対してappspot上で実行!

こんな感じで簡単実行!

また特定のURLのリストに対して、内容を取得してからそのデータをエンティティに保存して、といったようなものも、あらかじめURLリストをDatastoreなりに入れておいてそれを取得して実行するよう関数を書くことで、appspot上で高速に実行することができます。HTTPの通信もurlfetchを使ってappspot上から行えば、手元の通信回線が低速でも全く問題ありません。
元がデータベースで手元にあったりするケースはちょっと面倒ですが、まずできるだけそのままDatastoreに入れてしまうのが勝利への道でしょう。そうすればいざデータのコンバートを実行するというとき、コンバート元データのgetとコンバート後データのputが超高速に行えます。最初のインポートは一回だけだから、そこの時間が問題になることはあんまり無いはず。

考えている改造

エラーのハンドリングはちゃんと書くつもり。taskqueueがどういう例外で失敗したかを取得する方法があるかなあ……。

あとはリモート実行を指示したときに完了を待ってブロックせずにすぐ処理を戻して、並列して複数の処理要求を発行できるようにする非同期モードを作るつもりです。そうすると、例えば上記のデータインポートが以下のように書けるようになるはず。

app-id/flex> def convert_range(start, end):
...   for i in range(start, end):
...     convert(i)
... 
app-id/flex> remote_async convert_range(1, 20)
app-id/flex> remote_async convert_range(21, 40)
app-id/flex> remote_async convert_range(41, 60)
app-id/flex> remote_async convert_range(61, 80)
app-id/flex> remote_async convert_range(81, 99) # ここまでは全部、すぐに処理が戻ってくる
app-id/flex> remote_async_wait() # ここで上記の remote_async の処理全部が終了するのを待つ

現状で remote ディレクティブの処理はappspot上ではtaskqueueでの実行なので、並列化自体は簡単にできるはず。こうすれば処理時間は理論上、最初の例(convert_range(1,99)の場合)の1/5で完了すると思われて、夢が広がりんぐですね。

ということで

誰か使ってみたら感想ください!w