動機
Sidekiqのジョブを定期的に実行したい場合、crontabでrails runner 'MyWorker.perform_async'
みたいなプロセスを起動するケースが多いと思いますが、crontabを実行するサーバがSPOFになってしまったりと、運用上悩ましい点があります。
そこで、AWS環境を前提として、
CloudWatch Events -> Lambda(Python) -> Redis -> Sidekiq
という具合に、サーバーレスな仕組みを構築することを考えました。
その過程で、「Redisに何を書き込めばSidekiqがdequeueしてくれるのか?」、という点を解明する必要があったので、Sidekiqを動かしながら実際にRedisの中身を覗いてみました。
なお、言語によってはSidekiq互換のクライアントライブラリが存在するので、それらを利用するのが早いと思います。
前提
- Rails 5.1.3
- Sidekiq 5.0.4
- より一般化した情報を得たいため、
redis-namespace
を有効にする - ActiveJobは使わない
- 最後に補足します
- peform_asyncを想定
- Scheduled Jobは別の機会に調べたい
Sidekiq.configure_server do |config|
config.redis = { url: 'redis://localhost:6379', namespace: 'myapp' }
end
Sidekiq.configure_client do |config|
config.redis = { url: 'redis://localhost:6379', namespace: 'myapp' }
end
:concurrency: 5
:queues:
- default
- [myqueue, 2]
keyの調査
まずはsidekiq起動直後の状態で、Redisのkeyを確認してみます。
longcat.local
はマシン名です。
127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:stat:failed"
3) "myapp:stat:processed:2017-08-05"
4) "myapp:stat:processed"
5) "myapp:stat:failed:2017-08-05"
6) "myapp:longcat.local:72372:3d6ca4888e6f"
7) "myapp:queues"
なお、namespace
の指定がない場合は、processes
やqueues
のように、プレフィックスの無いkeyになります。
それでは、適当なWorkerを実装してジョブを投入してみます。
class SleepWorker
include Sidekiq::Worker
sidekiq_options queue: :default, retry: false
def perform(time_to_sleep)
sleep(time_to_sleep)
end
end
$ bundle exec rails runner '3.times { SleepWorker.perform_async(3600) }'
ジョブ投入後、keyが増えていることが分かります。
127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:stat:failed"
3) "myapp:stat:processed:2017-08-05"
4) "myapp:stat:processed"
5) "myapp:longcat.local:72372:3d6ca4888e6f:workers"
6) "myapp:stat:failed:2017-08-05"
7) "myapp:longcat.local:72372:3d6ca4888e6f"
8) "myapp:queues"
ここまで確認できたkeyを分類すると、次の5つに分類できそうです。
${NAMESPACE}:processes
${NAMESPACE}:${HOSTNAME}:${PID}:${HASH}
${NAMESPACE}:${HOSTNAME}:${PID}:${HASH}:workers
${NAMESPACE}:queues
${NAMESPACE}:stat:*
以降、名前空間を表す${NAMESPACE}:
は省略して記述します。
queues
queues
の型を調べてみます。
127.0.0.1:6379> type myapp:queues
set
queues
はSetなので、smembers
で中身を覗いてみます。
127.0.0.1:6379> smembers myapp:queues
1) "default"
定義済みのキューの名前の集合が格納されているようです。
config/sidekiq.yml
にはdefault以外のキュー(myqueue)も定義しているのですが、格納されていませんね。
そこで、別のキュー(myqueue)に意図的にジョブを投入してみます。
$ bundle exec rails runner 'SleepWorker.set(queue: :myqueue).perform_async(3600)'
無事にmyqueueも登場しました。
127.0.0.1:6379> smembers myapp:queues
1) "myqueue"
2) "default"
processes
processes
の型を調べてみます。
127.0.0.1:6379> type myapp:processes
set
processes
はSetなので、smembers
で中身を覗いてみます。
127.0.0.1:6379> smembers myapp:processes
1) "longcat.local:72372:3d6ca4888e6f"
実行中のSidekiqのプロセス単位でkeyが存在するようで、
-
72372
はプロセスID -
longcat.local
はホスト名 -
3d6ca4888e6f
は謎のハッシュ
謎のハッシュの正体はSidekiqのソースを読めば分かると思いますが、今のところ重要ではなさそうなので無視します。
processes
に格納された${HOSTNAME}:${PID}:${HASH}
形式の文字列ですが、同系列のkeyが存在しているので、次はそのkeyについて調査します。
${HOSTNAME}:${PID}:${HASH}
${HOSTNAME}:${PID}:${HASH}
の型を調べてみます。
127.0.0.1:6379> type myapp:longcat.local:72372:3d6ca4888e6f
hash
${HOSTNAME}:${PID}:${HASH}
の型はHashなので、hgetall
で中身を覗いてみます。
127.0.0.1:6379> hgetall myapp:longcat.local:72372:3d6ca4888e6f
1) "busy"
2) "3"
3) "quiet"
4) "false"
5) "beat"
6) "1501901472.00442"
7) "info"
8) "{\"hostname\":\"longcat.local\",\"started_at\":1501900616.115412,\"pid\":72372,\"tag\":\"MyApp\",\"concurrency\":5,\"queues\":[\"default\",\"myqueue\"],\"labels\":[],\"identity\":\"longcat.local:72372:3d6ca4888e6f\"}"
稼働中のSidekiqプロセスの詳細な情報のようです。
${HOSTNAME}:${PID}:${HASH}:workers
${HOSTNAME}:${PID}:${HASH}:workers
の型を調べてみます。
127.0.0.1:6379> type myapp:longcat.local:72372:3d6ca4888e6f:workers
hash
${HOSTNAME}:${PID}:${HASH}:workers
の型はHashなので、hgetall
で中身を覗いてみます。
127.0.0.1:6379> hgetall myapp:longcat.local:72372:3d6ca4888e6f:workers
1) "ov7g1wja0"
2) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"d4c8f8f6759874ffef20a93a\",\"created_at\":1501900585.634248,\"enqueued_at\":1501900585.634562},\"run_at\":1501900616}"
3) "ov7g1wi88"
4) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"7b7436c9e3e336fc9deaf40f\",\"created_at\":1501900578.189331,\"enqueued_at\":1501900578.189852},\"run_at\":1501900616}"
5) "ov7g1wikg"
6) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"179e5481af82ab17f2b25538\",\"created_at\":1501900637.775475,\"enqueued_at\":1501900637.7757661},\"run_at\":1501900637}"
実行中のジョブの情報がHashとして格納されているようです。
hashのキーは実行中のスレッドのID(ログにTID-ov7g1wja0
みたいに出てくるやつ)で、値はジョブのJSON表現となっています。
{
"queue": "default",
"payload": {
"class": "SleepWorker",
"args": [
3600
],
"retry": false,
"queue": "default",
"jid": "d4c8f8f6759874ffef20a93a",
"created_at": 1501900585.6342,
"enqueued_at": 1501900585.6346
},
"run_at": 1501900616
}
現在、concurrency = 5
なので、あと3つジョブを投入して、処理待ちのジョブを作り出してみます。
$ bundle exec rails runner '3.times { SleepWorker.perform_async(3600) }'
その後、keyを確認すると、myapp:queue:default
が増えています。
待ち行列を扱うためのkeyだと考えられます。
127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:queue:default"
3) "myapp:stat:failed"
4) "myapp:stat:processed:2017-08-05"
5) "myapp:stat:processed"
6) "myapp:longcat.local:72372:3d6ca4888e6f:workers"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:longcat.local:72372:3d6ca4888e6f"
9) "myapp:queues"
queue:${QUEUE}
queue:${QUEUE}
の型を調べてみます。
127.0.0.1:6379> type myapp:queue:default
list
queue:${QUEUE}
の型はListなので、lrange
で中身を覗いてみます。
127.0.0.1:6379> lrange myapp:queue:default 0 -1
1) "{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"10852e13a6d9846d8a95b0e7\",\"created_at\":1501902132.849817,\"enqueued_at\":1501902132.849853}"
処理待ちのジョブの情報が、JSON文字列のリストとして格納されているようです。
**「Redisに何を書き込めばSidekiqがdequeueしてくれるのか?」**という当初の疑問に対しては、かなり核心的な情報にまで到達できました。
つまり、queue:${QUEUE}
というListに対して、次のような形式のJSON文字列をPUSHしてやれば良いのでは、という仮説が得られました。
{
"class": "SleepWorker",
"args": [
3600
],
"retry": false,
"queue": "default",
"jid": "10852e13a6d9846d8a95b0e7",
"created_at": 1501902132.8498,
"enqueued_at": 1501902132.8499
}
class,args,retry,queueに指定すべき値は想像がつきますが、その他の値については、Sidekiqのソースを読んだところ、次のような値を指定すれば良さそうです。
- jid:
SecureRandom.hex(12)
- created_at:
Time.now.to_f
- enqueued_at:
Time.now.to_f
なお、キューが空の場合は、keysでqueue:${QUEUE}
は表示されないので注意が必要です。
retry
意図的に失敗するWorkerを実装して、ジョブキューに投入してみます。
class FailureWorker
include Sidekiq::Worker
sidekiq_options queue: :default, retry: 3
def perform
raise StandardError
end
end
$ bundle exec rails runner 'FailureWorker.perform_async'
keyを確認すると、myapp:retry
というkeyが追加されています。
127.0.0.1:6379> keys *
1) "myapp:retry"
2) "myapp:processes"
3) "myapp:longcat.local:73839:1b7f42fe816a"
4) "myapp:stat:failed"
5) "myapp:stat:processed:2017-08-05"
6) "myapp:stat:processed"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:queues"
9) "myapp:longcat.local:73839:1b7f42fe816a:workers"
retry
の型を調べてみます。
127.0.0.1:6379> type myapp:retry
zset
retry
の型はZSet(ソート済みSet)のようなので、zrange
で中身を覗いてみます。
127.0.0.1:6379> zrange myapp:retry 0 -1
1) "{\"class\":\"FailureWorker\",\"args\":[],\"retry\":3,\"queue\":\"default\",\"jid\":\"953bfc8a85a0ef39b446f16d\",\"created_at\":1501904319.945594,\"enqueued_at\":1501904537.833684,\"error_message\":\"StandardError\",\"error_class\":\"StandardError\",\"failed_at\":1501904465.9930282,\"retry_count\":2,\"retried_at\":1501904537.835155}"
リトライ中のジョブの情報がJSON文字列の集合(ソート済み)として格納されているようです。
JSONの中には、queue:${QUEUE}
で初期化された情報に加えて、エラーやリトライに関する情報が追加されています。
{
"class": "FailureWorker",
"args": [
],
"retry": 3,
"queue": "default",
"jid": "953bfc8a85a0ef39b446f16d",
"created_at": 1501904319.9456,
"enqueued_at": 1501904537.8337,
"error_message": "StandardError",
"error_class": "StandardError",
"failed_at": 1501904465.993,
"retry_count": 2,
"retried_at": 1501904537.8352
}
dead
限界までリトライされた頃に、改めてkeyを確認すると、myapp:dead
というkeyが追加されています。
127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:longcat.local:73839:1b7f42fe816a"
3) "myapp:dead"
4) "myapp:stat:failed"
5) "myapp:stat:processed:2017-08-05"
6) "myapp:stat:processed"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:queues"
9) "myapp:longcat.local:73839:1b7f42fe816a:workers"
dead
の型を調べてみます。
127.0.0.1:6379> type myapp:dead
zset
dead
の型はZSet(ソート済みSet)のようなので、zrange
で中身を覗いてみます。
127.0.0.1:6379> zrange myapp:dead 0 -1
1) "{\"class\":\"FailureWorker\",\"args\":[],\"retry\":3,\"queue\":\"default\",\"jid\":\"953bfc8a85a0ef39b446f16d\",\"created_at\":1501904319.945594,\"enqueued_at\":1501904660.428597,\"error_message\":\"StandardError\",\"error_class\":\"StandardError\",\"failed_at\":1501904465.9930282,\"retry_count\":3,\"retried_at\":1501904660.430109}"
JSONの中身は、retry
に格納されていたJSONと同じです。
{
"class": "FailureWorker",
"args": [
],
"retry": 3,
"queue": "default",
"jid": "953bfc8a85a0ef39b446f16d",
"created_at": 1501904319.9456,
"enqueued_at": 1501904660.4286,
"error_message": "StandardError",
"error_class": "StandardError",
"failed_at": 1501904465.993,
"retry_count": 3,
"retried_at": 1501904660.4301
}
stat:*
もう力尽きたので調べませんでしたが、名前から推察するにstat:*
系のkeyは、統計情報を格納するためのものだと思われます。
Redisに直接enqueueする
これまでの考察から、queue:${QUEUE}にJSONをPUSHすることで、Sidekiqのクライアントを使わずにenqueueが出来そうです。
そこで、実験してみます。
> lpush myapp:queue:default "{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"0b34564dbb2dcd63ec644b16\",\"created_at\":1501906533.288397,\"enqueued_at\":1501906533.288397}"
キタ━━━━(゚∀゚)━━━━!!
2017-08-05T04:19:45.626Z 73839 TID-ouxj1c6uk SleepWorker JID-0b34564dbb2dcd63ec644b16 INFO: start
Redisに書き込まれる値のまとめ(stat・schedule除く)
key | type | 内容 |
---|---|---|
queues |
Set | キュー名の集合 |
processes |
Set | プロセスの集合 |
${HOSTNAME}:${PID}:${HASH} |
Hash | 各プロセスの詳細情報 |
${HOSTNAME}:${PID}:${HASH}:workers |
Hash | 各プロセスで処理中のジョブ |
queue:${QUEUE} |
List | 各キューのジョブ待ち行列 |
retry |
ZSet | リトライ中のジョブ |
dead |
ZSet | Dead Job Queue |
[補足] ActiveJob
ActiveJobがインタフェースのデファクトスタンダードになっていることから、ActiveJobを使う場合も見ておきます。
...
module MyApp
class Application < Rails::Application
...
config.active_job.queue_adapter = :sidekiq
end
end
class SleepJob < ApplicationJob
queue_as :default
def perform(time_to_sleep)
sleep(time_to_sleep)
end
end
このような設定でJobを実装してジョブキューに投入し、キューの中身をRedisで覗いてみます。
$ bundle exec rails runner '6.times { SleepJob.perform_later(3600) }'
127.0.0.1:6379> lrange myapp:queue:default 0 -1
1) "{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"SleepJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"SleepJob\",\"job_id\":\"49f612ef-233a-4660-b9e7-cc48a80ee7d4\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[3600],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"1e7c528c1c5727e0d46a7786\",\"created_at\":1501907321.086514,\"enqueued_at\":1501907321.0865438}"
JSONの構造は以下のようになっており、生Sidekiqの場合と比較して、何個かキーが追加されています。
{
"class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped": "SleepJob",
"queue": "default",
"args": [
{
"job_class": "SleepJob",
"job_id": "49f612ef-233a-4660-b9e7-cc48a80ee7d4",
"provider_job_id": null,
"queue_name": "default",
"priority": null,
"arguments": [
3600
],
"executions": 0,
"locale": "en"
}
],
"retry": true,
"jid": "1e7c528c1c5727e0d46a7786",
"created_at": 1501907321.0865,
"enqueued_at": 1501907321.0865
}
もう力尽きたので、ActiveJobの詳細はまた今度調べたいと思います。