142
70

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SidekiqはRedisに何を書き込んでいるのか

Last updated at Posted at 2017-08-05

動機

Sidekiqのジョブを定期的に実行したい場合、crontabでrails runner 'MyWorker.perform_async'みたいなプロセスを起動するケースが多いと思いますが、crontabを実行するサーバがSPOFになってしまったりと、運用上悩ましい点があります。

そこで、AWS環境を前提として、

CloudWatch Events -> Lambda(Python) -> Redis -> Sidekiq

という具合に、サーバーレスな仕組みを構築することを考えました。

その過程で、「Redisに何を書き込めばSidekiqがdequeueしてくれるのか?」、という点を解明する必要があったので、Sidekiqを動かしながら実際にRedisの中身を覗いてみました。

なお、言語によってはSidekiq互換のクライアントライブラリが存在するので、それらを利用するのが早いと思います。

前提

  • Rails 5.1.3
  • Sidekiq 5.0.4
  • より一般化した情報を得たいため、redis-namespaceを有効にする
  • ActiveJobは使わない
    • 最後に補足します
  • peform_asyncを想定
    • Scheduled Jobは別の機会に調べたい
config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis://localhost:6379', namespace: 'myapp' }
end

Sidekiq.configure_client do |config|
  config.redis = { url: 'redis://localhost:6379', namespace: 'myapp' }
end
config/sidekiq.yml
:concurrency: 5
:queues:
  - default
  - [myqueue, 2]

keyの調査

まずはsidekiq起動直後の状態で、Redisのkeyを確認してみます。
longcat.localはマシン名です。

127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:stat:failed"
3) "myapp:stat:processed:2017-08-05"
4) "myapp:stat:processed"
5) "myapp:stat:failed:2017-08-05"
6) "myapp:longcat.local:72372:3d6ca4888e6f"
7) "myapp:queues"

なお、namespaceの指定がない場合は、processesqueuesのように、プレフィックスの無いkeyになります。

それでは、適当なWorkerを実装してジョブを投入してみます。

app/workers/sleep_worker.rb
class SleepWorker
  include Sidekiq::Worker
  sidekiq_options queue: :default, retry: false

  def perform(time_to_sleep)
    sleep(time_to_sleep)
  end
end
$ bundle exec rails runner '3.times { SleepWorker.perform_async(3600) }'

ジョブ投入後、keyが増えていることが分かります。

127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:stat:failed"
3) "myapp:stat:processed:2017-08-05"
4) "myapp:stat:processed"
5) "myapp:longcat.local:72372:3d6ca4888e6f:workers"
6) "myapp:stat:failed:2017-08-05"
7) "myapp:longcat.local:72372:3d6ca4888e6f"
8) "myapp:queues"

ここまで確認できたkeyを分類すると、次の5つに分類できそうです。

  • ${NAMESPACE}:processes
  • ${NAMESPACE}:${HOSTNAME}:${PID}:${HASH}
  • ${NAMESPACE}:${HOSTNAME}:${PID}:${HASH}:workers
  • ${NAMESPACE}:queues
  • ${NAMESPACE}:stat:*

以降、名前空間を表す${NAMESPACE}:は省略して記述します。

queues

queuesの型を調べてみます。

127.0.0.1:6379> type myapp:queues
set

queuesはSetなので、smembersで中身を覗いてみます。

127.0.0.1:6379> smembers myapp:queues
1) "default"

定義済みのキューの名前の集合が格納されているようです。

config/sidekiq.ymlにはdefault以外のキュー(myqueue)も定義しているのですが、格納されていませんね。

そこで、別のキュー(myqueue)に意図的にジョブを投入してみます。

$ bundle exec rails runner 'SleepWorker.set(queue: :myqueue).perform_async(3600)'

無事にmyqueueも登場しました。

127.0.0.1:6379> smembers myapp:queues
1) "myqueue"
2) "default"

processes

processesの型を調べてみます。

127.0.0.1:6379> type myapp:processes
set

processesはSetなので、smembersで中身を覗いてみます。

127.0.0.1:6379> smembers myapp:processes
1) "longcat.local:72372:3d6ca4888e6f"

実行中のSidekiqのプロセス単位でkeyが存在するようで、

  • 72372はプロセスID
  • longcat.localはホスト名
  • 3d6ca4888e6fは謎のハッシュ

謎のハッシュの正体はSidekiqのソースを読めば分かると思いますが、今のところ重要ではなさそうなので無視します。

processesに格納された${HOSTNAME}:${PID}:${HASH}形式の文字列ですが、同系列のkeyが存在しているので、次はそのkeyについて調査します。

${HOSTNAME}:${PID}:${HASH}

${HOSTNAME}:${PID}:${HASH}の型を調べてみます。

127.0.0.1:6379> type myapp:longcat.local:72372:3d6ca4888e6f
hash

${HOSTNAME}:${PID}:${HASH}の型はHashなので、hgetallで中身を覗いてみます。

127.0.0.1:6379> hgetall myapp:longcat.local:72372:3d6ca4888e6f
1) "busy"
2) "3"
3) "quiet"
4) "false"
5) "beat"
6) "1501901472.00442"
7) "info"
8) "{\"hostname\":\"longcat.local\",\"started_at\":1501900616.115412,\"pid\":72372,\"tag\":\"MyApp\",\"concurrency\":5,\"queues\":[\"default\",\"myqueue\"],\"labels\":[],\"identity\":\"longcat.local:72372:3d6ca4888e6f\"}"

稼働中のSidekiqプロセスの詳細な情報のようです。

${HOSTNAME}:${PID}:${HASH}:workers

${HOSTNAME}:${PID}:${HASH}:workersの型を調べてみます。

127.0.0.1:6379> type myapp:longcat.local:72372:3d6ca4888e6f:workers
hash

${HOSTNAME}:${PID}:${HASH}:workersの型はHashなので、hgetallで中身を覗いてみます。

127.0.0.1:6379> hgetall myapp:longcat.local:72372:3d6ca4888e6f:workers
1) "ov7g1wja0"
2) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"d4c8f8f6759874ffef20a93a\",\"created_at\":1501900585.634248,\"enqueued_at\":1501900585.634562},\"run_at\":1501900616}"
3) "ov7g1wi88"
4) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"7b7436c9e3e336fc9deaf40f\",\"created_at\":1501900578.189331,\"enqueued_at\":1501900578.189852},\"run_at\":1501900616}"
5) "ov7g1wikg"
6) "{\"queue\":\"default\",\"payload\":{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"179e5481af82ab17f2b25538\",\"created_at\":1501900637.775475,\"enqueued_at\":1501900637.7757661},\"run_at\":1501900637}"

実行中のジョブの情報がHashとして格納されているようです。
hashのキーは実行中のスレッドのID(ログにTID-ov7g1wja0みたいに出てくるやつ)で、値はジョブのJSON表現となっています。

{
  "queue": "default",
  "payload": {
    "class": "SleepWorker",
    "args": [
      3600
    ],
    "retry": false,
    "queue": "default",
    "jid": "d4c8f8f6759874ffef20a93a",
    "created_at": 1501900585.6342,
    "enqueued_at": 1501900585.6346
  },
  "run_at": 1501900616
}

現在、concurrency = 5なので、あと3つジョブを投入して、処理待ちのジョブを作り出してみます。

$ bundle exec rails runner '3.times { SleepWorker.perform_async(3600) }'

その後、keyを確認すると、myapp:queue:defaultが増えています。
待ち行列を扱うためのkeyだと考えられます。

127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:queue:default"
3) "myapp:stat:failed"
4) "myapp:stat:processed:2017-08-05"
5) "myapp:stat:processed"
6) "myapp:longcat.local:72372:3d6ca4888e6f:workers"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:longcat.local:72372:3d6ca4888e6f"
9) "myapp:queues"

queue:${QUEUE}

queue:${QUEUE}の型を調べてみます。

127.0.0.1:6379> type myapp:queue:default
list

queue:${QUEUE}の型はListなので、lrangeで中身を覗いてみます。

127.0.0.1:6379> lrange myapp:queue:default 0 -1
1) "{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"10852e13a6d9846d8a95b0e7\",\"created_at\":1501902132.849817,\"enqueued_at\":1501902132.849853}"

処理待ちのジョブの情報が、JSON文字列のリストとして格納されているようです。

**「Redisに何を書き込めばSidekiqがdequeueしてくれるのか?」**という当初の疑問に対しては、かなり核心的な情報にまで到達できました。

つまり、queue:${QUEUE}というListに対して、次のような形式のJSON文字列をPUSHしてやれば良いのでは、という仮説が得られました。

{
  "class": "SleepWorker",
  "args": [
    3600
  ],
  "retry": false,
  "queue": "default",
  "jid": "10852e13a6d9846d8a95b0e7",
  "created_at": 1501902132.8498,
  "enqueued_at": 1501902132.8499
}

class,args,retry,queueに指定すべき値は想像がつきますが、その他の値については、Sidekiqのソースを読んだところ、次のような値を指定すれば良さそうです。

  • jid: SecureRandom.hex(12)
  • created_at: Time.now.to_f
  • enqueued_at: Time.now.to_f

なお、キューが空の場合は、keysでqueue:${QUEUE}は表示されないので注意が必要です。

retry

意図的に失敗するWorkerを実装して、ジョブキューに投入してみます。

app/workers/failure_worker.rb
class FailureWorker
  include Sidekiq::Worker
  sidekiq_options queue: :default, retry: 3

  def perform
    raise StandardError
  end
end
$ bundle exec rails runner 'FailureWorker.perform_async'

keyを確認すると、myapp:retryというkeyが追加されています。

127.0.0.1:6379> keys *
1) "myapp:retry"
2) "myapp:processes"
3) "myapp:longcat.local:73839:1b7f42fe816a"
4) "myapp:stat:failed"
5) "myapp:stat:processed:2017-08-05"
6) "myapp:stat:processed"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:queues"
9) "myapp:longcat.local:73839:1b7f42fe816a:workers"

retryの型を調べてみます。

127.0.0.1:6379> type myapp:retry
zset

retryの型はZSet(ソート済みSet)のようなので、zrangeで中身を覗いてみます。

127.0.0.1:6379> zrange myapp:retry 0 -1
1) "{\"class\":\"FailureWorker\",\"args\":[],\"retry\":3,\"queue\":\"default\",\"jid\":\"953bfc8a85a0ef39b446f16d\",\"created_at\":1501904319.945594,\"enqueued_at\":1501904537.833684,\"error_message\":\"StandardError\",\"error_class\":\"StandardError\",\"failed_at\":1501904465.9930282,\"retry_count\":2,\"retried_at\":1501904537.835155}"

リトライ中のジョブの情報がJSON文字列の集合(ソート済み)として格納されているようです。
JSONの中には、queue:${QUEUE}で初期化された情報に加えて、エラーやリトライに関する情報が追加されています。

{
  "class": "FailureWorker",
  "args": [
    
  ],
  "retry": 3,
  "queue": "default",
  "jid": "953bfc8a85a0ef39b446f16d",
  "created_at": 1501904319.9456,
  "enqueued_at": 1501904537.8337,
  "error_message": "StandardError",
  "error_class": "StandardError",
  "failed_at": 1501904465.993,
  "retry_count": 2,
  "retried_at": 1501904537.8352
}

dead

限界までリトライされた頃に、改めてkeyを確認すると、myapp:deadというkeyが追加されています。

127.0.0.1:6379> keys *
1) "myapp:processes"
2) "myapp:longcat.local:73839:1b7f42fe816a"
3) "myapp:dead"
4) "myapp:stat:failed"
5) "myapp:stat:processed:2017-08-05"
6) "myapp:stat:processed"
7) "myapp:stat:failed:2017-08-05"
8) "myapp:queues"
9) "myapp:longcat.local:73839:1b7f42fe816a:workers"

deadの型を調べてみます。

127.0.0.1:6379> type myapp:dead
zset

deadの型はZSet(ソート済みSet)のようなので、zrangeで中身を覗いてみます。

127.0.0.1:6379> zrange myapp:dead 0 -1
1) "{\"class\":\"FailureWorker\",\"args\":[],\"retry\":3,\"queue\":\"default\",\"jid\":\"953bfc8a85a0ef39b446f16d\",\"created_at\":1501904319.945594,\"enqueued_at\":1501904660.428597,\"error_message\":\"StandardError\",\"error_class\":\"StandardError\",\"failed_at\":1501904465.9930282,\"retry_count\":3,\"retried_at\":1501904660.430109}"

JSONの中身は、retryに格納されていたJSONと同じです。

{
  "class": "FailureWorker",
  "args": [
    
  ],
  "retry": 3,
  "queue": "default",
  "jid": "953bfc8a85a0ef39b446f16d",
  "created_at": 1501904319.9456,
  "enqueued_at": 1501904660.4286,
  "error_message": "StandardError",
  "error_class": "StandardError",
  "failed_at": 1501904465.993,
  "retry_count": 3,
  "retried_at": 1501904660.4301
}

stat:*

もう力尽きたので調べませんでしたが、名前から推察するにstat:*系のkeyは、統計情報を格納するためのものだと思われます。

Redisに直接enqueueする

これまでの考察から、queue:${QUEUE}にJSONをPUSHすることで、Sidekiqのクライアントを使わずにenqueueが出来そうです。

そこで、実験してみます。

> lpush myapp:queue:default "{\"class\":\"SleepWorker\",\"args\":[3600],\"retry\":false,\"queue\":\"default\",\"jid\":\"0b34564dbb2dcd63ec644b16\",\"created_at\":1501906533.288397,\"enqueued_at\":1501906533.288397}"

キタ━━━━(゚∀゚)━━━━!!

sidekiq.log
2017-08-05T04:19:45.626Z 73839 TID-ouxj1c6uk SleepWorker JID-0b34564dbb2dcd63ec644b16 INFO: start

Redisに書き込まれる値のまとめ(stat・schedule除く)

key type 内容
queues Set キュー名の集合
processes Set プロセスの集合
${HOSTNAME}:${PID}:${HASH} Hash 各プロセスの詳細情報
${HOSTNAME}:${PID}:${HASH}:workers Hash 各プロセスで処理中のジョブ
queue:${QUEUE} List 各キューのジョブ待ち行列
retry ZSet リトライ中のジョブ
dead ZSet Dead Job Queue

[補足] ActiveJob

ActiveJobがインタフェースのデファクトスタンダードになっていることから、ActiveJobを使う場合も見ておきます。

config/application.rb
...

module MyApp
  class Application < Rails::Application
    ...

    config.active_job.queue_adapter = :sidekiq
  end
end
app/jobs/sleep_job.rb
class SleepJob < ApplicationJob
  queue_as :default

  def perform(time_to_sleep)
    sleep(time_to_sleep)
  end
end

このような設定でJobを実装してジョブキューに投入し、キューの中身をRedisで覗いてみます。

$ bundle exec rails runner '6.times { SleepJob.perform_later(3600) }'
127.0.0.1:6379> lrange myapp:queue:default 0 -1
1) "{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"SleepJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"SleepJob\",\"job_id\":\"49f612ef-233a-4660-b9e7-cc48a80ee7d4\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[3600],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"1e7c528c1c5727e0d46a7786\",\"created_at\":1501907321.086514,\"enqueued_at\":1501907321.0865438}"

JSONの構造は以下のようになっており、生Sidekiqの場合と比較して、何個かキーが追加されています。

ActiveJobのジョブ
{
  "class": "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
  "wrapped": "SleepJob",
  "queue": "default",
  "args": [
    {
      "job_class": "SleepJob",
      "job_id": "49f612ef-233a-4660-b9e7-cc48a80ee7d4",
      "provider_job_id": null,
      "queue_name": "default",
      "priority": null,
      "arguments": [
        3600
      ],
      "executions": 0,
      "locale": "en"
    }
  ],
  "retry": true,
  "jid": "1e7c528c1c5727e0d46a7786",
  "created_at": 1501907321.0865,
  "enqueued_at": 1501907321.0865
}

もう力尽きたので、ActiveJobの詳細はまた今度調べたいと思います。

142
70
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
142
70

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?