LoginSignup
14
5

はじめてな Elixir(34) FLAME で関数をまるっと外に飛ばす

Last updated at Posted at 2023-12-29

これは #Elixir Advent Calendar 2023 の16日目です。昨日は @g_kenkun さんの 3年ぶりにElixirのAtCoder用Mixタスクライブラリを更新しました でした。

はじめに

Phoenix の開発者である Chris McCord さんが X-twitter にこんな投稿をしてると知人に教えられました。

中身を読むとむむむむ、われわれがやろうとしてたネタと被るではないですか。FLAME (Fleeting Lambda Application for Modular Execution) … これは調べにゃいかんとつついてみました。

ざっくりどういうものか

手元のデバイスの処理能力が貧弱なときに、クラウドに一部の計算を飛ばすような分散処理をしたくなります。これは一般的には RPC (Remote Procedure Call) と呼ばれる技術で、こういう処理を書こうとしたときには結構な量のプログラムを書かないといけません。Elixir/Erlang はこのあたりは大変良く出来てはいますが、それでもそれなりには書く必要があります。それを超簡単に実施できるようにしましょうという話です。

例えば、重た〜い処理をする関数

def heavy_func do
  重た〜い処理
end

があったとします。ここで FLAME を使えば

def heavy_func do
  FLAME.call(..., fn -> 重た〜い処理 end)
end

と書くだけで heavy_func が手元ではなく、どっか別のところに飛んでいって計算してくれる… という便利な枠組みです。

この処理を肩代わりしてくれる「どっか別の所」を FLAME では Backend と呼びます。現在の FLAME では、手元の Erlang VM 上での Backend である LocalBackend と、クラウド上の Backend である FlyBackend との2種類がデフォールトで用意されています。開発の途中では LocalBackend を用いて、プロダクトとして出す頃には FlyBackend を使って… というようなストーリーに感じます。

さらに、FLAMEの説明によると関数だけでなくプロセスも同様に外に飛ばすように記述できるようです。以下で DynamicSupervisor でいごかしてたプロセスを FLAME.place_child/3 関数でどこか遠くに飛ばせるらしいです。

- {:ok, pid} = DynamicSupervisor.start_child(@sup, spec)
+ {:ok, pid} = FLAME.place_child(..., spec)

こういう枠組みはこの FLAME が初めてというものではありません。例えば Java では中川郁夫さんがDripcast という枠組みをつくってました。Chirisさんの記事によると Lubien という方が JavaScript で同様の枠組み をつくってるようです。

私が所属している研究開発グループでも Elixir でこのようなことをしようと、Dripcastの枠組みをベースにしてElixirに展開して機能アップしたGiocci(ジョッキ) という枠組みを構築しているところです。この FLAME の話を聞いて「先にやられたか?!」と戦々恐々としているところです。

いごかしてみる

さて、実際にFLAMEをいごかしてみましょう。今回は飛ばす先を手元のマシンとする LocalBackend を用いてどんな動作をするのか見てみます。

まず FLAMEのソースコードgit clone して手元に持ってきます。今回用いたのは Release 0.1.7 です。当該ディレクトリに行って mix deps.get します。

% cd flame                                                 
% mix deps.get

このままだと mix test とかは出来ますが、自分でなにかするのに都合が良くないので少し改変します。

Runner をいごかす

関数やプロセスを飛ばすには、それを受け入れて代わりに計算を処理する Runner を先に立ち上げておく必要があります。それを lib/application.ex に付け加えます。

lib/application.ex
defmodule FLAME.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    {shutdown_timeout, opts} =
      :flame
      |> Application.get_env(:terminator, [])
      |> Keyword.pop(:shutdown_timeout, 30_000)

    opts = Keyword.put(opts, :name, FLAME.Terminator)

    children = [
-      Supervisor.child_spec({FLAME.Terminator, opts}, shutdown: shutdown_timeout)
+      Supervisor.child_spec({FLAME.Terminator, opts}, shutdown: shutdown_timeout),
+      {FLAME.Pool, name: MyRunner, min: 0, max: 1, max_concurrency: 1, idle_shutdown_after: 10_000},
    ]

    opts = [strategy: :one_for_one, name: FLAME.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

これで MyRunner という名前の Runner が走ります。

同期処理

まずは簡単な関数を走らせてみます。ここでは100の階乗を求める無名関数を使います。

% iex -S mix
Erlang/OTP 26 [erts-14.2.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]

Interactive Elixir (1.16.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> fn -> 1..100 |> Enum.product end
#Function<43.105768164/0 in :erl_eval.expr/6>
iex(2)> fn -> 1..100 |> Enum.product end.()
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000

これを自分で計算するのでなく、どこかに飛ばします。今回はどこかといってもクラウドとかではなく、手元でいごくであろう先程設定した MyRunner に飛ばします。
計算結果を受け取るには FLAME.call/3 を用います。これは結果を待つので GenServerhandle.call/3 同様の同期方式の処理になります。

iex(3)> FLAME.call(MyRunner, fn -> 1..100 |> Enum.product end)
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000

これだとどこで実行したんだか良くわかりませんが、メカニズムとしては階乗のプログラムを MyRunner にお願いして計算してます。

非同期処理

結果を受け取らなくて良いなら、非同期に処理する FLAME.cast/2 もあります。これは GenServerhandle_cast/2 同様に、処理の結果をまたずに直ちに制御が戻ってきます。

iex(4)> FLAME.cast(MyRunner, fn -> 1..100 |> Enum.product end)
:ok

これは Runner で階乗の計算はしていますが、それの返り値を待たずにすぐに :ok が返ってきてます。

並行動作

上の例ですと、どこか別のところで計算されてるのかもしれないけど、一つの計算がどこかで実施されてるだけで、FLAME を使ってる感動がありません。複数の計算を次々に飛ばしてるという感じが欲しいのでもうすこし工夫してみます。

試験用の test ディレクトリに ExUnit 用のファイルが用意されてます。test/flame_test.exs の先頭にある sim_long_running/2 関数を流用します。

lib/test.ex
defmodule FLAME.Test do
  def sim_long_running(pool, time \\ 1_000) do
    ref = make_ref()
    parent = self()

    task =
      Task.start_link(fn ->
        FLAME.call(pool, fn ->
          send(parent, {ref, :called})
          Process.sleep(time)
        end)
      end)

    receive do
      {^ref, :called} -> task
    end
  end
end

これはもともと defp されてたのを def にして別のところにおいただけのモジュールです。

% iex -S mix
Erlang/OTP 26 [erts-14.2.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]

Interactive Elixir (1.16.0) - press Ctrl+C to exit (type h() ENTER for help)

iex(1)> FLAME.Test.sim_long_running(MyRunner)
{:ok, #PID<0.224.0>}

これをいごかすと FLAME.call 後に Taskの PID を返して Process.Sleep します。これを1回やるとすぐに iex に制御が返ってきますが、連続してやってみるとどうでしょう。

iex(4)> FLAME.Test.sim_long_running(MyRunner, 10_000)
{:ok, #PID<0.244.0>} # これはすぐに返ってくる
iex(5)> FLAME.Test.sim_long_running(MyRunner, 10_000)
{:ok, #PID<0.252.0>} # しばらく(10秒ほど)待たされる

2回目を実行させた後、しばらくプロンプトが返ってきません。これは Runner が1つしかないので、1回目に起動された Runner がいごいている間は2回目の処理に進まないからです。

Runner の最大数は Application を立ち上げるときに :max パラメータで指定できます。

lib/application.ex
defmodule FLAME.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    {shutdown_timeout, opts} =
      :flame
      |> Application.get_env(:terminator, [])
      |> Keyword.pop(:shutdown_timeout, 30_000)

    opts = Keyword.put(opts, :name, FLAME.Terminator)

    children = [
      Supervisor.child_spec({FLAME.Terminator, opts}, shutdown: shutdown_timeout),
-      {FLAME.Pool, name: MyRunner, min: 0, max: 1, max_concurrency: 1, idle_shutdown_after: 10_000},
+      {FLAME.Pool, name: MyRunner, min: 0, max: 2, max_concurrency: 1, idle_shutdown_after: 10_000},
    ]

    opts = [strategy: :one_for_one, name: FLAME.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

これで iex -S mix して、先程同様に何回も FLAME.call をしてみます。

iex(1)> FLAME.Test.sim_long_running(MyRunner, 10_000)
{:ok, #PID<0.236.0>} # すぐ返ってくる
iex(2)> FLAME.Test.sim_long_running(MyRunner, 10_000)
{:ok, #PID<0.244.0>} # すぐ返ってくる
iex(3)> FLAME.Test.sim_long_running(MyRunner, 10_000)
{:ok, #PID<0.252.0>} # しばらく(10秒ほど)待たされる
iex(4)> 

こんどは最初の2回の実行後のプロンプトはすぐ返ってきて、3回目の実行のプロンプトがしばらく返ってきません。これは Runner が2つ起動できるようになって、同時に2つのFLAME.callの実行が可能になったからです。

まとめ

関数やプロセスの実行をお手軽に外に飛ばす FLAME を試してみました。一言のおまじないを追加するだけで処理が飛んで行きました。

「別のところで計算させる」と言っても今回は LocalBackend すなわち手元でしか計算させてませんでした。FLAME にはクラウドに飛ばす FlyBackend というのもついてます。これは Fly.io というクラウドサービスを用います。私は使ったこと無いので、お試しするにはちょっとハードルがあります。いずれ時間のあるときに試してみたいと思います。

さて、明日の #Elixir Advent Calendar 2023 の記事は Nobuyuki Inoue さんの Elixir Enum.at()がどれだけ遅いか。そしてMap です。お楽しみに!

参考文献

14
5
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
5