2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Elixir Concurrent Programming(1) - Spawn

Last updated at Posted at 2018-09-24

■ ElixirのConcurrent Programmingについてまとめた過去記事
Elixir Concurrent Programming(1) - Spawn - Qiita
Elixir Concurrent Programming(2) - GenServer - Qiita
Elixir Concurrent Programming(3) - Supervisors - Qiita
Elixir Concurrent Programming(4) - Task and Agent -Qiita

 ElixirのプログラムはBEAMと呼ばれるVM上で走ります。そしてElixirにはConcurrent Programmingという特徴がありますが、これは特有の軽量プロセス(BEAMプロセス)を使って実現されます。一つのプロセスには基本的に数KBしか必要とされません。またひとつのVM上では、理論的に1億3,400万個のプロセスを走らせることができます。とにかく、一般的に言われるプロセスとは違って、あまりリソースを必要とせずに多数のプロセスを同時に走らせることが可能です。

#1.Elixirで新プロセスを生成する - spawn

 Elixir はconcurrencyを実現するために、actor modelを採用しています。actorは独立したプロセスで、他のプロセスとメモリを含め何も共有することはありません。 その代わりmessageをsendし、receiveすることで情報のやり取りを行います。

my_module.ex
defmodule MyModule do
  def hello(msg) do
    IO.puts("hello  " <> msg)
  end
end

 my_module.exを読み込んでiexを起動します。

iex my_module.ex

 まずは普通に関数callを行います。特に新プロセスを生成していません。

iex(1)> MyModule.hello("taro")
hello  taro
:ok

 次にspawnで新プロセスを起動します。MyModuleのhello関数に引数["taro"]を渡してプロセスを生成し実行しています。プロセスの識別子pidが返ってきます。

iex(1)>  pid = spawn(MyModule, :hello, ["taro"])
hello  taro
#PID<0.89.0>

 Enum.mapを使って、複数プロセスを立ち上げます。pidのリストが返ってきます。

iex(2)> names = ["taro","hanako","jiro"]
["taro", "hanako", "jiro"]
iex(3)> plist = Enum.map(names, &spawn(MyModule, :hello, [&1]) )
hello  taro
hello  hanako
hello  jiro
[#PID<0.93.0>, #PID<0.94.0>, #PID<0.95.0>]
iex(4)>

#2.プロセス間 Message - send & receive

 それではプロセス間通信の send と receive を見てみましょう。

my_module2.ex
defmodule MyModule2 do
  def hello do
    receive do
      {:hello, sender, msg} ->  ### :hello message
        send sender, { :ok, "Goodby, #{msg}" }
      other ->                  ### :hello message 以外のmessage
        IO.puts ("次のmessageを受信しました : #{other}")
      after 5000 ->             ### 何のmessageもなかった場合の処理
        IO.puts("message はありませんでした。")
    end
  end
end

 :hello messageを送ってみる

# iex my_module2.ex
iex(1)> pid = spawn(MyModule2, :hello, []) # pid=新プロセス
#PID<0.89.0>
iex(2)> send pid, {:hello, self(), "taro"} # 新プロセスに:hello messageを送る
{:hello, #PID<0.87.0>, "taro"}
iex(3)> receive do          # 新プロセスからの返信messageを受信する
...(3)>   {:ok, message} ->
...(3)>     IO.puts message
...(3)> end
Goodby, taro
:ok

 :hello message以外のmessageを送ってみる

# iex my_module2.ex
iex(1)> pid = spawn(MyModule2, :hello, [])
#PID<0.89.0>
iex(2)> send pid, "hanako"
次のmessageを受信しました : hanako
"hanako"

 何のmessageも送らずに時間切れ(5000)を待ってみる。

# iex my_module2.ex
iex(1)> pid = spawn(MyModule2, :hello, [])
#PID<0.89.0>
message はありませんでした。

#3.プロセスの死を知る - spawn_link

 子プロセスの死を親に伝えるためにはspawn_link/3を使います。この時、exitシグナルをmessageに変換して、子プロセスの死の情報を通常のmessageと同じように扱えるようにします。Process.flag(:trap_exit, true) でexitをtrapすることで実現します。

my_module3.ex
defmodule MyModule3 do

  def die_func do
    receive do
      {:kill, _} ->  ### :kill message
        exit(:byebye)
      after 5000 ->
        IO.puts("die_func : message はありませんでした。")
    end
  end

  def run do
    Process.flag(:trap_exit, true)   ### exitシグナルをmessageに変換する
    pid = spawn_link(MyModule3, :die_func, [])
    send pid, { :kill, "Goodby" }
    receive do
      msg -> 
        IO.puts "messageを受け取りました: #{inspect msg}"
    after 1000 ->
        IO.puts "message はありませんでした。"
    end
  end
end

MyModule3.run

 my_module3.exを実行してみましょう。

# elixir -r my_module3.ex
messageを受け取りました: {:EXIT, #PID<0.78.0>, :byebye}

#4.spawnを使ったサーバの定型

 それではこれまでの知識を基にシンプルなサーバを作って見ましょう。これまでの例ではreceiveが1個のmessageを受け取るとプロセスは終了してました。永続的にmessageを受け取るようにするためには、tail recursionを使う必要があります。

 tail recursionとは、関数の最後の実行が関数callであるときに、スタック操作は行わずに単にgoto文で置き換えるというものです。これで関数は永続的に処理を繰り返すことができるようになります。スタックオーバーフローやメモリ不足に陥ることはありません。

 以下のserver.exではloop/0が再帰的に呼ばれることで、永続的にmessageをreceiveし処理しています。

server.ex
defmodule MyServer do

  ###
  ### client functions
  ###

  # loop 関数をspawnする => サーバプロセスを作る
  def start do
    spawn(&loop/0)
  end

  # サーバプロセスにリクエストを送る
  def send_req(server_pid, req) do
    send(server_pid, {:req, self(), req})
  end

  # メッセージを取得する
  def get_res do
    receive do
      {:res, res} -> res
    after
      5000 -> {:error, :timeout}
    end
  end


  ###
  ### server functions
  ###

  defp loop do
    receive do
      {:req, client_pid, req} ->
        send(client_pid, {:res, "res for #{req}"})
    end
    loop()
  end
end

 さて以下のようにiexで、MyServerのプロセスを100個起動し、負荷分散を目指します。MyServerはStateを持たないので、単純に100個起動するだけです。5個の仕事を行わせるのに、ランダムの5個のプロセスを選択しリクエストを出します。最後に結果をreceiveして表示します。

###
### MyServerのプロセスを100個起動し、負荷分散を目指す。
###
iex(1)> pool = Enum.map(1..100, fn _ -> MyServer.start() end)
[#PID<0.89.0>, #PID<0.90.0>, #PID<0.91.0>, #PID<0.92.0>, #PID<0.93.0>,
 ---
 #PID<0.134.0>, #PID<0.135.0>, #PID<0.136.0>, #PID<0.137.0>, #PID<0.138.0>, ...]

###
### 5個のプロセスをランダムに選びリクエストを送る。
###
iex(2)> Enum.each(
...(2)>           1..5,
...(2)>           fn req ->
...(2)>             child_pid = Enum.at(pool, :rand.uniform(100) - 1)
...(2)>             MyServer.send_req(child_pid, req)
...(2)>           end
...(2)>         )
:ok

###
### 返信messageを1個づつ取り出す。
###
iex(3)> MyServer.get_res()
"res for 1"
iex(4)> MyServer.get_res()
"res for 3"
iex(5)> MyServer.get_res()
"res for 4"
iex(6)> MyServer.get_res()
"res for 2"
iex(7)> MyServer.get_res()
"res for 5"
iex(8)> MyServer.get_res()
{:error, :timeout}
iex(9)>

 今回は以上です。

■ Elixirプロセス関連の過去記事
Elixir でプロセス管理 - A Fibonacci Server - Qiita
Elixirの分散処理(Node)とMnesia - Qiita

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?