Edited at

SidekiqはRedisに何を書き込んでいるのか

More than 1 year has passed since last update.


動機

Sidekiqのジョブを定期的に実行したい場合、crontabでrails runner 'MyWorker.perform_async'みたいなプロセスを起動するケースが多いと思いますが、crontabを実行するサーバがSPOFになってしまったりと、運用上悩ましい点があります。

そこで、AWS環境を前提として、

CloudWatch Events -> Lambda(Python) -> Redis -> Sidekiq

という具合に、サーバーレスな仕組みを構築することを考えました。

その過程で、「Redisに何を書き込めばSidekiqがdequeueしてくれるのか?」、という点を解明する必要があったので、Sidekiqを動かしながら実際にRedisの中身を覗いてみました。

なお、言語によってはSidekiq互換のクライアントライブラリが存在するので、それらを利用するのが早いと思います。


前提


  • Rails 5.1.3

  • Sidekiq 5.0.4

  • より一般化した情報を得たいため、redis-namespaceを有効にする

  • ActiveJobは使わない


    • 最後に補足します



  • peform_asyncを想定


    • Scheduled Jobは別の機会に調べたい




config/initializers/sidekiq.rb

Sidekiq.configure_server do |config|

config.redis = { url: 'redis://localhost:6379', namespace: 'myapp' }
end

Sidekiq.configure_client do |config|
config.redis = { url: 'redis://localhost:6379', namespace: 'myapp' }
end



config/sidekiq.yml

:concurrency: 5

:queues:
- default
- [myqueue, 2]


keyの調査

まずはsidekiq起動直後の状態で、Redisのkeyを確認してみます。

longcat.localはマシン名です。

127.0.0.1:6379> keys *

1) "myapp:processes"
2) "myapp:stat:failed"
3) "myapp:stat:processed:2017-08-05"
4) "myapp:stat:processed"
5) "myapp:stat:failed:2017-08-05"
6) "myapp:longcat.local:72372:3d6ca4888e6f"
7) "myapp:queues"

なお、namespaceの指定がない場合は、processesqueuesのように、プレフィックスの無いkeyになります。

それでは、適当なWorkerを実装してジョブを投入してみます。


app/workers/sleep_worker.rb

class SleepWorker

include Sidekiq::Worker
sidekiq_options queue: :default, retry: false

def perform(time_to_sleep)
sleep(time_to_sleep)
end
end


$ bundle exec rails runner '3.times { SleepWorker.perform_async(3600) }'

ジョブ投入後、keyが増えていることが分かります。

127.0.0.1:6379> keys *

1) "myapp:processes"
2) "myapp:stat:failed"
3) "myapp:stat:processed:2017-08-05"
4) "myapp:stat:processed"
5) "myapp:longcat.local:72372:3d6ca4888e6f:workers"
6) "myapp:stat:failed:2017-08-05"
7) "myapp:longcat.local:72372:3d6ca4888e6f"
8) "myapp:queues"

ここまで確認できたkeyを分類すると、次の5つに分類できそうです。


  • ${NAMESPACE}:processes

  • ${NAMESPACE}:${HOSTNAME}:${PID}:${HASH}

  • ${NAMESPACE}:${HOSTNAME}:${PID}:${HASH}:workers

  • ${NAMESPACE}:queues

  • ${NAMESPACE}:stat:*

以降、名前空間を表す${NAMESPACE}:は省略して記述します。


queues

queuesの型を調べてみます。

127.0.0.1:6379> type myapp:queues

set

queuesはSetなので、smembersで中身を覗いてみます。

127.0.0.1:6379> smembers myapp:queues

1) "default"

定義済みのキューの名前の集合が格納されているようです。

config/sidekiq.ymlにはdefault以外のキュー(myqueue)も定義しているのですが、格納されていませんね。

そこで、別のキュー(myqueue)に意図的にジョブを投入してみます。

$ bundle exec rails runner 'SleepWorker.set(queue: :myqueue).perform_async(3600)'

無事にmyqueueも登場しました。

127.0.0.1:6379> smembers myapp:queues

1) "myqueue"
2) "default"


processes

processesの型を調べてみます。

127.0.0.1:6379> type myapp:processes

set

processesはSetなので、smembersで中身を覗いてみます。

127.0.0.1:6379> smembers myapp:processes

1) "longcat.local:72372:3d6ca4888e6f"

実行中のSidekiqのプロセス単位でkeyが存在するようで、



  • 72372はプロセスID


  • longcat.localはホスト名


  • 3d6ca4888e6fは謎のハッシュ

謎のハッシュの正体はSidekiqのソースを読めば分かると思いますが、今のところ重要ではなさそうなので無視します。

processesに格納された${HOSTNAME}:${PID}:${HASH}形式の文字列ですが、同系列のkeyが存在しているので、次はそのkeyについて調査します。


${HOSTNAME}:${PID}:${HASH}

${HOSTNAME}:${PID}:${HASH}の型を調べてみます。

127.0.0.1:6379> type myapp:longcat.local:72372:3d6ca4888e6f

hash

${HOSTNAME}:${PID}:${HASH}の型はHashなので、hgetallで中身を覗いてみます。

127.0.0.1:6379> hgetall myapp:longcat.local:72372:3d6ca4888e6f

1) "busy"
2) "3"
3) "quiet"
4) "false"
5) "beat"
6) "1501901472.00442"
7) "info"
8) "{\"hostname\":\"longcat.local\",\"started_at\":1501900616.115412,\"pid\":72372,\"tag\":\"MyApp\",\"concurrency\":5,\"queues\":[\"default\",\"myqueue\"],\"labels\":[],\"identity\":\"longcat.local:72372:3d6ca4888e6f\"}"

稼働中のSidekiqプロセスの詳細な情報のようです。


${HOSTNAME}:${PID}:${HASH}:workers

${HOSTNAME}:${PID}:${HASH}:workersの型を調べてみます。

127.0.0.1:6379> type myapp:longcat.local:72372:3d6ca4888e6f:workers

hash

${HOSTNAME}:${PID}:${HASH}:workersの型はHashなので、hgetallで中身を覗いてみます。

127.0.0.1:6379> hgetall myapp:longcat.local:72372:3d6ca4888e6f:workers

1) "ov7g1wja0"
2) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"d4c8f8f6759874ffef20a93a\",\"created_at\":1501900585.634248,\"enqueued_at\":1501900585.634562},\"run_at\":1501900616}"
3) "ov7g1wi88"
4) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"7b7436c9e3e336fc9deaf40f\",\"created_at\":1501900578.189331,\"enqueued_at\":1501900578.189852},\"run_at\":1501900616}"
5) "ov7g1wikg"
6) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"179e5481af82ab17f2b25538\",\"created_at\":1501900637.775475,\"enqueued_at\":1501900637.7757661},\"run_at\":1501900637}"

実行中のジョブの情報がHashとして格納されているようです。

hashのキーは実行中のスレッドのID(ログにTID-ov7g1wja0みたいに出てくるやつ)で、値はジョブのJSON表現となっています。

{

"queue": "default",
"payload": {
"class": "SleepWorker",
"args": [
3600
],
"retry": false,
"queue": "default",
"jid": "d4c8f8f6759874ffef20a93a",
"created_at": 1501900585.6342,
"enqueued_at": 1501900585.6346
},
"run_at": 1501900616
}

現在、concurrency = 5なので、あと3つジョブを投入して、処理待ちのジョブを作り出してみます。

$ bundle exec rails runner '3.times { SleepWorker.perform_async(3600) }'

その後、keyを確認すると、myapp:queue:defaultが増えています。

待ち行列を扱うためのkeyだと考えられます。

127.0.0.1:6379> keys *

1) "myapp:processes"
2) "myapp:queue:default"
3) "myapp:stat:failed"
4) "myapp:stat:processed:2017-08-05"
5) "myapp:stat:processed"
6) "myapp:longcat.local:72372:3d6ca4888e6f:workers"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:longcat.local:72372:3d6ca4888e6f"
9) "myapp:queues"


queue:${QUEUE}

queue:${QUEUE}の型を調べてみます。

127.0.0.1:6379> type myapp:queue:default

list

queue:${QUEUE}の型はListなので、lrangeで中身を覗いてみます。

127.0.0.1:6379> lrange myapp:queue:default 0 -1

1) "{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"10852e13a6d9846d8a95b0e7\",\"created_at\":1501902132.849817,\"enqueued_at\":1501902132.849853}"

処理待ちのジョブの情報が、JSON文字列のリストとして格納されているようです。

「Redisに何を書き込めばSidekiqがdequeueしてくれるのか?」という当初の疑問に対しては、かなり核心的な情報にまで到達できました。

つまり、queue:${QUEUE}というListに対して、次のような形式のJSON文字列をPUSHしてやれば良いのでは、という仮説が得られました。

{

"class": "SleepWorker",
"args": [
3600
],
"retry": false,
"queue": "default",
"jid": "10852e13a6d9846d8a95b0e7",
"created_at": 1501902132.8498,
"enqueued_at": 1501902132.8499
}

class,args,retry,queueに指定すべき値は想像がつきますが、その他の値については、Sidekiqのソースを読んだところ、次のような値を指定すれば良さそうです。


  • jid: SecureRandom.hex(12)

  • created_at: Time.now.to_f

  • enqueued_at: Time.now.to_f

なお、キューが空の場合は、keysでqueue:${QUEUE}は表示されないので注意が必要です。


retry

意図的に失敗するWorkerを実装して、ジョブキューに投入してみます。


app/workers/failure_worker.rb

class FailureWorker

include Sidekiq::Worker
sidekiq_options queue: :default, retry: 3

def perform
raise StandardError
end
end


$ bundle exec rails runner 'FailureWorker.perform_async'

keyを確認すると、myapp:retryというkeyが追加されています。

127.0.0.1:6379> keys *

1) "myapp:retry"
2) "myapp:processes"
3) "myapp:longcat.local:73839:1b7f42fe816a"
4) "myapp:stat:failed"
5) "myapp:stat:processed:2017-08-05"
6) "myapp:stat:processed"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:queues"
9) "myapp:longcat.local:73839:1b7f42fe816a:workers"

retryの型を調べてみます。

127.0.0.1:6379> type myapp:retry

zset

retryの型はZSet(ソート済みSet)のようなので、zrangeで中身を覗いてみます。

127.0.0.1:6379> zrange myapp:retry 0 -1

1) "{\"class\":\"FailureWorker\",\"args\":[],\"retry\":3,\"queue\":\"default\",\"jid\":\"953bfc8a85a0ef39b446f16d\",\"created_at\":1501904319.945594,\"enqueued_at\":1501904537.833684,\"error_message\":\"StandardError\",\"error_class\":\"StandardError\",\"failed_at\":1501904465.9930282,\"retry_count\":2,\"retried_at\":1501904537.835155}"

リトライ中のジョブの情報がJSON文字列の集合(ソート済み)として格納されているようです。

JSONの中には、queue:${QUEUE}で初期化された情報に加えて、エラーやリトライに関する情報が追加されています。

{

"class": "FailureWorker",
"args": [

],
"retry": 3,
"queue": "default",
"jid": "953bfc8a85a0ef39b446f16d",
"created_at": 1501904319.9456,
"enqueued_at": 1501904537.8337,
"error_message": "StandardError",
"error_class": "StandardError",
"failed_at": 1501904465.993,
"retry_count": 2,
"retried_at": 1501904537.8352
}


dead

限界までリトライされた頃に、改めてkeyを確認すると、myapp:deadというkeyが追加されています。

127.0.0.1:6379> keys *

1) "myapp:processes"
2) "myapp:longcat.local:73839:1b7f42fe816a"
3) "myapp:dead"
4) "myapp:stat:failed"
5) "myapp:stat:processed:2017-08-05"
6) "myapp:stat:processed"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:queues"
9) "myapp:longcat.local:73839:1b7f42fe816a:workers"

deadの型を調べてみます。

127.0.0.1:6379> type myapp:dead

zset

deadの型はZSet(ソート済みSet)のようなので、zrangeで中身を覗いてみます。

127.0.0.1:6379> zrange myapp:dead 0 -1

1) "{\"class\":\"FailureWorker\",\"args\":[],\"retry\":3,\"queue\":\"default\",\"jid\":\"953bfc8a85a0ef39b446f16d\",\"created_at\":1501904319.945594,\"enqueued_at\":1501904660.428597,\"error_message\":\"StandardError\",\"error_class\":\"StandardError\",\"failed_at\":1501904465.9930282,\"retry_count\":3,\"retried_at\":1501904660.430109}"

JSONの中身は、retryに格納されていたJSONと同じです。

{

"class": "FailureWorker",
"args": [

],
"retry": 3,
"queue": "default",
"jid": "953bfc8a85a0ef39b446f16d",
"created_at": 1501904319.9456,
"enqueued_at": 1501904660.4286,
"error_message": "StandardError",
"error_class": "StandardError",
"failed_at": 1501904465.993,
"retry_count": 3,
"retried_at": 1501904660.4301
}


stat:*

もう力尽きたので調べませんでしたが、名前から推察するにstat:*系のkeyは、統計情報を格納するためのものだと思われます。


Redisに直接enqueueする

これまでの考察から、queue:${QUEUE}にJSONをPUSHすることで、Sidekiqのクライアントを使わずにenqueueが出来そうです。

そこで、実験してみます。

> lpush myapp:queue:default "{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"0b34564dbb2dcd63ec644b16\",\"created_at\":1501906533.288397,\"enqueued_at\":1501906533.288397}"

キタ━━━━(゚∀゚)━━━━!!


sidekiq.log

2017-08-05T04:19:45.626Z 73839 TID-ouxj1c6uk SleepWorker JID-0b34564dbb2dcd63ec644b16 INFO: start



Redisに書き込まれる値のまとめ(stat・schedule除く)

key
type
内容

queues
Set
キュー名の集合

processes
Set
プロセスの集合

${HOSTNAME}:${PID}:${HASH}
Hash
各プロセスの詳細情報

${HOSTNAME}:${PID}:${HASH}:workers
Hash
各プロセスで処理中のジョブ

queue:${QUEUE}
List
各キューのジョブ待ち行列

retry
ZSet
リトライ中のジョブ

dead
ZSet
Dead Job Queue


[補足] ActiveJob

ActiveJobがインタフェースのデファクトスタンダードになっていることから、ActiveJobを使う場合も見ておきます。


config/application.rb

...

module MyApp
class Application < Rails::Application
...

config.active_job.queue_adapter = :sidekiq
end
end



app/jobs/sleep_job.rb

class SleepJob < ApplicationJob

queue_as :default

def perform(time_to_sleep)
sleep(time_to_sleep)
end
end


このような設定でJobを実装してジョブキューに投入し、キューの中身をRedisで覗いてみます。

$ bundle exec rails runner '6.times { SleepJob.perform_later(3600) }'

127.0.0.1:6379> lrange myapp:queue:default 0 -1

1) "{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"SleepJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"SleepJob\",\"job_id\":\"49f612ef-233a-4660-b9e7-cc48a80ee7d4\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[3600],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"1e7c528c1c5727e0d46a7786\",\"created_at\":1501907321.086514,\"enqueued_at\":1501907321.0865438}"

JSONの構造は以下のようになっており、生Sidekiqの場合と比較して、何個かキーが追加されています。


ActiveJobのジョブ

{

"class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped": "SleepJob",
"queue": "default",
"args": [
{
"job_class": "SleepJob",
"job_id": "49f612ef-233a-4660-b9e7-cc48a80ee7d4",
"provider_job_id": null,
"queue_name": "default",
"priority": null,
"arguments": [
3600
],
"executions": 0,
"locale": "en"
}
],
"retry": true,
"jid": "1e7c528c1c5727e0d46a7786",
"created_at": 1501907321.0865,
"enqueued_at": 1501907321.0865
}

もう力尽きたので、ActiveJobの詳細はまた今度調べたいと思います。