Smalltalk
Tarantool
mq
SmalltalkDay 22

Tarantubeでメッセージキュー

はじめに

前回の記事から1年ほど経ちました。
Tarantoolは1.7系から1.8に移り、No SQLなのにSQLが使えるようになったりなど、着実に進化を続けています。

Tarantool本体はさておき、今回はTarantalkのメッセージキュー拡張、Tarantubeの解説をしてみます。

メッセージキュー(MQ)というとRabbitMQや、HornetQなどが有名ですが、Tarantoolのqueueはそれらに比べて軽量で、気軽に導入できるというところが特徴になっています。

なおTarantool本体の特徴については前々回の記事「SmalltalkからLuaをevalする」を参考にしてください。

インストール

Tarantoolにqueueモジュールを入れる

まずはTarantool自体にqueueモジュールを入れる必要があります。
Tarantoolには機能拡張のモジュールが豊富に用意されており、queueもその一つです。

入れるのは簡単で、管理用コマンドのtarantoolctlを使用します。

tarantoolctl rocks install queue

後はTarantoolのコンソールからrequireすると使えるようになります。

tarantool> box.cfg{listen = 3301}
tarantool> queue = require('queue')
tarantool> queue.stats()

スクリーンショット 2017-12-21 0.17.23.png

まだ何もしていないので、queue.stats()でstatisticsを表示させても空の状態です。

PharoにTarantubeを入れる

バックエンド側の準備ができたので、次にPharoにTarantubeを入れていきます。
Tarantubeは、オブジェクト指向なAPIでTarantoolのqueueが使えるようにしたものです。
といっても大げさなものではなく、わずか3つのクラスからできている軽量なライブラリです。

いつものMetacelloのスクリプトで入ります。(Playgroundで"Do it"です)

Metacello new
    smalltalkhubUser: 'Pharo' project: 'MetaRepoForPharo60';
    configuration: 'Tarantube';
    version: #stable;
    get; load.

あるいはCatalog Browser経由でも良いでしょう。デスクトップメニューの'Tools'->'Catalog Browser'で開き、検索窓に'tube'と打ちます。Tarantubeが見つかるので、選択して右クリックメニューで'Install stable version'を選びます。

スクリーンショット 2017-12-21 0.27.13.png

動かしてみる

Tarantoolをqueueモジュールを組み込んだ上で起動しましょう。
以下のようなconfig.luaを用意しておくと楽です。

config.lua
box.cfg{listen = 3301}
box.schema.user.create('taran', {password = 'talk', if_not_exists=true})
box.schema.user.grant('taran', 'read,write,execute', 'universe', nil, {if_not_exists=true})
queue = require('queue')
require('console').start()

詳しくは省きますが、3301ポートでクライアントからの接続を待ち受けるようにし、DBのユーザとしてtaranさんを登録、queueモジュールを入れて、対話型のコンソールを開くという処理になっています。

tarantool config.lua

とすると、tarantoolのコンソールが上がった状態になります。

スクリーンショット 2017-12-21 22.41.51.png

つないでみる

このTarantoolにTarantubeでつなぎ、queue機能を使ってみましょう。

tarantalk := TrTarantalk connect: 'taran:talk@localhost:3301'.
tarantalk tubes. "現在のtube一覧"

まだ"Print it"(Cmd + p)しても空の配列#()が返ってくるだけです。

FIFO キューの作成

'my_tube'という名前のキューを作成します。Tarantubeではキューのことをなぜかtubeと呼ぶのです。
"Do it"(Cmd + d)してみます。

"FIFOのキューを作成"
tube := tarantalk ensureTubeNamed: 'my_tube'.

tarantalk tubesを、再び"Print it"すると、今度はan Array(TrTube( id: 0 name: my_tube type: fifo))となりました。

キューにタスクを追加

キューにタスクを積んでみます。MessagePackでエンコードできる範囲で、任意のオブジェクトをキューに追加できるのですが、今回は簡単のため、文字列オブジェクトを入れていきます。

"キューにタスクを追加"
1 to: 10 do:  [:idx | tube putTaskWith: ('タスク ', idx asString)]. 

"Do it"後、おもむろにTarantoolのコンソールに戻って、queue.stats()してみましょう。

スクリーンショット 2017-12-21 22.57.23.png

putされたタスクが10件あるということがわかります。このタスクをluaから取り出しても良いですし、別のPharoイメージから取り出してもよいのです。永続化されているので、Tarantoolを落としても、キューは次回の起動で復活します。

キューからタスクを取り出す

タスクを取り出してみましょう。本来は別のPharoイメージでやったほうが気分が出ますが、ここでは上記のtubeをそのまま使います。

結果を見るため、"Do it"前にトランスクリプトを開けておいてください。
デスクトップメニューで'Tools'->'Transcript'です。

"定期的にタスクを処理(タイムアウトとして2秒を指定)"
tube repeatTakeTaskFor: 2 ifAvailable: [:task | Transcript cr; show: {task. task data}. task done].

実行するとこんな感じで表示されます。

スクリーンショット 2017-12-21 23.05.54.png

今後はstatisticsをPharo側から見てみましょう。"Inspect it"(Cmd + i)します。

tube statistics.

インスペクタが上がります。'calls'を辿ってみると、取得('take')が10回行われて、すべて無事完了('ack')したことがわかります。

スクリーンショット 2017-12-21 23.33.25.png

当然ながら、Tarantoolのコンソールのqueue.stats()の結果と同じになっています。

スクリーンショット 2017-12-21 23.09.35.png

非同期にタスクを積んでおいて、好きなタイミングで取り出して実行できるということが確認できました。

何に使っているのか?

TarantubeはALLSTOCKERという中古建設機械売買プラットフォームで、注文メールの送信などに使われています。Pharoで非同期に処理したいときは、なかなか役に立つやつなのです。

まとめ

Tarantubeの基本的な使い方を解説しました。Tarantubeについては、タスクの優先度や、ディレイやサブキューの指定など、まだまだ面白い機能があるのですが、それらについては26日のSmalltalk忘年会2017でお話する予定です。こちらもお楽しみに。