やりたいこと
時間のかかる処理はTask Queueに追い出したい。また、Task QueueとはWeb API経由で対話ができるようにしたい。
今回はExqというライブラリを使ってみることにする。Resque / Sidekiq互換とのこと。こいつにはWeb APIが無いので、そこは自作することとする。
実装
事前準備として、redisが必要になるので適宜インストールしておく。
インストール
mix.exs
に以下の変更を加える。
# :exqを追記
def application do
[mod: {Smaug, []},
applications: [:phoenix, :phoenix_html, :cowboy, :logger,
:phoenix_ecto, :mariaex,
:exq]]
end
defp deps do
[
# ... other deps
{:exq, "~> 0.3.0"}
]
end
変更後、mix deps.get
でインストールする。
設定
config/config.exs
に以下の記述を追加。その他の設定項目についてはドキュメント参照。
config :exq,
host: "localhost",
port: 6379,
namespace: "exq"
web.ex
web.ex
に以下を追加し、contollerと大体同じようなことができるようにしておく。
def worker do
quote do
alias Demo.Repo
import Ecto.Model
import Ecto.Query, only: [from: 1, from: 2]
import Demo.Router.Helpers
end
end
Worker
HelloWorldなworkerを書いてみる。
defmodule Demo.HelloWorldWorker do
use Demo.Web, :worker
def perform do
IO.puts "Hello World!"
end
end
この状態でworkerは走らせられるようになっているので、iex -S mix phoenix.server
を叩いてテストをしてみる。
iex(1)> Exq.enqueue(:exq, "default", Demo.HelloWorldWorker, [])
"Hello World!"
Web API
このままだと不便なので、Web APIを作ってQueueのステータスなどを取れるようにする。
今回はGET /api/job/stats
、POST /api/job
を定義。以下のようにcontrollerとviewを書く。
defmodule Demo.JobController do
use Demo.Web, :controller
def stats(conn, _params) do
{:ok, processed} = Exq.Api.stats(:exq_enqueuer, "processed")
{:ok, failed} = Exq.Api.stats(:exq_enqueuer, "failed")
{:ok, busy} = Exq.Api.busy(:exq_enqueuer)
{:ok, scheduled} = Exq.Api.queue_size(:exq_enqueuer, :scheduled)
{:ok, queues} = Exq.Api.queue_size(:exq_enqueuer)
queue_sizes = for {_q, size} <- queues do
{size, _} = Integer.parse(size)
size
end
qtotal = "#{Enum.sum(queue_sizes)}"
render(conn, :stats, stats: %{
processed: processed,
failed: failed,
busy: busy,
scheduled: scheduled,
enqueued: qtotal
})
end
def create(conn, %{"job_name" => job_name}) do
{:ok, jid} = Exq.enqueue(:exq_enqueuer, "default", job_name, [])
render(conn, :show, job: %{id: jid})
end
end
defmodule Demo.JobView do
use Demo.Web, :view
def render("stats.json", %{stats: stats}) do
%{
data: %{
processed: stats.processed,
failed: stats.failed,
busy: stats.busy,
scheduled: stats.scheduled,
enqueued: stats.enqueued
}
}
end
def render("show.json", %{job: job}) do
%{data: render_one(job, Demo.JobView, "job.json")}
end
def render("job.json", %{job: job}) do
%{id: job.id}
end
end
ルーティングを以下のように設定。
defmodule Demo.Router do
use Demo.Web, :router
pipeline :browser do
plug :accepts, ["html"]
plug :fetch_session
plug :fetch_flash
plug :protect_from_forgery
plug :put_secure_browser_headers
end
pipeline :api do
plug :accepts, ["json"]
end
scope "/", Demo do
pipe_through :browser
get "/", PageController, :index
end
scope "/api", Demo do
pipe_through :api
get "/job/stats", JobController, :stats
post "/job", JobController, :create
end
end
動かしてみる
# Enqueue
$ curl -X POST -H "Content-Type: application/json" -d '{"job_name": "Demo.HelloWorldWorker"}' http://localhost:4000/api/job
{"data":{"id":"bf78b561-fa13-47b5-a8d8-3ce76c6b0034"}}
# Queue Stats
$ curl -X GET -H "Content-Type: application/json" http://localhost:4000/api/job/stats
{"data":{"scheduled":"0","processed":"1","failed":"0","enqueued":"0","busy":"0"}}
あとがき
[Elixir] PhoenixでPlugの自作も参考にしてもらい、Task Queue用のWeb APIへのリクエストはlocalhostからだけ/ローカルネットワーク内からだけ受け付けるようにするとよいと思う。