Edited at

RubyとMySQLで作られたジョブキューPerfectQueueを試してみる

More than 1 year has passed since last update.

TreasureData製の分散キュー PerfectQueueを使ってみます。

redisを利用したSidekiqやAWS SQSを利用したShoryukenのようにRubyで使える

ジョブキューはPerfectQueue以外にも存在していますが、PerfectQueueはキューの管理にMySQLを使っているところが特徴的です。


インストール

PerfectQueueはRubyで作られているので、通常のRubyのプロダクトと同じようにbundlerを用いてインストールを行います。

mkdir perfectqueue

cd ./perfectqueue
bundle init

Gemfileを作成した後に、以下の様にGemファイルを編集します。


Gemfile

# frozen_string_literal: true

source "https://rubygems.org"

gem "perfectqueue"
gem "json"
gem "mysql2"


Gemfileを編集が終わったあとはbundlerでインストールを実行します。

bundle install --path=./vendor/bundle

PerfectQueueはMySQLにキューを入れていくので、PerfectQueueで使用する

MySQLのデータベースを作成します。

mysql> create database perfectqueue;

Query OK, 1 row affected (0.00 sec)

PerfectQueueで使用するDBの設定は、config/perfectqueue.ymlに記述します。


config/perfectqueue.yml

development:

type: rdb_compat
url: mysql2://root:@localhost:3306/perfectqueue
table: queues
processors: 1

DBの設定が完了したら、perfectqueueのinitコマンドでMySQLにqueue用のテーブル

を作成します。

$ bundle exec perfectqueue init

MySQL上でテーブルの確認を行うと、queue用のテーブルは以下のようになっています。

mysql> use perfectqueue;

mysql> desc queues;
+-------------+---------------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+---------------------+------+-----+---------+-------+
| id | varchar(255) | NO | PRI | NULL | |
| timeout | int(11) | NO | MUL | NULL | |
| data | longblob | NO | | NULL | |
| created_at | int(11) | YES | | NULL | |
| resource | varchar(255) | YES | | NULL | |
| max_running | int(11) | YES | | NULL | |
| owner | bigint(21) unsigned | NO | | 0 | |
+-------------+---------------------+------+-----+---------+-------+


CLIでのキューの投入

PerfectQueueのインストールが完了したので実際のキューを導入してみます。

$ bundle exec perfectqueue submit k1 my_task '{"uid":1}' -u user_1

perfectqueue submitがキューを投入するためのコマンドになります。

それ以降の引数はid、タスクのタイプ、データの順に記述します。

キューの投入が完了したら、MySQLでqueueテーブルを検索してみます。

mysql> select * from queues;

+----+------------+------------------------------+------------+----------+-------------+-------+
| id | timeout | data | created_at | resource | max_running | owner |
+----+------------+------------------------------+------------+----------+-------------+-------+
| k1 | 1479699578 | {"uid":1,"type":"my_task"} | 1479699578 | user_1 | NULL | 0 |
+----+------------+------------------------------+------------+----------+-------------+-------+

perfectqueueのコマンドでもキューの中身を確認することが出来ます。

簡単なコマンドなので、ruby以外のプログラムからもシステムコールでキューを投入する

ことは容易だと思います。

その場合は、gem install perfectqueueを行って、bundlerなしで、perfectqueueを実行できる

環境にしておいたほうがいいでしょう。

$ bundle exec perfectqueue list

key type user status created_at timeout data
k1 my_task user_1 waiting 2016-11-21 12:39:38 +0900 2016-11-21 12:39:38 +0900 {"uid"=>1}
1 entries.


Rubyでのキューの投入

キューの投入をrubyからでも試してみます。


enqueue.rb

require 'bundler'

Bundler.require
require 'yaml'

environment = "development"

configration = YAML.load_file(File.expand_path(File.join(File.dirname(__FILE__),"..","config","perfectqueue.yml")))[environment]
queue = PerfectQueue.open(configration)

queue.submit("k2", "my_task", {uid:'2'},{user:"user_2"})


queue.submitがキューの投入を実行するメソッドになります。

引数の順番はperfectqueueコマンドのときと同じです。

第4引数のオプションにはuserの他に、now,run_at,max_runnning,compression

を使うことが出来ます。

nowとrun_atはqueueテーブルのtimeoutに値が入り、max_runningはmax_runningに値が入ります。

簡単にキューの投入を行えますが、注意するべきこととしては、先ほどのqueueテーブルの

構造でもわかる通りidがユニークになっていることです。そのために同じkeyのキューを

投入しようとするとエラーになってしまいます。

$ bundle exec ruby ./app/enqueue.rb 

disconnects current connection: task key=k2 already exists

そのためにキューを投入するためのidは重複しないように実行するhostの名前で

あったり、導入する時刻、あるいはSecureRandom.hexなどの関数をつかって

idをユニークにする必要があります。


ワーカーの実装

PerfectQueueでは、投入されたキューを処理ためには、キューの取り出しを行う

dispatcherとキューに応じた処理を行うhandlerの2つの仕組みでキューを処理してきます。

まずはdispatcherの記述をおこないます。


app/workers/dispatch.rb

Dir[File.join(File.dirname(__FILE__), 'dispatch', '*.rb')].each {|file| require file }

class Dispatch < PerfectQueue::Application::Dispatch
# describe routing
route "type1" => TestHandler
route /^regexp-.*$/ => :TestHandler
route "my_task" => MyTaskHandler
end


routeで先ほどキューを投入するときに指定したキューのタイプとそれに処理する

handlerを指定します。

handlerは以下の様に記述します。


app/workers/dispatch/my_task_handler.rb

class MyTaskHandler < PerfectQueue::Application::Base

# implement run method
def run
# do something ...
puts "acquired task: #{task.inspect}"

# call task.finish!, task.retry! or task.release!
task.finish!
end
end


キューの投入の際に指定したデータはtask.dataメソッドで取り出すことが出来ます。


PerfectQueueの実行

キューの実行は以下のコマンドで実行します。

bundle exec perfectqueue run -I. -rapp/workers/dispatch Dispatch

キューの実行が完了した状態のmysqlのテーブルです。

mysql> select * from queues;

+---------+------------+------------------------------------+------------+----------+-------------+-------+
| id | timeout | data | created_at | resource | max_running | owner |
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| k1 | 479709237 | {"uid":1,"type":"my_task"} | NULL | NULL | NULL | 0 |
| k2 | 479709237 | {"uid":"2","type":"my_task"} | NULL | NULL | NULL | 0 |
+---------+------------+------------------------------------+------------+----------+-------------+-------+

created_atやreourceがNULLに更新され、timeoutも更新されています。

workerプロセスを複数起動したい場合は、perfectqueue.ymlにprocessors

の項目を追加します。


config/perfectqueue.yml

development:

type: rdb_compat
url: mysql2://root:@localhost:3306/perfectqueue
table: queues
processors: 2

この状態でPerfectQueueを起動してプロセスを確認すると以下のように

2つのプロセスが動いていることが確認できます。

$ ps aux | grep perfectqueue

ec2-user 9478 5.8 2.9 297856 30392 pts/2 Sl+ 15:58 0:00 ruby2.2 /home/ec2-user/projects/perfectqueue/vendor/bundle/ruby/2.2/bin/perfectqueue run -I. -rapp/workers/dispatch Dispatch
ec2-user 9481 0.0 2.5 298020 26324 pts/2 Sl+ 15:58 0:00 perfectqueue-supervisor:Dispatch
ec2-user 9486 0.2 2.9 385320 30548 pts/2 Sl+ 15:58 0:00 perfectqueue:Dispatch 1
ec2-user 9491 0.2 2.9 385188 30532 pts/2 Sl+ 15:58 0:00 perfectqueue:Dispatch 2


まとめ

駆け足ではありましたが、PerfectQueueの使い方を見てきました。

Amazon RDSのようなMySQLのマネージドサービスを使えばMySQLもすぐに準備できるので、

Railsは使っていないけど、Rubyが使える環境で、ジョブキューの仕組みを使いたいといった

状況ではPerfectQueueは最適ではないかと思います。

またキューを投入した状態のMySQLをdumpしておけば、キューを処理した後でもリストアして再度

キューを処理することができるので、handlerの実装と動作確認も簡単に行うことができます。