9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Elixirライブラリ「Pattern」のDispatcherを読む

Last updated at Posted at 2020-04-18

はじめに

先日、@ryo33さんと Pattern の機能拡張をペアプロで行いました。そのときの風景はYouTubeにて公開されています。

この記事では上記の動画で触れなかった Pattern の Dispatcher を解説します。

Patternとは

Pattern とはCizenからCizen.Filterの機能を抽出し、リファクタリングや機能拡張を図ったものです。

Cizenとは

Cizenはオートマトンと呼ばれるプロセスを使ってアプリケーションを構築するためのイベントハンドリングライブラリです。
@ryo33さんが書いたQiita記事、Elixirでやばいマクロ(Pattern)を書いた理由【イベント購読】から引用します。

https://hexdocs.pm/cizen/readme.html
"Build highly concurrent, monitorable, and extensible applications with a collection of automata."

めちゃくちゃ雑に説明すると、

  1. プロセスをSagaという形式で統一的に管理し
  2. Sagaはイベントを購読しイベントを配信することで他のSagaに影響を与えるような枠組みを提供するためのライブラリです。

イベントの購読時にはCizen.Filterを使ってクエリを生成します。
近いうちにCizenからCizen.Filterを抽出したPatternでCizen.Filterを置き換える予定です。

使用例

Pattern は構造体や関数やパターンマッチが複合した柔軟な記法により、イベントをクエリすることができます。

イベントのクエリの前に単純なマッチの例を見ましょう。

pattern = Pattern.new(%A{key1: "a"})
assert Pattern.match?(pattern, %A{key1: "a"})
refute Pattern.match?(pattern, %B{key1: "a"})

関数やパターンマッチも使ってみましょう。

b = "b"
pattern = Pattern.new(fn
            %A{key1: a} when is_atom(a) -> a == :a
            %A{key1: ^b} -> true
          end)
refute Pattern.match?(pattern, %A{key1: "a"})
assert Pattern.match?(pattern, %A{key1: "b"})
assert Pattern.match?(pattern, %A{key1: :a})

次に Dispatcher を用いたイベントのクエリの例を見ましょう。

Dispatcher.register(pid, Pattern.new(fn %Event{body: %TestEventA{}} -> true end), "ref1")
Dispatcher.register(pid, Pattern.new(fn %Event{body: %TestEventA{}} -> true end), "ref2")
Dispatcher.register(pid, Pattern.new(fn %Event{body: %TestEventB{}} -> true end), "ref3")
dispatch = Dispatcher.dispatch(pid, %Event{body: %TestEventA{}})
assert MapSet.new(["ref1", "ref2"]) == MapSet.new(dispatch)

Dispatcher.dispatch/2の第二引数で指定したクエリに対する適切な購読のリファレンスが帰ってきていることが確認できます。

Dispatcher の要件としてスケールすること、つまり購読数に対し線形探索よりも効率の良いアルゴリズムを用いる必要があります。実際の Dispatcher ではpatternを前置記法で表現した命令のツリーに変換して格納しています。この設計の話は上司の書いたQiita記事にて解説されています。

Dispatcher のコード

解説はこのような流れで進めていきます。

  1. Pattern.Dispatcher.register/3の第二引数で渡されるpatternの正体
  2. Pattern.Dispatcherがしていること
  3. Pattern.Dispatcher.Nodeがしていること
    1. Pattern.Dispatcher.Nodeの構造
    2. putdeleteしたとき
    3. getしたとき

Pattern.Dispatcher.register/3の第二引数で渡されるpatternの正体

Dispatcher本体の解説の前にpatternがどういうものなのかを見てみましょう。patternとはPattern.new/1したときに生成される構造体です。引数である構造体や無名関数は、木構造に入れやすく遅延評価できる前置記法に変換され、codeフィールドに格納されます。

iex> Pattern.new(%A{})                                               
%Pattern{
  code: {:and, [is_map: [access: []], ==: [{:access, [:__struct__]}, A]]}
}
iex> Pattern.new(%A{key1: "a"})                                                                                      
%Pattern{
  code: {:and,
   [
     and: [is_map: [access: []], ==: [{:access, [:__struct__]}, A]],
     ==: [{:access, [:key1]}, "a"]
   ]}
}

Pattern.Dispatcherがしていること

Pattern.DispatcherGenServerを実装する形でAPIを提供しています。init/1初期値となるルートノードを生成しregister/3unregister/3dispatch/2からPattern.Dispatch.Nodeの関数を呼び出しています。

以下はPattern.Dispatcherの抜粋です。

defmodule Pattern.Dispatcher do
  # ...
  use GenServer

  @spec start_link([GenServer.option()]) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts)
  end

  @spec register(GenServer.server(), Pattern.t(), any) :: :ok
  def register(name, pattern, ref) do
    GenServer.cast(name, {:put, pattern, ref})
  end
  # ...
  @impl GenServer
  def init(_) do
    {:ok, Node.new()}
  end

  @impl GenServer
  def handle_cast({:put, pattern, ref}, state) do
    {:noreply, Node.put(state, pattern.code, ref)}
  end
  # ...
  @impl GenServer
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end

Pattern.Dispatcher.Nodeがしていること

Pattern.Dispatcher.Nodeの構造

Nodeの型は次のように定義されています。

@type t :: %__MODULE__{subscriptions: MapSet.t(), operations: %{Code.t() => %{term => t}}}
  • subscriptions : ノードに紐づけされている購読のマップセット
  • operations : %{子ノードのキーを見つけるためのコード => %{キー => 子ノード}}

いくつかpatternを追加してみます。

iex> Dispatcher.register(pid, Pattern.new(%{}), "ref1") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%A{foo: "a"}), "ref2") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%A{foo: "b"}), "ref3") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%B{bar: "c"}), "ref4") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%B{bar: "c"}), "ref5") # => :ok
iex> Dispatcher.register(pid, Pattern.new(fn "e" -> true end), "ref6") # => :ok
iex> Dispatcher.register(pid, Pattern.any([Pattern.new(%A{}), Pattern.new(%B{})]), "ref7") # => :ok

iex> GenServer.call(pid, :get)                                      
%Pattern.Dispatcher.Node{
  operations: %{
    {:access, []} => %{
      "e" => %Pattern.Dispatcher.Node{
        operations: %{},
        subscriptions: #MapSet<["ref6"]>
      }
    },
    {:is_map, [access: []]} => %{
      true: %Pattern.Dispatcher.Node{
        operations: %{
          {:access, [:__struct__]} => %{
            A => %Pattern.Dispatcher.Node{
              operations: %{
                {:access, [:foo]} => %{
                  "a" => %Pattern.Dispatcher.Node{
                    operations: %{},
                    subscriptions: #MapSet<["ref2"]>
                  },
                  "b" => %Pattern.Dispatcher.Node{
                    operations: %{},
                    subscriptions: #MapSet<["ref3"]>
                  }
                }
              },
              subscriptions: #MapSet<["ref7"]>
            },
            B => %Pattern.Dispatcher.Node{
              operations: %{
                {:access, [:bar]} => %{
                  "c" => %Pattern.Dispatcher.Node{
                    operations: %{},
                    subscriptions: #MapSet<["ref4", "ref5"]>
                  }
                }
              },
              subscriptions: #MapSet<["ref7"]>
            }
          }
        },
        subscriptions: #MapSet<["ref1"]>
      }
    }
  },
  subscriptions: #MapSet<[]>
}

これを図で表現するとこのようになります。
pattern_tree.png

putdeleteしたとき

Node.put/3Node.delete/3継続渡しスタイル (CPS: Continuation-passing style)で書かれたrun/2を呼び出しています。

:and:or

コードが{:and, [left, right]}{:or, [left, right]}の場合を見てみましょう。

  defp run(node, {:update, {:and, [left, right]}, continuation}) do
    run(node, {:update, left, {:update, right, continuation}})
  end

  defp run(node, {:update, {:or, [left, right]}, continuation}) do
    node
    |> run({:update, left, continuation})
    |> run({:update, right, continuation})
  end

:andleftの子ノードにrightを吊り下げることでandを実現しています。

{:and, [is_map: [access: []], ==: [{:access, [:__struct__]}, A]]}

pattern_tree_and.png

:orleftrightそれぞれに対しnextを渡すことでorを実現しています。

{:or,
  [
    and: [is_map: [access: []], ==: [{:access, [:__struct__]}, A]],
    and: [is_map: [access: []], ==: [{:access, [:__struct__]}, B]]
  ]}

pattern_tree_or.png

update_operation/4

update_operation/4は子ノードに対しcontinuationを渡してアップデートします。

  defp update_operation(node, operation, value, continuation) do
    values = Map.get(node.operations, operation, %{})

    next_node =
      values
      |> Map.get(value, new())
      |> run(continuation)

    node = put_in(node.operations[operation], values)
    put_in(node.operations[operation][value], next_node)
  end
end
update_operationの例
%Pattern.Dispatcher.Node{
  operations: %{
    {:access, [:__struct__]} => %{
      A => %Pattern.Dispatcher.Node{
        operations: %{},
        subscriptions: #MapSet<["ref1"]>
      }
    }
  },
  subscriptions: #MapSet<[]>
} |> update_operation({:access, [:__struct__]}, A, {:delete_subscription, "ref1"})
# => ref1が削除される
%Pattern.Dispatcher.Node{
  operations: %{
    {:access, [:__struct__]} => %{
      A => %Pattern.Dispatcher.Node{
        operations: %{},
        subscriptions: #MapSet<[]>
      }
    }
  },
  subscriptions: #MapSet<[]>
}

getしたとき

Node.get/2operationsを評価しながらノードを走査し、その際にたどったノードに紐付いているsubscriptionsをマージしていくことで購読を得ます。

Node.get/2の手順は以下の通りです。

  1. 今いるノードのoperations1つに対しPattern.evalを実行し、子ノードのキーを得る
  2. operationに紐付いているnodesから1.で得たキーを使って子ノードを得る
  3. 子ノードでget/2を実行する
  4. 今いるノードのsubscriptionsと、子ノードのget/2で帰ってきたsubscriptionsを結合する
  5. 以上の手順をすべてのoperationsで行う

もう少し詳しく説明します。Node.get/2のコードは以下の通りです。

  @spec get(t, struct) :: MapSet.t()
  def get(node, struct) do
    %__MODULE__{subscriptions: subscriptions, operations: operations} = node

    Enum.reduce(operations, subscriptions, fn {operation, nodes}, subscriptions ->
      value = Pattern.eval(operation, struct)
      node = Map.get(nodes, value, new())
      MapSet.union(subscriptions, get(node, struct))
    end)
  end

operationには{:access, [:key1]}のようなコードが格納されています。Pattern.evalでそのコードをstructに対して評価し、子ノードのキーを得ます。

Pattern.evalの例
assert A == Pattern.eval({:access, [:__struct__]}, %A{key1: "a"})

実際のNodeではコードと子ノードはこのような配置になっています。

%Pattern.Dispatcher.Node{
  operations: %{
    {:access, [:__struct__]} => %{ # == nodes
      A => %Pattern.Dispatcher.Node{ # 子ノード
        # ...
      },
      B => # ...
    },
    # ...
  },
  # ...
}

次の行では先ほど得た子ノードのキーをつかってnodesの中から子ノードを得ます。キーが見つからない場合は空のNodeを作成します。

node = Map.get(nodes, value, new())

最後の行ではsubscriptionsと「子ノードのget/2で得られるsubscriptions」をマージします。

MapSet.union(subscriptions, get(node, struct))

実際に下のようなクエリを実行すると、図のように探索します。

iex> Dispatcher.dispatch(pid, %A{foo: "a"})
["ref1", "ref2", "ref7"]

pattern_tree_get.png

参考文献まとめ

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?