LoginSignup
6
2

More than 1 year has passed since last update.

作って学ぶTask.async, await

Last updated at Posted at 2019-11-28

今回はTaskを作ってみたので紹介します。
前提知識はプログラミングElixirの14.1節で登場するspawn, send, receiveです。

作った動機は以下を思ったからです。

  • Taskって何が嬉しいのだろうか
  • spawn, send, receiveを知り、Taskを知ると「Taskって作れそう」
    • asyncでプロセスをバックグランウンドで実行させ、(spawn、send)
    • awaitでその結果を受け取っている(receive)

Message Passingのおさらい

プログラミングElixir p.163のspawn1.exでプロセス間のメッセージ送信をおさらいします。

spawn1.ex
defmodule Spawn1 do
  def greet do
    receive do
      {sender, msg} ->
        send sender, { :ok, "Hello, #{msg}" }
    end
  end
end

pid = spawn(Spawn1, :greet, [])
send pid, {self(), "World"}

receive do
  {:ok, message} ->
    IO.puts message
end

spawn1.exを上から順に読むと

  1. spawnによって生成するプロセスがgreet関数を実行しreceiveで待機 (このプロセスをAと呼びます)
  2. 次の行がプロセスAに対し、{self(), "World"}を送信 self()はプロセスのpidを取得する関数。この場合iexのpidです
  3. プロセスAが{self(), "World"}を受信し、iexに{:ok, "Hello, World"}を送信
  4. iexがreceiveで受信

ここで不思議に思うのはreceiveが実行される前にsendしても良いのだろうか?です。
上記の書き方だとタイミングによってはreceiveが実行される前にsendが起きることは有り得そうです。

結論からいうとこれは問題ないです。
sendによって投げられたメッセージは送信先プロセスのmessage用のqueue(mailboxと呼ばれる)に保存されるからです。

参考

実装

仕様の確認

それでは作るためにガワの仕様を確認します。

iex(1)> h Task.async/1 # 簡単のため/1のみ実装します

                                 def async(fun)                                 

  @spec async((() -> any())) :: t()
# 略

iex(2)> h Task.await/1

                        def await(task, timeout \\ 5000)                        
# 略
  • Task.async/1はfunを受ける
  • Task.await/1はtaskとtimeoutを受ける
    • taskが何かわからないです
iex(3)> Task.async(fn -> 1 end)
%Task{
  owner: #PID<0.109.0>,
  pid: #PID<0.115.0>,
  ref: #Reference<0.3596268900.252444678.63351>
}

taskはTask構造体みたいです。構造体は知識が無いのでmapとしてしまいます。

草案

defmodule MyTask do
  def async(fun) do
    owner = self()
    pid = spawn(
      fn ->
        send(owner, { ???, fun.() })
      end
    )
    %{owner: owner, pid: pid, ref: ref}
  end

  def await(task, timeout \\ 5000) do
    receive do
      { ???, result } -> result
      after timeout -> nil # ここはテキトウ
    end
  end
end
  • ガワの仕様から引数と戻り値は書けました
  • Task.awaitは実行したプロセスのmailboxを使ってるはずなので、
    MyTask.asyncのsendの送信先は実行プロセスとなるようにself()でとるように書きました。
    ownerという言葉のニュアンスもそれっぽい

分からないのはMyTask.awaitが自分宛てのmessageをどうやって区別するかです。
MyTask.asyncを複数実行した後でもMyTask.awaitは自分宛てのmessageを区別できなければならないはずです。
つまり、少なくともownerプロセスにおいてユニークでなければなりません。




使っていない「ref」ってなんだろうか?この疑問が湧いて解決しました。

Built-in typesに以下の記述があります。

Reference - a unique value in the runtime system, created with make_ref/0

まさに欲しかったものです。これを使えばよさそうです。

完成案

defmodule MyTask do
  def async(fun) do
    owner = self()
    ref = make_ref()
    pid = spawn(
      fn ->
        send(owner, { ref, fun.() })
      end
    )
    %{owner: owner, pid: pid, ref: ref}
  end

  def await(task, timeout \\ 5000) do
    ref = task[:ref]
    receive do
      { ^ref, result } -> result # ^の付け忘れ注意
      after timeout -> nil # ここはテキトウ
    end
  end
end

試してみると

iex(1)> hoge = MyTask.async(fn -> "hoge" end)
%{
  owner: #PID<0.137.0>,
  pid: #PID<0.139.0>,
  ref: #Reference<0.1222458256.3309043713.33493>
}
iex(2)> fuga = MyTask.async(fn -> "fuga" end) 
%{
  owner: #PID<0.137.0>,
  pid: #PID<0.141.0>,
  ref: #Reference<0.1222458256.3309043713.33524>
}
iex(3)> MyTask.await(hoge)                    
"hoge"
iex(4)> MyTask.await(fuga)                    
"fuga"

バッチリ!
async/3はapplyを使えば書けると思うので、試してみてください。

まとめ

作ってみることにより以下が分かりました。

  • Taskって何が嬉しいのだろうか
    • 処理をバックグラウンドで走らすことは、spawn, send, receiveを使うことで十分できるが、
      Task.async,awaitを使うことで処理を渡すだけでそれを行ってくれる嬉しさがある。
      (作る前から分かってたじゃんと言われると辛いですが:sweat_smile:
      (実際のTaskはstart_link関数を持つOTPサーバーのため、Supervisorに監視させられる嬉しさもあるようです。
  • send後にreceiveしても問題ない
  • Reference型としてunique valueがあり、make_refで作ることができる
  • 簡易でよければTaskは作れる

今回は以上です。では、また!

「いいね」よろしくお願いします。:wink:

おまけ

ポエム1

今回の取り組みは「kokura.ex#3:Elixirもくもく会~入門もあるよ(19:00)」にリモート参加した際に思いつきで始めました。
もくもく会では雰囲気実装までしかたどり着きませんでしたが、終わりがけにアドバイスをいただけたのが助かりました。

何が言いたいかというと、「kokura.ex#3:Elixirもくもく会~入門もあるよ(19:00)」に感謝です。

ポエム2

Referenceへの疑問はすぐに湧いたわけではなく、ElixirのTask.asyncの実装をみて気づきました。

ElixirのTask.async/3とawaitの実装

elixir/lib/elixir/lib/task.ex
  def async(module, function_name, args)
      when is_atom(module) and is_atom(function_name) and is_list(args) do
    mfa = {module, function_name, args}
    owner = self()
    {:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)
    ref = Process.monitor(pid)
    send(pid, {owner, ref})
    %Task{pid: pid, ref: ref, owner: owner}
  end
# 略
  def await(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
    if owner != self() do
      raise ArgumentError, invalid_owner_error(task)
    end

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply

      {:DOWN, ^ref, _, proc, reason} ->
        exit({reason(reason, proc), {__MODULE__, :await, [task, timeout]}})
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        exit({:timeout, {__MODULE__, :await, [task, timeout]}})
    end
  end

すべてを理解するのは辛いですが、雰囲気は分かります。

  • Task.Supervised.start_linkでプロセスが生成されていること
  • そのプロセスの監視をするProcess.monitorでrefをとっていること
  • そのプロセスに{owner, ref}を送ると、receiveされ、結果がsendされること
  • 結果をawaitのreceiveで受けること

実際のソースを読むことで学べることは沢山ありそうです。

6
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
2