10
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Ateam引越し侍Advent Calendar 2016

Day 1

RubyとMySQLで作られたジョブキューPerfectQueueを試してみる

Last updated at Posted at 2016-11-30

TreasureData製の分散キュー PerfectQueueを使ってみます。
redisを利用したSidekiqやAWS SQSを利用したShoryukenのようにRubyで使える
ジョブキューはPerfectQueue以外にも存在していますが、PerfectQueueはキューの管理にMySQLを使っているところが特徴的です。

インストール

PerfectQueueはRubyで作られているので、通常のRubyのプロダクトと同じようにbundlerを用いてインストールを行います。

mkdir perfectqueue
cd ./perfectqueue
bundle init

Gemfileを作成した後に、以下の様にGemファイルを編集します。

Gemfile
# frozen_string_literal: true
source "https://rubygems.org"

gem "perfectqueue"
gem "json"
gem "mysql2"

Gemfileを編集が終わったあとはbundlerでインストールを実行します。

bundle install --path=./vendor/bundle

PerfectQueueはMySQLにキューを入れていくので、PerfectQueueで使用する
MySQLのデータベースを作成します。

mysql> create database perfectqueue;
Query OK, 1 row affected (0.00 sec)

PerfectQueueで使用するDBの設定は、config/perfectqueue.ymlに記述します。

config/perfectqueue.yml
development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues
  processors: 1

DBの設定が完了したら、perfectqueueのinitコマンドでMySQLにqueue用のテーブル
を作成します。

$ bundle exec perfectqueue init

MySQL上でテーブルの確認を行うと、queue用のテーブルは以下のようになっています。

mysql> use perfectqueue;
mysql> desc queues;
+-------------+---------------------+------+-----+---------+-------+
| Field       | Type                | Null | Key | Default | Extra |
+-------------+---------------------+------+-----+---------+-------+
| id          | varchar(255)        | NO   | PRI | NULL    |       |
| timeout     | int(11)             | NO   | MUL | NULL    |       |
| data        | longblob            | NO   |     | NULL    |       |
| created_at  | int(11)             | YES  |     | NULL    |       |
| resource    | varchar(255)        | YES  |     | NULL    |       |
| max_running | int(11)             | YES  |     | NULL    |       |
| owner       | bigint(21) unsigned | NO   |     | 0       |       |
+-------------+---------------------+------+-----+---------+-------+

CLIでのキューの投入

PerfectQueueのインストールが完了したので実際のキューを導入してみます。

$ bundle exec perfectqueue submit k1 my_task '{"uid":1}' -u user_1

perfectqueue submitがキューを投入するためのコマンドになります。
それ以降の引数はid、タスクのタイプ、データの順に記述します。
キューの投入が完了したら、MySQLでqueueテーブルを検索してみます。

mysql> select * from queues;
+----+------------+------------------------------+------------+----------+-------------+-------+
| id | timeout    | data                         | created_at | resource | max_running | owner |
+----+------------+------------------------------+------------+----------+-------------+-------+
| k1 | 1479699578 | {"uid":1,"type":"my_task"} | 1479699578 | user_1   |        NULL |     0 |
+----+------------+------------------------------+------------+----------+-------------+-------+

perfectqueueのコマンドでもキューの中身を確認することが出来ます。
簡単なコマンドなので、ruby以外のプログラムからもシステムコールでキューを投入する
ことは容易だと思います。
その場合は、gem install perfectqueueを行って、bundlerなしで、perfectqueueを実行できる
環境にしておいたほうがいいでしょう。

$ bundle exec perfectqueue list
                           key            type               user             status                   created_at                      timeout   data
                            k1         my_task             user_1            waiting    2016-11-21 12:39:38 +0900    2016-11-21 12:39:38 +0900   {"uid"=>1}
1 entries.

Rubyでのキューの投入

キューの投入をrubyからでも試してみます。

enqueue.rb
require 'bundler'
Bundler.require
require 'yaml'

environment = "development"

configration = YAML.load_file(File.expand_path(File.join(File.dirname(__FILE__),"..","config","perfectqueue.yml")))[environment]
queue = PerfectQueue.open(configration)

queue.submit("k2", "my_task", {uid:'2'},{user:"user_2"})

queue.submitがキューの投入を実行するメソッドになります。
引数の順番はperfectqueueコマンドのときと同じです。
第4引数のオプションにはuserの他に、now,run_at,max_runnning,compression
を使うことが出来ます。
nowとrun_atはqueueテーブルのtimeoutに値が入り、max_runningはmax_runningに値が入ります。
簡単にキューの投入を行えますが、注意するべきこととしては、先ほどのqueueテーブルの
構造でもわかる通りidがユニークになっていることです。そのために同じkeyのキューを
投入しようとするとエラーになってしまいます。

$ bundle exec ruby ./app/enqueue.rb 
disconnects current connection: task key=k2 already exists

そのためにキューを投入するためのidは重複しないように実行するhostの名前で
あったり、導入する時刻、あるいはSecureRandom.hexなどの関数をつかって
idをユニークにする必要があります。

ワーカーの実装

PerfectQueueでは、投入されたキューを処理ためには、キューの取り出しを行う
dispatcherとキューに応じた処理を行うhandlerの2つの仕組みでキューを処理してきます。
まずはdispatcherの記述をおこないます。

app/workers/dispatch.rb
Dir[File.join(File.dirname(__FILE__), 'dispatch', '*.rb')].each {|file| require file }

class Dispatch < PerfectQueue::Application::Dispatch
  # describe routing
  route "type1" => TestHandler
  route /^regexp-.*$/ => :TestHandler
  route "my_task" => MyTaskHandler
end

routeで先ほどキューを投入するときに指定したキューのタイプとそれに処理する
handlerを指定します。
handlerは以下の様に記述します。

app/workers/dispatch/my_task_handler.rb
class MyTaskHandler < PerfectQueue::Application::Base
  # implement run method
  def run
    # do something ...
    puts "acquired task: #{task.inspect}"

    # call task.finish!, task.retry! or task.release!
    task.finish!
  end
end

キューの投入の際に指定したデータはtask.dataメソッドで取り出すことが出来ます。

PerfectQueueの実行

キューの実行は以下のコマンドで実行します。

bundle exec perfectqueue run -I. -rapp/workers/dispatch Dispatch

キューの実行が完了した状態のmysqlのテーブルです。

mysql> select * from queues;
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| id      | timeout    | data                               | created_at | resource | max_running | owner |
+---------+------------+------------------------------------+------------+----------+-------------+-------+
| k1      |  479709237 | {"uid":1,"type":"my_task"}         |       NULL | NULL     |        NULL |     0 |
| k2      |  479709237 | {"uid":"2","type":"my_task"}       |       NULL | NULL     |        NULL |     0 |
+---------+------------+------------------------------------+------------+----------+-------------+-------+

created_atやreourceがNULLに更新され、timeoutも更新されています。
workerプロセスを複数起動したい場合は、perfectqueue.ymlにprocessors
の項目を追加します。

config/perfectqueue.yml
development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues
  processors: 2

この状態でPerfectQueueを起動してプロセスを確認すると以下のように
2つのプロセスが動いていることが確認できます。

$ ps aux | grep perfectqueue
ec2-user  9478  5.8  2.9 297856 30392 pts/2    Sl+  15:58   0:00 ruby2.2 /home/ec2-user/projects/perfectqueue/vendor/bundle/ruby/2.2/bin/perfectqueue run -I. -rapp/workers/dispatch Dispatch
ec2-user  9481  0.0  2.5 298020 26324 pts/2    Sl+  15:58   0:00 perfectqueue-supervisor:Dispatch                                                                                            
ec2-user  9486  0.2  2.9 385320 30548 pts/2    Sl+  15:58   0:00 perfectqueue:Dispatch 1                                                                                                     
ec2-user  9491  0.2  2.9 385188 30532 pts/2    Sl+  15:58   0:00 perfectqueue:Dispatch 2 

まとめ

駆け足ではありましたが、PerfectQueueの使い方を見てきました。
Amazon RDSのようなMySQLのマネージドサービスを使えばMySQLもすぐに準備できるので、
Railsは使っていないけど、Rubyが使える環境で、ジョブキューの仕組みを使いたいといった
状況ではPerfectQueueは最適ではないかと思います。
またキューを投入した状態のMySQLをdumpしておけば、キューを処理した後でもリストアして再度
キューを処理することができるので、handlerの実装と動作確認も簡単に行うことができます。

10
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?