11
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Elixirで遅延実行と定期実行

Posted at

はじめに

こんばんは、Papillon6814です。
今日プログラムを書いているときに、クラッシュロワイヤル銀の宝箱のような感じで「ユーザーの特定のアクションから〇〇分後に処理を実行させたい」と思うときがありました。
最初はTask.asyncで非同期のプロセスを作り、その中でProcess.sleepを実行すれば良いかと考えていました。
しかし、遅延させた処理のキャンセルをするのがTask.asyncの方法だと少し面倒だったので他の方法を探していたところObanというものを見つけました。

このObanを試しに少しだけ使ってみたのでその記録を残しておきます。ObanSample

準備

とりあえず試すためにサンプルのプロジェクトを作成します。

mix phx.new oban_sample --no-webpack

Obanではpostgresを使うので、--no-ectoオプションは付けずにプロジェクトを作成します。
基本的にREADMEに沿って準備を進めていきます。

ライブラリのインストール

mix.exs
def deps do
  [
    {:oban, "~> 2.7"}
  ]
end

いつも通りmix.exsに記述を加えます。

マイグレーションファイルの設定

Obanは処理にpostgresのテーブルを利用します。
したがってテーブルを作成するためにマイグレーションファイルを生成し、記述を加えます。

mix ecto.gen.migration add_oban_jobs_table
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up()
  end

  # We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
  # necessary, regardless of which version we've migrated `up` to.
  def down do
    Oban.Migrations.down(version: 1)
  end
end

記述し終えたらmigrateをします。

mix ecto.migrate

準備の記述

プロジェクト内でObanを使用することができるように、config.exsに記述を追加します。

config.exs
config :oban_sample, Oban,
  repo: ObanSample.Repo,
  plugins: [
    Oban.Plugins.Pruner,
    {Oban.Plugins.Cron,
    crontab: [
      {"* * * * *", ObanSample.Cron}
    ]}
  ],
  queues: [default: 10, events: 50, media: 20]

plugins:と書いてあるところにcrontabについての記述がありますが、これは定期実行の処理を行うために追加したものです。なので定期実行をしない場合は除いておいた方が良いです。

そして最後にapplication.exに記述を追加します。
Oban自体はアプリケーションではないので、application.exのスーパーバイザーツリーにObanを追加する必要があります。

application.ex
defmodule ObanSample.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      # Start the Ecto repository
      ObanSample.Repo,
      # Start the Telemetry supervisor
      ObanSampleWeb.Telemetry,
      # Start the PubSub system
      {Phoenix.PubSub, name: ObanSample.PubSub},
      # Start the Endpoint (http/https)
      ObanSampleWeb.Endpoint,
      # Start a worker by calling: ObanSample.Worker.start_link(arg)
      # {ObanSample.Worker, arg}
      {Oban, oban_config()}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: ObanSample.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    ObanSampleWeb.Endpoint.config_change(changed, removed)
    :ok
  end

  # Conditionally disable queues or plugins here.
  defp oban_config do
    Application.fetch_env!(:oban_sample, Oban)
  end
end

以上で準備は完了です。

遅延実行

Obanで遅延実行をするためのモジュールを作ります。
Obanでは、指定したモジュール内にperform関数を実装することで遅延・定期実行の処理を実行することができます。

business.ex
defmodule ObanSample.Business do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => _id} = args}) do
    IO.inspect(args, label: :args)

    case args do
      %{"in_the" => "business"} ->
        IO.inspect("business")

      %{"vote_for" => _vote} ->
        IO.inspect("vote-for")

      %{"id" => obtained_id} ->
        IO.inspect("obtained_id: #{obtained_id}")
    end

    :ok
  end
end

遅延実行をさせる際にこのモジュールのperformが実行されるようにします。
サンプルで作成したプロジェクトのコントローラーから関数を一つ抜き出して紹介します。

page_controller.ex
  def attempt3(conn, _) do
    id = %{id: 4, message: "attempt 3"}
      |> ObanSample.Business.new(schedule_in: 2*60)
      |> Oban.insert()
      |> elem(1)
      |> Map.get(:id)

    json(conn, %{msg: "worked", id: id})
  end

この関数が実行されると、実行から2分後にObanSample.Businessのperform関数が実行されます。

> mix phx.server                                                                      
[info] Running ObanSampleWeb.Endpoint with cowboy 2.9.0 at 0.0.0.0:4040 (http)
[info] Access ObanSampleWeb.Endpoint at http://localhost:4040
[info] GET /attempt3
[debug] Processing with ObanSampleWeb.PageController.attempt3/2
  Parameters: %{}
  Pipelines: [:browser]
[info] Sent 200 in 29ms
args: %{"id" => 4, "message" => "attempt 3"}
"obtained_id: 4"

この際にcaseをうまく利用したりモジュールを分けたりすれば条件によって実行内容を変えられそうです。
Obanでは処理のキャンセルを行うこともできます。

page_controller.ex
  def cancel1(conn, %{"id" => id}) do
    id
    |> String.to_integer()
    |> Oban.cancel_job()

    json(conn, %{msg: "worked"})
  end

ここのパラメータのidはOban.Jobのidです。先ほどのattempt3関数ではレスポンスとしてOban.Jobのidを返すようにしているのでそのidを利用すれば遅延実行のキャンセルができます。

定期実行

定期実行はオマケ程度ですが一応動いたので載せておきます。
これは遅延実行とは別で実行させるWorkerなので新しくモジュールを作成します。

cron.ex
defmodule ObanSample.Cron do
  use Oban.Worker

  @impl Oban.Worker
  def perform(_job) do
    IO.inspect("minute worker")
    :ok
  end
end

あとはもうサーバーを動かすだけです。confix.exsに書いたのが毎分処理に関する記述だったので1分ごとに"minute worker"と出力がされるはずです。

以上です。読んでいただいてありがとうございました。

11
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?