TreasureData製の分散キュー PerfectQueueを使ってみます。
redisを利用したSidekiqやAWS SQSを利用したShoryukenのようにRubyで使える
ジョブキューはPerfectQueue以外にも存在していますが、PerfectQueueはキューの管理にMySQLを使っているところが特徴的です。
インストール
PerfectQueueはRubyで作られているので、通常のRubyのプロダクトと同じようにbundlerを用いてインストールを行います。
mkdir perfectqueue
cd ./perfectqueue
bundle init
Gemfileを作成した後に、以下の様にGemファイルを編集します。
# frozen_string_literal: true
source "https://rubygems.org"
gem "perfectqueue"
gem "json"
gem "mysql2"
Gemfileを編集が終わったあとはbundlerでインストールを実行します。
bundle install --path=./vendor/bundle
PerfectQueueはMySQLにキューを入れていくので、PerfectQueueで使用する
MySQLのデータベースを作成します。
mysql> create database perfectqueue;
Query OK, 1 row affected (0.00 sec)
PerfectQueueで使用するDBの設定は、config/perfectqueue.ymlに記述します。
development:
type: rdb_compat
url: mysql2://root:@localhost:3306/perfectqueue
table: queues
processors: 1
DBの設定が完了したら、perfectqueueのinitコマンドでMySQLにqueue用のテーブル
を作成します。
$ bundle exec perfectqueue init
MySQL上でテーブルの確認を行うと、queue用のテーブルは以下のようになっています。
mysql> use perfectqueue;
mysql> desc queues;
+-------------+---------------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+---------------------+------+-----+---------+-------+
| id | varchar(255) | NO | PRI | NULL | |
| timeout | int(11) | NO | MUL | NULL | |
| data | longblob | NO | | NULL | |
| created_at | int(11) | YES | | NULL | |
| resource | varchar(255) | YES | | NULL | |
| max_running | int(11) | YES | | NULL | |
| owner | bigint(21) unsigned | NO | | 0 | |
+-------------+---------------------+------+-----+---------+-------+
CLIでのキューの投入
PerfectQueueのインストールが完了したので実際のキューを導入してみます。
$ bundle exec perfectqueue submit k1 my_task '{"uid":1}' -u user_1
perfectqueue submitがキューを投入するためのコマンドになります。
それ以降の引数はid、タスクのタイプ、データの順に記述します。
キューの投入が完了したら、MySQLでqueueテーブルを検索してみます。
mysql> select * from queues;
+----+------------+------------------------------+------------+----------+-------------+-------+
| id | timeout | data | created_at | resource | max_running | owner |
+----+------------+------------------------------+------------+----------+-------------+-------+
| k1 | 1479699578 | {"uid":1,"type":"my_task"} | 1479699578 | user_1 | NULL | 0 |
+----+------------+------------------------------+------------+----------+-------------+-------+
perfectqueueのコマンドでもキューの中身を確認することが出来ます。
簡単なコマンドなので、ruby以外のプログラムからもシステムコールでキューを投入する
ことは容易だと思います。
その場合は、gem install perfectqueueを行って、bundlerなしで、perfectqueueを実行できる
環境にしておいたほうがいいでしょう。
$ bundle exec perfectqueue list
key type user status created_at timeout data
k1 my_task user_1 waiting 2016-11-21 12:39:38 +0900 2016-11-21 12:39:38 +0900 {"uid"=>1}
1 entries.
Rubyでのキューの投入
キューの投入をrubyからでも試してみます。
require 'bundler'
Bundler.require
require 'yaml'
environment = "development"
configration = YAML.load_file(File.expand_path(File.join(File.dirname(__FILE__),"..","config","perfectqueue.yml")))[environment]
queue = PerfectQueue.open(configration)
queue.submit("k2", "my_task", {uid:'2'},{user:"user_2"})
queue.submitがキューの投入を実行するメソッドになります。
引数の順番はperfectqueueコマンドのときと同じです。
第4引数のオプションにはuserの他に、now,run_at,max_runnning,compression
を使うことが出来ます。
nowとrun_atはqueueテーブルのtimeoutに値が入り、max_runningはmax_runningに値が入ります。
簡単にキューの投入を行えますが、注意するべきこととしては、先ほどのqueueテーブルの
構造でもわかる通りidがユニークになっていることです。そのために同じkeyのキューを
投入しようとするとエラーになってしまいます。
$ bundle exec ruby ./app/enqueue.rb
disconnects current connection: task key=k2 already exists
そのためにキューを投入するためのidは重複しないように実行するhostの名前で
あったり、導入する時刻、あるいはSecureRandom.hexなどの関数をつかって
idをユニークにする必要があります。
ワーカーの実装
PerfectQueueでは、投入されたキューを処理ためには、キューの取り出しを行う
dispatcherとキューに応じた処理を行うhandlerの2つの仕組みでキューを処理してきます。
まずはdispatcherの記述をおこないます。
Dir[File.join(File.dirname(__FILE__), 'dispatch', '*.rb')].each {|file| require file }
class Dispatch < PerfectQueue::Application::Dispatch
# describe routing
route "type1" => TestHandler
route /^regexp-.*$/ => :TestHandler
route "my_task" => MyTaskHandler
end
routeで先ほどキューを投入するときに指定したキューのタイプとそれに処理する
handlerを指定します。
handlerは以下の様に記述します。
class MyTaskHandler < PerfectQueue::Application::Base
# implement run method
def run
# do something ...
puts "acquired task: #{task.inspect}"
# call task.finish!, task.retry! or task.release!
task.finish!
end
end
キューの投入の際に指定したデータはtask.dataメソッドで取り出すことが出来ます。
PerfectQueueの実行
キューの実行は以下のコマンドで実行します。
bundle exec perfectqueue run -I. -rapp/workers/dispatch Dispatch
キューの実行が完了した状態のmysqlのテーブルです。
mysql> select * from queues;
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| id | timeout | data | created_at | resource | max_running | owner |
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| k1 | 479709237 | {"uid":1,"type":"my_task"} | NULL | NULL | NULL | 0 |
| k2 | 479709237 | {"uid":"2","type":"my_task"} | NULL | NULL | NULL | 0 |
+---------+------------+------------------------------------+------------+----------+-------------+-------+
created_atやreourceがNULLに更新され、timeoutも更新されています。
workerプロセスを複数起動したい場合は、perfectqueue.ymlにprocessors
の項目を追加します。
development:
type: rdb_compat
url: mysql2://root:@localhost:3306/perfectqueue
table: queues
processors: 2
この状態でPerfectQueueを起動してプロセスを確認すると以下のように
2つのプロセスが動いていることが確認できます。
$ ps aux | grep perfectqueue
ec2-user 9478 5.8 2.9 297856 30392 pts/2 Sl+ 15:58 0:00 ruby2.2 /home/ec2-user/projects/perfectqueue/vendor/bundle/ruby/2.2/bin/perfectqueue run -I. -rapp/workers/dispatch Dispatch
ec2-user 9481 0.0 2.5 298020 26324 pts/2 Sl+ 15:58 0:00 perfectqueue-supervisor:Dispatch
ec2-user 9486 0.2 2.9 385320 30548 pts/2 Sl+ 15:58 0:00 perfectqueue:Dispatch 1
ec2-user 9491 0.2 2.9 385188 30532 pts/2 Sl+ 15:58 0:00 perfectqueue:Dispatch 2
まとめ
駆け足ではありましたが、PerfectQueueの使い方を見てきました。
Amazon RDSのようなMySQLのマネージドサービスを使えばMySQLもすぐに準備できるので、
Railsは使っていないけど、Rubyが使える環境で、ジョブキューの仕組みを使いたいといった
状況ではPerfectQueueは最適ではないかと思います。
またキューを投入した状態のMySQLをdumpしておけば、キューを処理した後でもリストアして再度
キューを処理することができるので、handlerの実装と動作確認も簡単に行うことができます。