今回はTaskを作ってみたので紹介します。
前提知識はプログラミングElixirの14.1節で登場するspawn, send, receiveです。
作った動機は以下を思ったからです。
- Taskって何が嬉しいのだろうか
- spawn, send, receiveを知り、Taskを知ると「Taskって作れそう」
- asyncでプロセスをバックグランウンドで実行させ、(spawn、send)
- awaitでその結果を受け取っている(receive)
Message Passingのおさらい
プログラミングElixir p.163のspawn1.exでプロセス間のメッセージ送信をおさらいします。
defmodule Spawn1 do
def greet do
receive do
{sender, msg} ->
send sender, { :ok, "Hello, #{msg}" }
end
end
end
pid = spawn(Spawn1, :greet, [])
send pid, {self(), "World"}
receive do
{:ok, message} ->
IO.puts message
end
spawn1.exを上から順に読むと
- spawnによって生成するプロセスがgreet関数を実行しreceiveで待機 (このプロセスをAと呼びます)
- 次の行がプロセスAに対し、{self(), "World"}を送信 self()はプロセスのpidを取得する関数。この場合iexのpidです
- プロセスAが{self(), "World"}を受信し、iexに{:ok, "Hello, World"}を送信
- iexがreceiveで受信
ここで不思議に思うのはreceiveが実行される前にsendしても良いのだろうか?です。
上記の書き方だとタイミングによってはreceiveが実行される前にsendが起きることは有り得そうです。
結論からいうとこれは問題ないです。
sendによって投げられたメッセージは送信先プロセスのmessage用のqueue(mailboxと呼ばれる)に保存されるからです。
参考
- https://ubiteku.oinker.me/2016/08/09/how-do-erlang-microprocesses-work-internally/
- http://erlang.org/doc/getting_started/conc_prog.html#message-passing
実装
仕様の確認
それでは作るためにガワの仕様を確認します。
iex(1)> h Task.async/1 # 簡単のため/1のみ実装します
def async(fun)
@spec async((() -> any())) :: t()
# 略
iex(2)> h Task.await/1
def await(task, timeout \\ 5000)
# 略
- Task.async/1はfunを受ける
- Task.await/1はtaskとtimeoutを受ける
- taskが何かわからないです
iex(3)> Task.async(fn -> 1 end)
%Task{
owner: #PID<0.109.0>,
pid: #PID<0.115.0>,
ref: #Reference<0.3596268900.252444678.63351>
}
taskはTask構造体みたいです。構造体は知識が無いのでmapとしてしまいます。
草案
defmodule MyTask do
def async(fun) do
owner = self()
pid = spawn(
fn ->
send(owner, { ???, fun.() })
end
)
%{owner: owner, pid: pid, ref: ref}
end
def await(task, timeout \\ 5000) do
receive do
{ ???, result } -> result
after timeout -> nil # ここはテキトウ
end
end
end
- ガワの仕様から引数と戻り値は書けました
- Task.awaitは実行したプロセスのmailboxを使ってるはずなので、
MyTask.asyncのsendの送信先は実行プロセスとなるようにself()でとるように書きました。
ownerという言葉のニュアンスもそれっぽい
分からないのはMyTask.awaitが自分宛てのmessageをどうやって区別するかです。
MyTask.asyncを複数実行した後でもMyTask.awaitは自分宛てのmessageを区別できなければならないはずです。
つまり、少なくともownerプロセスにおいてユニークでなければなりません。
。
。
。
。
使っていない「ref」ってなんだろうか?この疑問が湧いて解決しました。
Built-in typesに以下の記述があります。
Reference - a unique value in the runtime system, created with make_ref/0
まさに欲しかったものです。これを使えばよさそうです。
完成案
defmodule MyTask do
def async(fun) do
owner = self()
ref = make_ref()
pid = spawn(
fn ->
send(owner, { ref, fun.() })
end
)
%{owner: owner, pid: pid, ref: ref}
end
def await(task, timeout \\ 5000) do
ref = task[:ref]
receive do
{ ^ref, result } -> result # ^の付け忘れ注意
after timeout -> nil # ここはテキトウ
end
end
end
試してみると
iex(1)> hoge = MyTask.async(fn -> "hoge" end)
%{
owner: #PID<0.137.0>,
pid: #PID<0.139.0>,
ref: #Reference<0.1222458256.3309043713.33493>
}
iex(2)> fuga = MyTask.async(fn -> "fuga" end)
%{
owner: #PID<0.137.0>,
pid: #PID<0.141.0>,
ref: #Reference<0.1222458256.3309043713.33524>
}
iex(3)> MyTask.await(hoge)
"hoge"
iex(4)> MyTask.await(fuga)
"fuga"
バッチリ!
async/3はapplyを使えば書けると思うので、試してみてください。
まとめ
作ってみることにより以下が分かりました。
- Taskって何が嬉しいのだろうか
- 処理をバックグラウンドで走らすことは、spawn, send, receiveを使うことで十分できるが、
Task.async,awaitを使うことで処理を渡すだけでそれを行ってくれる嬉しさがある。
(作る前から分かってたじゃんと言われると辛いですが
(実際のTaskはstart_link関数を持つOTPサーバーのため、Supervisorに監視させられる嬉しさもあるようです。
- 処理をバックグラウンドで走らすことは、spawn, send, receiveを使うことで十分できるが、
- send後にreceiveしても問題ない
- Reference型としてunique valueがあり、make_refで作ることができる
- 簡易でよければTaskは作れる
今回は以上です。では、また!
「いいね」よろしくお願いします。
おまけ
ポエム1
今回の取り組みは「kokura.ex#3:Elixirもくもく会~入門もあるよ(19:00)」にリモート参加した際に思いつきで始めました。
もくもく会では雰囲気実装までしかたどり着きませんでしたが、終わりがけにアドバイスをいただけたのが助かりました。
何が言いたいかというと、「kokura.ex#3:Elixirもくもく会~入門もあるよ(19:00)」に感謝です。
ポエム2
Referenceへの疑問はすぐに湧いたわけではなく、ElixirのTask.asyncの実装をみて気づきました。
ElixirのTask.async/3とawaitの実装
def async(module, function_name, args)
when is_atom(module) and is_atom(function_name) and is_list(args) do
mfa = {module, function_name, args}
owner = self()
{:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)
ref = Process.monitor(pid)
send(pid, {owner, ref})
%Task{pid: pid, ref: ref, owner: owner}
end
# 略
def await(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
if owner != self() do
raise ArgumentError, invalid_owner_error(task)
end
receive do
{^ref, reply} ->
Process.demonitor(ref, [:flush])
reply
{:DOWN, ^ref, _, proc, reason} ->
exit({reason(reason, proc), {__MODULE__, :await, [task, timeout]}})
after
timeout ->
Process.demonitor(ref, [:flush])
exit({:timeout, {__MODULE__, :await, [task, timeout]}})
end
end
すべてを理解するのは辛いですが、雰囲気は分かります。
- Task.Supervised.start_linkでプロセスが生成されていること
- そのプロセスの監視をするProcess.monitorでrefをとっていること
- そのプロセスに{owner, ref}を送ると、receiveされ、結果がsendされること
- 結果をawaitのreceiveで受けること
実際のソースを読むことで学べることは沢山ありそうです。