Elixir
プロセス

Elixir でプロセス管理 - A Fibonacci Server

 Elixir初心者です。「Programming Elixir ≥ 1.6」でプロセスの説明の例としてA Fibonacci Serverの実装の例がありました。初心者の自分には多少難しかったので、とまどった箇所を忘れないようにメモっておきたいと思います。文章の中で何回も同じことを書いているところがありますが、そいうところが、忘れないように、ということです。決して文章力が弱いからではありません。あしからず。

 A Fibonacci ServerはElixirのプロセスとして起動され、メッセージで整数Nを送ると、Nのフィボナッチ数を計算して、メッセージとして結果を返してくれるものです。このプロセスは1回のリクエストに対してフィボナッチ数を解き終えた後に、メッセージで返答してくれます。その後2回目のリクエストを待つ状態になります。

 Scheduler.run関数を1回呼ぶことで、Fibonacci Serverのプロセスを複数個立ち上げ、37のフィボナッチ数を20回解いています。ちなみに上で述べたように1つのプロセスが1回だけでなく複数回フィボナッチ数を解くことが可能です。

 メインプログラムはScheduler.run関数を10回読んでいます。それぞれFibonacci Serverのプロセス数を1個~10個立ち上げるようにパラメータを変えて呼んでいます。そしてそれぞれ時間を計測します。1つのプロセスより、複数のプロセスで問題を解く方が速いことを実証しようという試みです。何個のプロセスが最適なのかも計ります。

 本プログラムは3つに分かれます。オリジナルとは逆順になりますが、メインから見てみましょう。何故ならば、私の場合、プログラムの全体像がわからないままに、細部のコードを読んで時間を費やしてしまったからです。

fib.exs
to_process = List.duplicate(37, 20)
#IO.inspect to_process
Enum.each 1..10, fn num_processes ->
  {time, result} = :timer.tc(
    Scheduler, :run,
    [num_processes, FibSolver, :fib, to_process]
  )

  if num_processes == 1 do
    IO.puts inspect result
    IO.puts "\n # time (s)"
  end
  :io.format "~2B ~.2f~n", [num_processes, time/1000000.0]
end

 以下は解決すべき問題キューを作成しています。つまり37のフィボナッチ数を20回解くことを表現しています。

to_process = List.duplicate(37, 20)

 このプログラムの目的はフィボナッチ数を解くことではなく、プロセス数と問題を解くのに要する時間の関係を知ることです。それと複数のプロセスの管理のしかたですかね。だから同じ問題を20回解いたり、プロセス数を1から10まで変化させたりしています。

 さて以下が、メインの呼び出しで、37のフィボナッチ数を20回解いて、かかった時間と計算結果を返してくれます。この呼び出しが、num_processes=1~10まで、10回実行されます。(この辺が読みにくかったのです)。つまりトータルで37のフィボナッチ数を20回x10=200回解いている計算になります。

  {time, result} = :timer.tc(
    Scheduler, :run,
    [num_processes, FibSolver, :fib, to_process]
  )

 以下が実行結果です。最初のnum_processes==1の時だけ、フィボナッチ数の計算結果を表示しています。時間は異なるけど、計算結果は同じなので省略ですね。時間はプロセス数が1個~10個の時でそれぞれ異なります。と言っても1個と2個の時に結構時間を要しているけど、3個目以降はほぼ同じですね。オリジナル本では3個目も少し大きな値で、4個目から落ち着いている感じでした。当然ですが実行環境に依存するのでしょう。

# elixir fib.exs
[{37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817},
 {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, 
 {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, 
 {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, 
 {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}]

 # time (s)
 1 45.58
 2 22.59
 3 16.20
 4 15.91
 5 15.60
 6 16.03
 7 15.97
 8 15.63
 9 15.26
10 15.68

 以下がFibonacci Serverのプロセスとなるプログラムです。これは簡単なものですが、注目すべきなのが、{ :fib,... }というメッセージと { :shutdown } というメッセージの2種類のメッセージを受け取ることです。

 まず関数fib(scheduler) が子プロセスとして起動されると親プロセスに{ :ready, ... }を返します。ここでschedulerは親プロセスidです。親はreadyな子にリクエストを送ります。子プロセスが{ :fib, ... }というメッセージを受け取ると、37のフィボナッチ数を1回解いて、親プロセスに解答メッセージ{ :answer, ... }を返します。子プロセスが{ :shutdown }というメッセージを受け取ると、自死します。

fib.exs
defmodule FibSolver do
  def fib(scheduler) do
    send scheduler, { :ready, self() }
    receive do
      { :fib, n, client } ->
        send client, { :answer, n, fib_calc(n), self() }
        fib(scheduler)
      { :shutdown } ->
        exit(:normal)
    end
  end
  # very inefficient, deliberately
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end

 最後に本プログラムの一番複雑な部分を見ます。Schedulerモジュールと名づけられています。特にFibonacci Serverに特化したものでなく、プロセス管理として一般性があるので、このような命名となっていると思われます。以下に説明します。

fib.exs
defmodule Scheduler do
  def run(num_processes, module, func, to_calculate) do
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [ next | tail ] = queue
        send pid, {:fib, next, self()}
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        send pid, {:shutdown}
        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.sort(results, fn {n1,_}, {n2,_} -> n1 <= n2 end)
        end

      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [ {number, result} | results ])
    end
  end
end

 まず以下のrunがこのモジュールのインターフェースとなる関数です。
 第一引数のnum_processesは起動するFibonacci Serverのプロセス数です。続く引数はmodule= FibSolver、func=fibで固定ですね。最後は to_calculate = [37,37,....37] で20個のリストですね。
 つまりnum_processes個のFibonacci Serverを立ち上げて、37のフィボナッチ数を、20回解くものです。メインはnum_processesを1~10まで変化させてrun関数を呼んでいます。

  def run(num_processes, module, func, to_calculate) do
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

 以下が子プロセスのFibonacci Serverとやり取りする関数です。processesは立ち上げた子プロセス(Fibonacci Server)のpidのリストです。queueは解決すべき問題キューで、最初メインから渡された時は20個の要素で[37,37,...37]という形をしています。

 queueが空でない状態で、子プロセス(Fibonacci Server)からメッセージ{:ready,...}を受け取れば、子プロセスにメッセージ{:fib,...}をリクエストして、queueを一個減らします。

 queueが空な状態で、子プロセス(Fibonacci Server)からメッセージ{:ready,...}を受け取れば、子プロセスにメッセージ{:shutdown}を返します。もう解く問題がないので、お前の役割は終わった、死んでいいよ、ということでしょう。この時processesから今殺したpidを削除します。これをprocessesに要素がある間、再帰で繰り返します。queueが空な状態になってからこの再帰を繰り返すトリガーは子プロセス側から見れば、子プロセスがそれまでに解きかけの問題を{:answer, ...}で返し、{:ready,...}を再送することによって繰り返されます。
 そして遂にはprocessesが空になります。それはリクエストに対する解答はすべて提出済みであることを意味します。ですからこの段階でresultを最終的な解答としてソートして、この関数の値とします。つまり再帰呼び出しを止めて関数を終わらせます。

 さてそれでは上のresultはいつ蓄えられたものでしょう?最初は空だったはずです。これは子プロセスから{:answer, ...}が送られてきた時に、再帰呼び出しで蓄えていったものです。

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [ next | tail ] = queue
        send pid, {:fib, next, self()}
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        send pid, {:shutdown}
        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.sort(results, fn {n1,_}, {n2,_} -> n1 <= n2 end)
        end

      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [ {number, result} | results ])
    end
  end

 これでこのプログラムの全容が解明されてめでたしめでたしです。このシステムのよいところは、問題解決のためにFibonacci Serverのプロセスをいくつ立ち上げてもよいというところです。今回の場合でいえば3個で十分なのですが、何個立ち上げても後始末はキチンと行ってくれる。逆に1個でも効率は悪いが、正解はキチンと出してくれます。解く問題や、マシンリソースにあわせて、プロセス数を増やせば簡単にスケーラビリティを得ることができます、という気にさせてくれますね。