今年1年を振り返ってみると、コミュニケーションについての課題がよく話題に挙がっていたように思います。
全員リモートで仕事をすることが普通になり、コミュニケーションの手段、いわば環境が変化して適応するのに少なからず摩擦が生じ、その解決が迫られていたのでしょう。元からあった問題が顕在化しただけという感じもしますが……
コミュニケーションについて僕も思うところがあったので、最近知ったことを共有したいと思います。
なんと コミュニケーションはチューリング完全だった のです。
π-計算 (π-calculus)
π-計算という計算モデルがあります。これはプロセスどうしがチャネルを通してコミュニケーションをとりあう、というだけでチューリング完全な計算モデルとなっています。
文法は以下です:
- $x(y).P$
- x というチャネルからメッセージを受け取り、変数 y をそのメッセージに束縛して P を実行する、というプロセス
- $\overline{x}\langle y \rangle . P$
- x というチャネルに y というメッセージを投げた後に P を実行する、というプロセス
- $P \mid Q$
- プロセス P と プロセス Q を同時に実行する、というプロセス
- $(νx)P$
- チャネル x を新たに作り、P を実行する、というプロセス
- $!P$
- $P \mid P \mid \dots$ と無限に P を並べたもの
- $0$
- 何もしないプロセス
x や y で書かれている部分はすべてチャネルになります。すなわち、メッセージとして送受信できるのはチャネルです。
評価規則は本質的には1つで、同じチャネルに対するメッセージの送受信が並んでいたら解決する、という以下の規則です:
$\overline{x}\langle z \rangle . P \mid x(y).Q → P \mid Q[z/y]$
(ただし、 $Q[z/y]$ というのは Q における y を z に置き換えたものです)
見て分かるように、concurrent な計算をモデル化するのに使えます。
実装
Concurrent な計算といえば Elixir なので、リハビリも兼ねて Elixir で実装してみます。
あんまりちゃんと頑張ってもめんどいだけなので、以下の制約を足して簡素化することにします:
- パーサは作りません。
{:receive, :x, :y, P}
みたいなパース済みの形で項を扱います - $P \mid Q \mid R$ のように | が並んだものは
{:parallel, [P, Q, R]}
とリストでまとめてしまいます - (νx) の部分は無くてもいいと思うので削除します
以上から、項(プロセス)の型を以下のように表現することとします:
defmodule Pi do
@type channel :: atom
@type name :: atom
@type message :: atom
@type process ::
{:receive, channel, name, process}
| {:send, channel, message, process}
| {:parallel, list(process)}
| {:bang, process}
| {:none}
# 続く...
評価の肝となる「置き換え」ですが、チャネルも変数も atom として定義したので、 atom の置き換えを定義しておきます:
@spec substitute_name(term :: atom, value :: atom, name :: atom) :: atom
defp substitute_name(name, value, name), do: value
defp substitute_name(term, _, _), do: term
これを利用して P から P[z/y] を計算する関数を定義します。 channel
と message
の部分だけが置き換え対象で、それ以外の部分はただ再帰的に呼び出すだけです:
@spec substitute(process :: process, channel :: channel, name :: name) :: process
def substitute({:receive, channel, name, process}, target_channel, target_name) do
new_channel = substitute_name(channel, target_channel, target_name)
{:receive, new_channel, name, substitute(process, target_channel, target_name)}
end
def substitute({:send, channel, message, process}, target_channel, target_name) do
new_channel = substitute_name(channel, target_channel, target_name)
new_message = substitute_name(message, target_channel, target_name)
{:send, new_channel, new_message, substitute(process, target_channel, target_name)}
end
def substitute({:parallel, processes}, target_channel, target_name) do
{
:parallel,
Enum.map(processes, &substitute(&1, target_channel, target_name))
}
end
def substitute({:bang, process}, target_channel, target_name) do
{:bang, substitute(process, target_channel, target_name)}
end
def substitute({:none}, _, _) do
{:none}
end
これで評価関数が定義できます。 {:parallel, processes}
の場合が一番の肝なので、別の関数に処理を移譲します:
@spec evaluate(process :: process) :: process
def evaluate({:receive, _, _, _} = p), do: p
def evaluate({:send, _, _, _} = p), do: p
def evaluate({:parallel, processes}) do
case evaluate_parallel(processes) do
[] -> {:none}
[p] -> p
new_processes -> {:parallel, new_processes}
end
end
def evaluate({:new, name, process}), do: {:new, name, evaluate(process)}
def evaluate({:bang, process} = p) do
case evaluate(process) do
^process -> p
new_process -> {:parallel, [new_process, {:bang, process}]}
end
end
def evaluate({:none} = p), do: p
上記において {:bang, process}
の場合が微妙に悩ましいですが、 $!P = P \mid !P$ および $P → P' \implies P \mid Q → P' \mid Q$ を組み合わせて $P → P' \implies !P → P' \mid !P$ としています。
最後に、評価の肝となる evaluate_parallel
を定義していきます。
プロセスのリストを左からなめていって、 receive/send があれば対応する send/receive を探します。
対応するものがなければ再びリストをなめていき、対応するものがあれば置き換えを行えばよいですね。
が、そう簡単でもありません。途中に bang や parallel が登場する可能性があるのです。この bang や parallel の中に receive/send が入っている可能性があります。
これを回避して楽をするため、parallel の直下や bang の直下に parallel は登場しないと仮定してしまいます。これは
$P \mid (Q_1 \mid \dots \mid Q_n) \mid R = P \mid Q_1 \mid \dots \mid Q_n \mid R$ および
$!(P \mid Q) = !P \mid !Q$ を利用して、事前に項を変換しておくことで対応します:
@spec normalize(process :: process) :: process
def normalize({:receive, channel, name, process}) do
{:receive, channel, name, normalize(process)}
end
def normalize({:send, channel, message, process}) do
{:send, channel, message, normalize(process)}
end
def normalize({:parallel, processes}) do
new_processes =
Enum.flat_map(processes, fn process ->
case normalize(process) do
{:parallel, normalized_processes} -> normalized_processes
normalized_process -> [normalized_process]
end
end)
case new_processes do
[] -> {:none}
[p] -> p
_ -> {:parallel, new_processes}
end
end
def normalize({:bang, process}) do
case normalize(process) do
{:bang, _} = p ->
p
{:parallel, normalized_processes} ->
{:parallel, Enum.map(normalized_processes, fn
{:bang, p} -> {:bang, p}
p -> {:bang, p}
end)}
normalized_process ->
{:bang, normalized_process}
end
end
def normalize({:none}), do: {:none}
これで、 parallel の直下には parallel は存在せず、もし bang があればその中には parallel が登場しないということが仮定できます。
したがって、 parallel からはその直下の receive/send と、直下の bang のさらに直下の receive/send だけを探すだけで良くなります。
@spec evaluate_parallel(processes :: list(process)) :: list(process)
defp evaluate_parallel([]), do: []
defp evaluate_parallel([{:receive, channel, name, p11} = p1 | rest]) do
case Enum.split_while(rest, fn
{:send, ^channel, _, _} -> false
{:bang, {:send, ^channel, _, _}} -> false
_ -> true
end) do
{_, []} ->
[p1 | evaluate_parallel(rest)]
{not_matched, [{:send, ^channel, message, p2} | rest]} ->
[substitute(p11, name, message) | not_matched] ++ [p2 | rest]
{not_matched, [{:bang, {:send, ^channel, message, p21}} = p2 | rest]} ->
[substitute(p11, name, message) | not_matched] ++ [p21, p2 | rest]
end
end
defp evaluate_parallel([{:send, channel, message, p11} = p1 | rest]) do
case Enum.split_while(rest, fn
{:receive, ^channel, _, _} -> false
{:bang, {:receive, ^channel, _, _}} -> false
_ -> true
end) do
{_, []} ->
[p1 | evaluate_parallel(rest)]
{not_matched, [{:receive, ^channel, name, p2} | rest]} ->
[p11 | not_matched] ++ [substitute(p2, message, name) | rest]
{not_matched, [{:bang, {:receive, ^channel, name, p21}} = p2 | rest]} ->
[p11 | not_matched] ++ [substitute(p21, message, name), p2 | rest]
end
end
defp evaluate_parallel([{:bang, p11} = p1 | rest]) do
case evaluate_parallel([p11 | rest]) do
[^p11 | new_rest] -> [p1 | new_rest]
new_rest -> [p1 | new_rest]
end
end
defp evaluate_parallel([{:none} | rest]) do
rest
end
end
Gist にソースコード全体を置いときました。
動作確認
割と安直に作ったので色々とエッジケースで間違いはありそうですが、Wikipedia にもあった以下の例で試してみます(νを消したり、$0 \mid$ を先に消したりして少しいじっています):
$\overline{x} \langle z \rangle.0 \mid x(y). \overline{y}\langle x \rangle . x(y).0 \mid z(v) . \overline{v}\langle v \rangle. 0$
→ $0 \mid \overline{z}\langle x \rangle . x(y). 0 \mid z(v). \overline{v}\langle v \rangle .0 $
→ $\overline{z}\langle x \rangle . x(y). 0 \mid z(v). \overline{v}\langle v \rangle .0 $
→ $x(y). 0 \mid \overline{x}\langle x \rangle .0$
→ $0 \mid 0$
→ $0$
iex(1)> p = {:parallel, [{:send, :x, :z, {:none}}, {:receive, :x, :y, {:send, :y, :x, {:receive, :x, :y, {:none}}}}, {:receive, :z, :v, {:send, :v, :v, {:none}}}]}
{:parallel,
[
{:send, :x, :z, {:none}},
{:receive, :x, :y, {:send, :y, :x, {:receive, :x, :y, {:none}}}},
{:receive, :z, :v, {:send, :v, :v, {:none}}}
]}
iex(2)> Pi.evaluate(p)
{:parallel,
[
{:none},
{:send, :z, :x, {:receive, :x, :y, {:none}}},
{:receive, :z, :v, {:send, :v, :v, {:none}}}
]}
iex(3)> Pi.evaluate(v)
{:parallel,
[
{:send, :z, :x, {:receive, :x, :y, {:none}}},
{:receive, :z, :v, {:send, :v, :v, {:none}}}
]}
iex(4)> Pi.evaluate(v)
{:parallel, [{:receive, :x, :y, {:none}}, {:send, :x, :x, {:none}}]}
iex(5)> Pi.evaluate(v)
{:parallel, [{:none}, {:none}]}
iex(6)> Pi.evaluate(v)
{:none}
Pi.normalize/1
はする必要が無いので省略しています。見た感じうまく評価できていそうです。
(Concurrency どこいった……?)
まとめ
π-計算というのが面白いので紹介したかった。