はじめに
先日、@ryo33さんと Pattern の機能拡張をペアプロで行いました。そのときの風景はYouTubeにて公開されています。
この記事では上記の動画で触れなかった Pattern の Dispatcher を解説します。
Patternとは
Pattern とはCizenからCizen.Filter
の機能を抽出し、リファクタリングや機能拡張を図ったものです。
Cizenとは
Cizenはオートマトンと呼ばれるプロセスを使ってアプリケーションを構築するためのイベントハンドリングライブラリです。
@ryo33さんが書いたQiita記事、Elixirでやばいマクロ(Pattern)を書いた理由【イベント購読】から引用します。
https://hexdocs.pm/cizen/readme.html
"Build highly concurrent, monitorable, and extensible applications with a collection of automata."
めちゃくちゃ雑に説明すると、
- プロセスをSagaという形式で統一的に管理し
- Sagaはイベントを購読しイベントを配信することで他のSagaに影響を与えるような枠組みを提供するためのライブラリです。
イベントの購読時にはCizen.Filterを使ってクエリを生成します。
近いうちにCizenからCizen.Filterを抽出したPatternでCizen.Filterを置き換える予定です。
使用例
Pattern は構造体や関数やパターンマッチが複合した柔軟な記法により、イベントをクエリすることができます。
イベントのクエリの前に単純なマッチの例を見ましょう。
pattern = Pattern.new(%A{key1: "a"})
assert Pattern.match?(pattern, %A{key1: "a"})
refute Pattern.match?(pattern, %B{key1: "a"})
関数やパターンマッチも使ってみましょう。
b = "b"
pattern = Pattern.new(fn
%A{key1: a} when is_atom(a) -> a == :a
%A{key1: ^b} -> true
end)
refute Pattern.match?(pattern, %A{key1: "a"})
assert Pattern.match?(pattern, %A{key1: "b"})
assert Pattern.match?(pattern, %A{key1: :a})
次に Dispatcher を用いたイベントのクエリの例を見ましょう。
Dispatcher.register(pid, Pattern.new(fn %Event{body: %TestEventA{}} -> true end), "ref1")
Dispatcher.register(pid, Pattern.new(fn %Event{body: %TestEventA{}} -> true end), "ref2")
Dispatcher.register(pid, Pattern.new(fn %Event{body: %TestEventB{}} -> true end), "ref3")
dispatch = Dispatcher.dispatch(pid, %Event{body: %TestEventA{}})
assert MapSet.new(["ref1", "ref2"]) == MapSet.new(dispatch)
Dispatcher.dispatch/2
の第二引数で指定したクエリに対する適切な購読のリファレンスが帰ってきていることが確認できます。
Dispatcher の要件としてスケールすること、つまり購読数に対し線形探索よりも効率の良いアルゴリズムを用いる必要があります。実際の Dispatcher ではpattern
を前置記法で表現した命令のツリーに変換して格納しています。この設計の話は上司の書いたQiita記事にて解説されています。
Dispatcher のコード
解説はこのような流れで進めていきます。
-
Pattern.Dispatcher.register/3
の第二引数で渡されるpattern
の正体 -
Pattern.Dispatcher
がしていること -
Pattern.Dispatcher.Node
がしていること-
Pattern.Dispatcher.Node
の構造 -
put
、delete
したとき -
get
したとき
-
Pattern.Dispatcher.register/3
の第二引数で渡されるpattern
の正体
Dispatcher本体の解説の前にpattern
がどういうものなのかを見てみましょう。pattern
とはPattern.new/1
したときに生成される構造体です。引数である構造体や無名関数は、木構造に入れやすく遅延評価できる前置記法に変換され、code
フィールドに格納されます。
iex> Pattern.new(%A{})
%Pattern{
code: {:and, [is_map: [access: []], ==: [{:access, [:__struct__]}, A]]}
}
iex> Pattern.new(%A{key1: "a"})
%Pattern{
code: {:and,
[
and: [is_map: [access: []], ==: [{:access, [:__struct__]}, A]],
==: [{:access, [:key1]}, "a"]
]}
}
Pattern.Dispatcher
がしていること
Pattern.Dispatcher
はGenServer
を実装する形でAPIを提供しています。init/1
初期値となるルートノードを生成しregister/3
、unregister/3
、dispatch/2
からPattern.Dispatch.Node
の関数を呼び出しています。
以下はPattern.Dispatcher
の抜粋です。
defmodule Pattern.Dispatcher do
# ...
use GenServer
@spec start_link([GenServer.option()]) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
end
@spec register(GenServer.server(), Pattern.t(), any) :: :ok
def register(name, pattern, ref) do
GenServer.cast(name, {:put, pattern, ref})
end
# ...
@impl GenServer
def init(_) do
{:ok, Node.new()}
end
@impl GenServer
def handle_cast({:put, pattern, ref}, state) do
{:noreply, Node.put(state, pattern.code, ref)}
end
# ...
@impl GenServer
def handle_call(:get, _from, state) do
{:reply, state, state}
end
end
Pattern.Dispatcher.Node
がしていること
Pattern.Dispatcher.Node
の構造
Nodeの型は次のように定義されています。
@type t :: %__MODULE__{subscriptions: MapSet.t(), operations: %{Code.t() => %{term => t}}}
-
subscriptions
: ノードに紐づけされている購読のマップセット -
operations
:%{子ノードのキーを見つけるためのコード => %{キー => 子ノード}}
いくつかpattern
を追加してみます。
iex> Dispatcher.register(pid, Pattern.new(%{}), "ref1") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%A{foo: "a"}), "ref2") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%A{foo: "b"}), "ref3") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%B{bar: "c"}), "ref4") # => :ok
iex> Dispatcher.register(pid, Pattern.new(%B{bar: "c"}), "ref5") # => :ok
iex> Dispatcher.register(pid, Pattern.new(fn "e" -> true end), "ref6") # => :ok
iex> Dispatcher.register(pid, Pattern.any([Pattern.new(%A{}), Pattern.new(%B{})]), "ref7") # => :ok
iex> GenServer.call(pid, :get)
%Pattern.Dispatcher.Node{
operations: %{
{:access, []} => %{
"e" => %Pattern.Dispatcher.Node{
operations: %{},
subscriptions: #MapSet<["ref6"]>
}
},
{:is_map, [access: []]} => %{
true: %Pattern.Dispatcher.Node{
operations: %{
{:access, [:__struct__]} => %{
A => %Pattern.Dispatcher.Node{
operations: %{
{:access, [:foo]} => %{
"a" => %Pattern.Dispatcher.Node{
operations: %{},
subscriptions: #MapSet<["ref2"]>
},
"b" => %Pattern.Dispatcher.Node{
operations: %{},
subscriptions: #MapSet<["ref3"]>
}
}
},
subscriptions: #MapSet<["ref7"]>
},
B => %Pattern.Dispatcher.Node{
operations: %{
{:access, [:bar]} => %{
"c" => %Pattern.Dispatcher.Node{
operations: %{},
subscriptions: #MapSet<["ref4", "ref5"]>
}
}
},
subscriptions: #MapSet<["ref7"]>
}
}
},
subscriptions: #MapSet<["ref1"]>
}
}
},
subscriptions: #MapSet<[]>
}
put
、delete
したとき
Node.put/3
、Node.delete/3
は継続渡しスタイル (CPS: Continuation-passing style)で書かれたrun/2
を呼び出しています。
:and
と :or
コードが{:and, [left, right]}
や{:or, [left, right]}
の場合を見てみましょう。
defp run(node, {:update, {:and, [left, right]}, continuation}) do
run(node, {:update, left, {:update, right, continuation}})
end
defp run(node, {:update, {:or, [left, right]}, continuation}) do
node
|> run({:update, left, continuation})
|> run({:update, right, continuation})
end
:and
はleft
の子ノードにright
を吊り下げることでandを実現しています。
{:and, [is_map: [access: []], ==: [{:access, [:__struct__]}, A]]}
:or
はleft
とright
それぞれに対しnext
を渡すことでorを実現しています。
{:or,
[
and: [is_map: [access: []], ==: [{:access, [:__struct__]}, A]],
and: [is_map: [access: []], ==: [{:access, [:__struct__]}, B]]
]}
update_operation/4
update_operation/4
は子ノードに対しcontinuation
を渡してアップデートします。
defp update_operation(node, operation, value, continuation) do
values = Map.get(node.operations, operation, %{})
next_node =
values
|> Map.get(value, new())
|> run(continuation)
node = put_in(node.operations[operation], values)
put_in(node.operations[operation][value], next_node)
end
end
%Pattern.Dispatcher.Node{
operations: %{
{:access, [:__struct__]} => %{
A => %Pattern.Dispatcher.Node{
operations: %{},
subscriptions: #MapSet<["ref1"]>
}
}
},
subscriptions: #MapSet<[]>
} |> update_operation({:access, [:__struct__]}, A, {:delete_subscription, "ref1"})
# => ref1が削除される
%Pattern.Dispatcher.Node{
operations: %{
{:access, [:__struct__]} => %{
A => %Pattern.Dispatcher.Node{
operations: %{},
subscriptions: #MapSet<[]>
}
}
},
subscriptions: #MapSet<[]>
}
get
したとき
Node.get/2
はoperations
を評価しながらノードを走査し、その際にたどったノードに紐付いているsubscriptions
をマージしていくことで購読を得ます。
Node.get/2
の手順は以下の通りです。
- 今いるノードの
operations
1つに対しPattern.eval
を実行し、子ノードのキーを得る -
operation
に紐付いているnodes
から1.で得たキーを使って子ノードを得る - 子ノードで
get/2
を実行する - 今いるノードの
subscriptions
と、子ノードのget/2
で帰ってきたsubscriptions
を結合する - 以上の手順をすべての
operations
で行う
もう少し詳しく説明します。Node.get/2
のコードは以下の通りです。
@spec get(t, struct) :: MapSet.t()
def get(node, struct) do
%__MODULE__{subscriptions: subscriptions, operations: operations} = node
Enum.reduce(operations, subscriptions, fn {operation, nodes}, subscriptions ->
value = Pattern.eval(operation, struct)
node = Map.get(nodes, value, new())
MapSet.union(subscriptions, get(node, struct))
end)
end
operation
には{:access, [:key1]}
のようなコードが格納されています。Pattern.eval
でそのコードをstruct
に対して評価し、子ノードのキーを得ます。
assert A == Pattern.eval({:access, [:__struct__]}, %A{key1: "a"})
実際のNodeではコードと子ノードはこのような配置になっています。
%Pattern.Dispatcher.Node{
operations: %{
{:access, [:__struct__]} => %{ # == nodes
A => %Pattern.Dispatcher.Node{ # 子ノード
# ...
},
B => # ...
},
# ...
},
# ...
}
次の行では先ほど得た子ノードのキーをつかってnodes
の中から子ノードを得ます。キーが見つからない場合は空のNodeを作成します。
node = Map.get(nodes, value, new())
最後の行ではsubscriptions
と「子ノードのget/2
で得られるsubscriptions
」をマージします。
MapSet.union(subscriptions, get(node, struct))
実際に下のようなクエリを実行すると、図のように探索します。
iex> Dispatcher.dispatch(pid, %A{foo: "a"})
["ref1", "ref2", "ref7"]
参考文献まとめ
- Pattern: https://gitlab.com/cizen/pattern
- Cizen Project: https://gitlab.com/cizen
- @ryo33さんの記事: https://qiita.com/ryo33/items/5baedf6dcb66d72f303c
- CPS: https://ja.wikipedia.org/wiki/継続渡しスタイル