はじめに
こんばんは、Papillon6814です。
今日プログラムを書いているときに、クラッシュロワイヤルの銀の宝箱のような感じで「ユーザーの特定のアクションから〇〇分後に処理を実行させたい」と思うときがありました。
最初はTask.async
で非同期のプロセスを作り、その中でProcess.sleep
を実行すれば良いかと考えていました。
しかし、遅延させた処理のキャンセルをするのがTask.async
の方法だと少し面倒だったので他の方法を探していたところObanというものを見つけました。
このObanを試しに少しだけ使ってみたのでその記録を残しておきます。ObanSample
準備
とりあえず試すためにサンプルのプロジェクトを作成します。
mix phx.new oban_sample --no-webpack
Obanではpostgresを使うので、--no-ecto
オプションは付けずにプロジェクトを作成します。
基本的にREADMEに沿って準備を進めていきます。
ライブラリのインストール
def deps do
[
{:oban, "~> 2.7"}
]
end
いつも通りmix.exs
に記述を加えます。
マイグレーションファイルの設定
Obanは処理にpostgresのテーブルを利用します。
したがってテーブルを作成するためにマイグレーションファイルを生成し、記述を加えます。
mix ecto.gen.migration add_oban_jobs_table
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
def up do
Oban.Migrations.up()
end
# We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
# necessary, regardless of which version we've migrated `up` to.
def down do
Oban.Migrations.down(version: 1)
end
end
記述し終えたらmigrateをします。
mix ecto.migrate
準備の記述
プロジェクト内でObanを使用することができるように、config.exs
に記述を追加します。
config :oban_sample, Oban,
repo: ObanSample.Repo,
plugins: [
Oban.Plugins.Pruner,
{Oban.Plugins.Cron,
crontab: [
{"* * * * *", ObanSample.Cron}
]}
],
queues: [default: 10, events: 50, media: 20]
plugins:
と書いてあるところにcrontabについての記述がありますが、これは定期実行の処理を行うために追加したものです。なので定期実行をしない場合は除いておいた方が良いです。
そして最後にapplication.ex
に記述を追加します。
Oban自体はアプリケーションではないので、application.ex
のスーパーバイザーツリーにObanを追加する必要があります。
defmodule ObanSample.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
def start(_type, _args) do
children = [
# Start the Ecto repository
ObanSample.Repo,
# Start the Telemetry supervisor
ObanSampleWeb.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: ObanSample.PubSub},
# Start the Endpoint (http/https)
ObanSampleWeb.Endpoint,
# Start a worker by calling: ObanSample.Worker.start_link(arg)
# {ObanSample.Worker, arg}
{Oban, oban_config()}
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: ObanSample.Supervisor]
Supervisor.start_link(children, opts)
end
# Tell Phoenix to update the endpoint configuration
# whenever the application is updated.
def config_change(changed, _new, removed) do
ObanSampleWeb.Endpoint.config_change(changed, removed)
:ok
end
# Conditionally disable queues or plugins here.
defp oban_config do
Application.fetch_env!(:oban_sample, Oban)
end
end
以上で準備は完了です。
遅延実行
Obanで遅延実行をするためのモジュールを作ります。
Obanでは、指定したモジュール内にperform
関数を実装することで遅延・定期実行の処理を実行することができます。
defmodule ObanSample.Business do
use Oban.Worker, queue: :events
@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => _id} = args}) do
IO.inspect(args, label: :args)
case args do
%{"in_the" => "business"} ->
IO.inspect("business")
%{"vote_for" => _vote} ->
IO.inspect("vote-for")
%{"id" => obtained_id} ->
IO.inspect("obtained_id: #{obtained_id}")
end
:ok
end
end
遅延実行をさせる際にこのモジュールのperform
が実行されるようにします。
サンプルで作成したプロジェクトのコントローラーから関数を一つ抜き出して紹介します。
def attempt3(conn, _) do
id = %{id: 4, message: "attempt 3"}
|> ObanSample.Business.new(schedule_in: 2*60)
|> Oban.insert()
|> elem(1)
|> Map.get(:id)
json(conn, %{msg: "worked", id: id})
end
この関数が実行されると、実行から2分後にObanSample.Business
のperform関数が実行されます。
> mix phx.server
[info] Running ObanSampleWeb.Endpoint with cowboy 2.9.0 at 0.0.0.0:4040 (http)
[info] Access ObanSampleWeb.Endpoint at http://localhost:4040
[info] GET /attempt3
[debug] Processing with ObanSampleWeb.PageController.attempt3/2
Parameters: %{}
Pipelines: [:browser]
[info] Sent 200 in 29ms
args: %{"id" => 4, "message" => "attempt 3"}
"obtained_id: 4"
この際にcase
をうまく利用したりモジュールを分けたりすれば条件によって実行内容を変えられそうです。
Obanでは処理のキャンセルを行うこともできます。
def cancel1(conn, %{"id" => id}) do
id
|> String.to_integer()
|> Oban.cancel_job()
json(conn, %{msg: "worked"})
end
ここのパラメータのidはOban.Job
のidです。先ほどのattempt3
関数ではレスポンスとしてOban.Job
のidを返すようにしているのでそのidを利用すれば遅延実行のキャンセルができます。
定期実行
定期実行はオマケ程度ですが一応動いたので載せておきます。
これは遅延実行とは別で実行させるWorkerなので新しくモジュールを作成します。
defmodule ObanSample.Cron do
use Oban.Worker
@impl Oban.Worker
def perform(_job) do
IO.inspect("minute worker")
:ok
end
end
あとはもうサーバーを動かすだけです。confix.exs
に書いたのが毎分処理に関する記述だったので1分ごとに"minute worker"と出力がされるはずです。
以上です。読んでいただいてありがとうございました。