■ ElixirのConcurrent Programmingについてまとめた過去記事
Elixir Concurrent Programming(1) - Spawn - Qiita
Elixir Concurrent Programming(2) - GenServer - Qiita
Elixir Concurrent Programming(3) - Supervisors - Qiita
Elixir Concurrent Programming(4) - Task and Agent -Qiita
ElixirのプログラムはBEAMと呼ばれるVM上で走ります。そしてElixirにはConcurrent Programmingという特徴がありますが、これは特有の軽量プロセス(BEAMプロセス)を使って実現されます。一つのプロセスには基本的に数KBしか必要とされません。またひとつのVM上では、理論的に1億3,400万個のプロセスを走らせることができます。とにかく、一般的に言われるプロセスとは違って、あまりリソースを必要とせずに多数のプロセスを同時に走らせることが可能です。
#1.Elixirで新プロセスを生成する - spawn
Elixir はconcurrencyを実現するために、actor modelを採用しています。actorは独立したプロセスで、他のプロセスとメモリを含め何も共有することはありません。 その代わりmessageをsendし、receiveすることで情報のやり取りを行います。
defmodule MyModule do
def hello(msg) do
IO.puts("hello " <> msg)
end
end
my_module.exを読み込んでiexを起動します。
iex my_module.ex
まずは普通に関数callを行います。特に新プロセスを生成していません。
iex(1)> MyModule.hello("taro")
hello taro
:ok
次にspawnで新プロセスを起動します。MyModuleのhello関数に引数["taro"]を渡してプロセスを生成し実行しています。プロセスの識別子pidが返ってきます。
iex(1)> pid = spawn(MyModule, :hello, ["taro"])
hello taro
#PID<0.89.0>
Enum.mapを使って、複数プロセスを立ち上げます。pidのリストが返ってきます。
iex(2)> names = ["taro","hanako","jiro"]
["taro", "hanako", "jiro"]
iex(3)> plist = Enum.map(names, &spawn(MyModule, :hello, [&1]) )
hello taro
hello hanako
hello jiro
[#PID<0.93.0>, #PID<0.94.0>, #PID<0.95.0>]
iex(4)>
#2.プロセス間 Message - send & receive
それではプロセス間通信の send と receive を見てみましょう。
defmodule MyModule2 do
def hello do
receive do
{:hello, sender, msg} -> ### :hello message
send sender, { :ok, "Goodby, #{msg}" }
other -> ### :hello message 以外のmessage
IO.puts ("次のmessageを受信しました : #{other}")
after 5000 -> ### 何のmessageもなかった場合の処理
IO.puts("message はありませんでした。")
end
end
end
:hello messageを送ってみる
# iex my_module2.ex
iex(1)> pid = spawn(MyModule2, :hello, []) # pid=新プロセス
#PID<0.89.0>
iex(2)> send pid, {:hello, self(), "taro"} # 新プロセスに:hello messageを送る
{:hello, #PID<0.87.0>, "taro"}
iex(3)> receive do # 新プロセスからの返信messageを受信する
...(3)> {:ok, message} ->
...(3)> IO.puts message
...(3)> end
Goodby, taro
:ok
:hello message以外のmessageを送ってみる
# iex my_module2.ex
iex(1)> pid = spawn(MyModule2, :hello, [])
#PID<0.89.0>
iex(2)> send pid, "hanako"
次のmessageを受信しました : hanako
"hanako"
何のmessageも送らずに時間切れ(5000)を待ってみる。
# iex my_module2.ex
iex(1)> pid = spawn(MyModule2, :hello, [])
#PID<0.89.0>
message はありませんでした。
#3.プロセスの死を知る - spawn_link
子プロセスの死を親に伝えるためにはspawn_link/3を使います。この時、exitシグナルをmessageに変換して、子プロセスの死の情報を通常のmessageと同じように扱えるようにします。Process.flag(:trap_exit, true) でexitをtrapすることで実現します。
defmodule MyModule3 do
def die_func do
receive do
{:kill, _} -> ### :kill message
exit(:byebye)
after 5000 ->
IO.puts("die_func : message はありませんでした。")
end
end
def run do
Process.flag(:trap_exit, true) ### exitシグナルをmessageに変換する
pid = spawn_link(MyModule3, :die_func, [])
send pid, { :kill, "Goodby" }
receive do
msg ->
IO.puts "messageを受け取りました: #{inspect msg}"
after 1000 ->
IO.puts "message はありませんでした。"
end
end
end
MyModule3.run
my_module3.exを実行してみましょう。
# elixir -r my_module3.ex
messageを受け取りました: {:EXIT, #PID<0.78.0>, :byebye}
#4.spawnを使ったサーバの定型
それではこれまでの知識を基にシンプルなサーバを作って見ましょう。これまでの例ではreceiveが1個のmessageを受け取るとプロセスは終了してました。永続的にmessageを受け取るようにするためには、tail recursionを使う必要があります。
tail recursionとは、関数の最後の実行が関数callであるときに、スタック操作は行わずに単にgoto文で置き換えるというものです。これで関数は永続的に処理を繰り返すことができるようになります。スタックオーバーフローやメモリ不足に陥ることはありません。
以下のserver.exではloop/0が再帰的に呼ばれることで、永続的にmessageをreceiveし処理しています。
defmodule MyServer do
###
### client functions
###
# loop 関数をspawnする => サーバプロセスを作る
def start do
spawn(&loop/0)
end
# サーバプロセスにリクエストを送る
def send_req(server_pid, req) do
send(server_pid, {:req, self(), req})
end
# メッセージを取得する
def get_res do
receive do
{:res, res} -> res
after
5000 -> {:error, :timeout}
end
end
###
### server functions
###
defp loop do
receive do
{:req, client_pid, req} ->
send(client_pid, {:res, "res for #{req}"})
end
loop()
end
end
さて以下のようにiexで、MyServerのプロセスを100個起動し、負荷分散を目指します。MyServerはStateを持たないので、単純に100個起動するだけです。5個の仕事を行わせるのに、ランダムの5個のプロセスを選択しリクエストを出します。最後に結果をreceiveして表示します。
###
### MyServerのプロセスを100個起動し、負荷分散を目指す。
###
iex(1)> pool = Enum.map(1..100, fn _ -> MyServer.start() end)
[#PID<0.89.0>, #PID<0.90.0>, #PID<0.91.0>, #PID<0.92.0>, #PID<0.93.0>,
---
#PID<0.134.0>, #PID<0.135.0>, #PID<0.136.0>, #PID<0.137.0>, #PID<0.138.0>, ...]
###
### 5個のプロセスをランダムに選びリクエストを送る。
###
iex(2)> Enum.each(
...(2)> 1..5,
...(2)> fn req ->
...(2)> child_pid = Enum.at(pool, :rand.uniform(100) - 1)
...(2)> MyServer.send_req(child_pid, req)
...(2)> end
...(2)> )
:ok
###
### 返信messageを1個づつ取り出す。
###
iex(3)> MyServer.get_res()
"res for 1"
iex(4)> MyServer.get_res()
"res for 3"
iex(5)> MyServer.get_res()
"res for 4"
iex(6)> MyServer.get_res()
"res for 2"
iex(7)> MyServer.get_res()
"res for 5"
iex(8)> MyServer.get_res()
{:error, :timeout}
iex(9)>
今回は以上です。
■ Elixirプロセス関連の過去記事
Elixir でプロセス管理 - A Fibonacci Server - Qiita
Elixirの分散処理(Node)とMnesia - Qiita