Edited at

Elixir のプロセスを使ってフェイルセーフなアプリケーションを作る ─ 失敗は恐れず泥水にダイブ

More than 3 years have passed since last update.

[翻訳] Elixirのプロセスアーキテクチャ または私は如何にして心配するのを止めてクラッシュを愛するようになったか にもあるように Elixir においては例外処理は、それを頑張ってなんとかしようとするのではなく、軽量プロセスのコンテキストでむしろすすんでクラッシュさせてしまえ、というのが良い作法である。

クイズ番組で ○ か × か答えを選んで壁に突っ込んだ先に、正解ならクッションが、不正解なら泥水があるという企画があるが、それに喩えるなら 泥水だろうが何だろうが躊躇せずダイブしろ! というのが Elixir 流 (俺調べ) である。

もとい、クラッシュさせてどうするのかというと Supevisor を使って、別プロセスから該当プロセスを監視しておいて、クラッシュしてもアプリケーション全体としては間違いなく動いている状態を保証するのが正しい。

カッとなってちょっとそのための例を書いてみたので、承認欲求を満たすためにも、少し晒してみようと思う。


例題: キャッシュつきの HTTP GET

例として


  • 指定された URL から HTML を取ってきて、画面に出力する。今回は簡単のために title のみ抽出して出力する

  • 同じ URL に何度も HTTP アクセスすると迷惑なので、一度取得したデータはメモリにキャッシュしておく

というアプリケーションを実装してみようと思う。

このとき後者のメモリ上のキャッシュというのは明らかに「状態」なので、プロセスの出番である。URL と title のペアを保持する HashDict を持ったプロセスを作っておいて保持させる。


ファーストステップ: HTTPクライアントを用意

まずは指定された URL から HTML を取ってきて title を抽出するモジュールを書こう。これがないと始まらない。


lib/web_archive/fetcher.ex

defmodule WebArchive.Fetcher do

def fetch(url) do
HTTPotion.start
HTTPotion.get(url)
end

def process_response(%{status_code: 200, body: body}) do
body
|> to_string
end

def extract_title(url) do
[{"title", [], [title]}] = fetch(url)
|> process_response
|> Floki.find "title"
title
end
end


HTTP クライアントには HTTPotion、HTML の parse に Floki を使った。

iex> WebArchive.Fetcher.extract_title("https://twitter.com/")

"Twitterへようこそ - ログインまたは新規登録"

このように、extract_title/1 で twitter.com の title 文字列が返ってくる。


泥沼に構わず突っ込む

さて、このコードで特徴的なのは正常系のことしか気にしてないところである。

  def process_response(%{status_code: 200, body: body}) do

body
|> to_string
end

例えば HTTP GET 後の処理は process_response/1 に渡すが、パターンマッチでステータスコード 200 の時しかみてない。従って 200 以外が返ってくると、FunctionClauseError でクラッシュする。

iex(5)> WebArchive.Fetcher.extract_title("http://example.com/404")

** (FunctionClauseError) no function clause matching in WebArchive.Fetcher.process_response/1

また入力値である url のバリデーションをしてないので、例えば :cat とか URL 文字列ではない値を渡すと HTTPotion が例外を吐く。それから、取得したデータが HTML じゃなかったら Floki が何かしら例外を起こすかもしれないし、HTML を解析した結果 title 要素がなかった場合も例外が出るかもしれない。

・・・などなど考えれば異常系はいろいろ可能性としてあるし悩みは尽きないが、いいや!そういうことは考えない。泥水に構わず突っ込め、というのはそういうことだ。

そう、クラッシュしたら、Supervisor に起こして貰うんだ。


状態を保持するサーバーを作って Supervisor で監視

さて、このクライアントにキャッシュの機能をつけることを考えよう。

先にも述べた通りキャッシュは「状態」なので、状態を保持したサーバープロセスを作ることにする。

iex(5)> WebArchive.Server.extract_title("https://twitter.com/") # HTTP アクセス

"Twitterへようこそ - ログインまたは新規登録"
iex(6)> WebArchive.Server.extract_title("https://twitter.com/") # キャッシュから返る
"Twitterへようこそ - ログインまたは新規登録"

こんな塩梅である。

素朴に考えると GenServer を使って実装すれば良いように見える。GenServer については Elixir の OTP (GenServer 編) に解説している。そして、アプリケーションのクラッシュは Supervisor で監視する。それにより HTTP GET が何かしらの理由でエラーになってもアプリケーションは復帰し処理を継続できる。

万歳!!

・・・しかし、このアプリケーションでは、その単純な方針ではひとつだけ問題がある。

たとえば以下の様に、引数に不正な値 ・・・ URL 文字列を期待されているところにアトムを渡すみたいなことをすると、GenServer はクラッシュするわけだが、サーバーがクラッシュするとキャッシュ用の HashDict も一緒にクリアされてしまう。

iex(12)> WebArchive.Server.extract_title(:cat) # クラッシュ!

10:18:04.900 [error] GenServer WebArchive.Server terminating
Last message: {:extract_title, :cat}
State: #PID<0.153.0>
...
iex(13)> WebArchive.Server.extract_title("https://twitter.com/") # HTTP GET してしまう
"Twitterへようこそ - ログインまたは新規登録"

不正な値が与えられるたびキャッシュがクリアされるんでは、さすがに脆弱と言わざるを得ない。

さてどうしたものか。


Supervison Tree でプロセスのライフサイクルを分ける

こういうときは状態を扱う部分をサーバーから分離してまた別のプロセスにする。ここでは Stash と名付けよう。異なるプロセスに分離することでライフサイクルを分けるんである。そして二つのプロセスはメッセージパッシングでやりとりをする。

そして、Supervisor も別に用意して親子関係を作り、監視させる。

4f82f5998c4d538b44579ddcab5b000e2b1137d4.png

これで Server がクラッシュしても Stash は素知らぬ顔で動き続けることができる!


実装

では実装をみていく。


Stash (Agent)

Stash は GenServer で作ってもよいが、HashDict を保持するだけの役割でタスクは持たない。OTP の中でも状態を扱うだけに特化したフレームワークである Agent を使うとより簡単に書ける。


lib/web_archive/stash.ex

defmodule WebArchive.Stash do

def start_link do
Agent.start_link(fn -> HashDict.new end)
end

def save_title(pid, url, title) do
Agent.update pid, fn(dict) -> Dict.put(dict, url, title) end
end

def get_title(pid, url) do
Agent.get pid, fn(dict) -> dict[url] end
end
end


これだけ。ここでは Agent の API を抽象化したかったので Stashわざわざ Stash という形でモジュールを用意しているが、この程度であれば Agent をそのまま使うという選択でも悪くないと思う。


Server (GenServer)

Server には、先に作った Fetcher とこの Stash を使ってキャッシュ機能つきの extract_title/1 を実装する。

こいつはキャッシュ保持用の HashDict を Stash に追いやったしステートレス〜と言いたいところだが、実は Stash プロセスにメッセージパッシングするためその pid を初期化時に受け取り保持し続ける必要があるのでやっぱり状態があるのだった。

というわけで、(1) 何かしらのタスク、ここでは extract_title/1 と (2) 状態の保持、(1) (2) を両方請け負うので、この実装には順当に GenServer を使う。Stash の pid は GenServer の状態保持の仕組みを使って引き回す。


lib/web_archive/server.ex

defmodule WebArchive.Server do

use GenServer

def start_link(stash_pid) do
GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
end

def extract_title(url) do
GenServer.call __MODULE__, {:extract_title, url}
end

def handle_call({:extract_title, url}, _from, stash_pid) do
{:reply, fetch_title(stash_pid, url), stash_pid}
end

defp fetch_title(stash_pid, url) do
case WebArchive.Stash.get_title(stash_pid, url) do
nil ->
title = WebArchive.Fetcher.extract_title(url)
:ok = WebArchive.Stash.save_title(stash_pid, url, title)
title
title -> title
end
end
end


fetch_title/2 の中で Fetcher と Stash を使って、Stash にあったらそれを返却なかったら HTTP GET、という処理をしている。もうちょい関数型っぽく書けそうなきもするが。


Supervisor

これで Stash と Server はできたので、Supervisor を用意する。実装は『Programming Elixir』に載ってるものを参考にした。

特に難しいことはやってないが、SubSupervisor が Stash の pid を知る必要がある都合上、Stash → SubSupervisor の順に起動しないといけない。そこで、普通は supervise/3 の第一引数にモジュールを指定するところそこではやらず、明示的に start_child/3 を呼んでいる。


lib/web_archive/supervisor.ex

defmodule WebArchive.Supervisor do

use Supervisor

def start_link do
res = {:ok, sup} = Supervisor.start_link(__MODULE__, [])
{:ok, stash_pid} =
Supervisor.start_child(sup, worker(WebArchive.Stash, []))
Supervisor.start_child(sup, supervisor(WebArchive.SubSupervisor, [stash_pid]))
res
end

def init(_) do
supervise [], strategy: :one_for_one
end
end


サブの Supervisor の方は何の変哲もない。


lib/web_archive/subsupervisor.ex

defmodule WebArchive.SubSupervisor do

use Supervisor

def start_link(stash_pid) do
Supervisor.start_link(__MODULE__, stash_pid)
end

def init(stash_pid) do
supervise [ worker(WebArchive.Server, [stash_pid]) ], strategy: :one_for_one
end
end



Application

これで部品は揃ったので、アプリケーションのエントリポイントを書く。


lib/web_archive.ex

defmodule WebArchive do

use Application
def start(_type, _args) do
WebArchive.Supervisor.start_link
end
end

親の Supervisor を起動するだけですな。

これで親 Supervisor → Stash → SubSupervisor → Server の順に起動する。完成。

iex(15)> WebArchive.Server.extract_title("https://twitter.com/")

"Twitterへようこそ - ログインまたは新規登録"
iex(16)> WebArchive.Server.extract_title(:cat)
** (exit) exited in: GenServer.call(WebArchive.Server, {:extract_title, :cat}, 5000)
** (EXIT) an exception was raised:
...
iex(17)> WebArchive.Server.extract_title("https://twitter.com/")
"Twitterへようこそ - ログインまたは新規登録"

クラッシュさせてもアプリケーションは終了せず、また、クラッシュ後の extract_title/1 もキャッシュから返ってくる。ほんとにそうなってるか知りたい場合は extract_title/1 で HTTP GET してくるところにデバッグメッセージを追加すれば良いだろう。


おまけ: Task の async/await で並行処理

せっかくキャッシュ付きで HTTP リクエストを飛ばす仕組みを作ったので、複数のサイトからまとめてタイトルを取ってくる API も作ってみよう。

iex> WebArchive.Server.extract_titles(["https://twitter.com/", "https://github.com/", "https://kaizenplatform.com/"])

["Twitterへようこそ - ログインまたは新規登録",
"GitHub · Build software better, together.",
"Kaizen Platform, Inc."]

こんな感じの。

もちろん一個一個の URL を同期的かつ逐次で取ってくるなんてそんな野暮なことはしない。非同期に並行処理してやろうではないか。

OTP の Task の async/await を使えば簡単に実装できる。


lib/web_archive/server.ex

defmodule WebArchive.Server

use GenServer

...

def extract_titles(urls) when is_list(urls) do
urls
|> Enum.map(&(Task.async(fn -> extract_title(&1) end)))
|> Enum.map(&(Task.await/1))
end


これだけ。万歳!

async/await というと C# の API を思い出すけども、Elixir の場合は async/await の裏側では例によって軽量プロセスが作られ並行処理が実行される。かっこいい。


まとめ


  • Elixir では例外処理はがんばらない。正常系だけ書いて Supervisor にお任せ

  • 状態のライフサイクルを分けるには Supervision Tree を構築する

  • プロセスの実装には OTP ライブラリを使うと簡単。GenServer、Agent etc.

  • Task を使えばタスクの並行処理も簡単に書ける

というわけで、そこに泥水があろうが失敗は気にせず突っ込もう。正常系しか考えない野郎でも無問題、それが Elixir クオリティなのである。