LoginSignup
9
0

More than 1 year has passed since last update.

π-計算の紹介と簡単な実装

Last updated at Posted at 2021-12-15

今年1年を振り返ってみると、コミュニケーションについての課題がよく話題に挙がっていたように思います。

全員リモートで仕事をすることが普通になり、コミュニケーションの手段、いわば環境が変化して適応するのに少なからず摩擦が生じ、その解決が迫られていたのでしょう。元からあった問題が顕在化しただけという感じもしますが……

コミュニケーションについて僕も思うところがあったので、最近知ったことを共有したいと思います。

なんと コミュニケーションはチューリング完全だった のです。

π-計算 (π-calculus)

π-計算という計算モデルがあります。これはプロセスどうしがチャネルを通してコミュニケーションをとりあう、というだけでチューリング完全な計算モデルとなっています。

文法は以下です:

$x(y).P$
x というチャネルからメッセージを受け取り、変数 y をそのメッセージに束縛して P を実行する、というプロセス
$\overline{x}\langle y \rangle . P$
x というチャネルに y というメッセージを投げた後に P を実行する、というプロセス
$P \mid Q$
プロセス P と プロセス Q を同時に実行する、というプロセス
$(νx)P$
チャネル x を新たに作り、P を実行する、というプロセス
$!P$
$P \mid P \mid \dots$ と無限に P を並べたもの
$0$
何もしないプロセス

x や y で書かれている部分はすべてチャネルになります。すなわち、メッセージとして送受信できるのはチャネルです。

評価規則は本質的には1つで、同じチャネルに対するメッセージの送受信が並んでいたら解決する、という以下の規則です:
$\overline{x}\langle z \rangle . P \mid x(y).Q → P \mid Q[z/y]$
(ただし、 $Q[z/y]$ というのは Q における y を z に置き換えたものです)

見て分かるように、concurrent な計算をモデル化するのに使えます。

実装

Concurrent な計算といえば Elixir なので、リハビリも兼ねて Elixir で実装してみます。

あんまりちゃんと頑張ってもめんどいだけなので、以下の制約を足して簡素化することにします:

  • パーサは作りません。{:receive, :x, :y, P} みたいなパース済みの形で項を扱います
  • $P \mid Q \mid R$ のように | が並んだものは {:parallel, [P, Q, R]} とリストでまとめてしまいます
  • (νx) の部分は無くてもいいと思うので削除します

以上から、項(プロセス)の型を以下のように表現することとします:

defmodule Pi do
  @type channel :: atom
  @type name :: atom
  @type message :: atom
  @type process ::
          {:receive, channel, name, process}
          | {:send, channel, message, process}
          | {:parallel, list(process)}
          | {:bang, process}
          | {:none}

  # 続く...

評価の肝となる「置き換え」ですが、チャネルも変数も atom として定義したので、 atom の置き換えを定義しておきます:

  @spec substitute_name(term :: atom, value :: atom, name :: atom) :: atom
  defp substitute_name(name, value, name), do: value
  defp substitute_name(term, _, _), do: term

これを利用して P から P[z/y] を計算する関数を定義します。 channelmessage の部分だけが置き換え対象で、それ以外の部分はただ再帰的に呼び出すだけです:

  @spec substitute(process :: process, channel :: channel, name :: name) :: process
  def substitute({:receive, channel, name, process}, target_channel, target_name) do
    new_channel = substitute_name(channel, target_channel, target_name)
    {:receive, new_channel, name, substitute(process, target_channel, target_name)}
  end

  def substitute({:send, channel, message, process}, target_channel, target_name) do
    new_channel = substitute_name(channel, target_channel, target_name)
    new_message = substitute_name(message, target_channel, target_name)
    {:send, new_channel, new_message, substitute(process, target_channel, target_name)}
  end

  def substitute({:parallel, processes}, target_channel, target_name) do
    {
      :parallel,
      Enum.map(processes, &substitute(&1, target_channel, target_name))
    }
  end

  def substitute({:bang, process}, target_channel, target_name) do
    {:bang, substitute(process, target_channel, target_name)}
  end

  def substitute({:none}, _, _) do
    {:none}
  end

これで評価関数が定義できます。 {:parallel, processes} の場合が一番の肝なので、別の関数に処理を移譲します:

  @spec evaluate(process :: process) :: process
  def evaluate({:receive, _, _, _} = p), do: p
  def evaluate({:send, _, _, _} = p), do: p

  def evaluate({:parallel, processes}) do
    case evaluate_parallel(processes) do
      [] -> {:none}
      [p] -> p
      new_processes -> {:parallel, new_processes}
    end
  end

  def evaluate({:new, name, process}), do: {:new, name, evaluate(process)}

  def evaluate({:bang, process} = p) do
    case evaluate(process) do
      ^process -> p
      new_process -> {:parallel, [new_process, {:bang, process}]}
    end
  end

  def evaluate({:none} = p), do: p

上記において {:bang, process} の場合が微妙に悩ましいですが、 $!P = P \mid !P$ および $P → P' \implies P \mid Q → P' \mid Q$ を組み合わせて $P → P' \implies !P → P' \mid !P$ としています。

最後に、評価の肝となる evaluate_parallel を定義していきます。

プロセスのリストを左からなめていって、 receive/send があれば対応する send/receive を探します。
対応するものがなければ再びリストをなめていき、対応するものがあれば置き換えを行えばよいですね。

が、そう簡単でもありません。途中に bang や parallel が登場する可能性があるのです。この bang や parallel の中に receive/send が入っている可能性があります。

これを回避して楽をするため、parallel の直下や bang の直下に parallel は登場しないと仮定してしまいます。これは
$P \mid (Q_1 \mid \dots \mid Q_n) \mid R = P \mid Q_1 \mid \dots \mid Q_n \mid R$ および
$!(P \mid Q) = !P \mid !Q$ を利用して、事前に項を変換しておくことで対応します:

  @spec normalize(process :: process) :: process
  def normalize({:receive, channel, name, process}) do
    {:receive, channel, name, normalize(process)}
  end

  def normalize({:send, channel, message, process}) do
    {:send, channel, message, normalize(process)}
  end

  def normalize({:parallel, processes}) do
    new_processes =
      Enum.flat_map(processes, fn process ->
        case normalize(process) do
          {:parallel, normalized_processes} -> normalized_processes
          normalized_process -> [normalized_process]
        end
      end)
    case new_processes do
      [] -> {:none}
      [p] -> p
      _ -> {:parallel, new_processes}
    end
  end

  def normalize({:bang, process}) do
    case normalize(process) do
      {:bang, _} = p ->
        p
      {:parallel, normalized_processes} ->
        {:parallel, Enum.map(normalized_processes, fn
          {:bang, p} -> {:bang, p}
          p -> {:bang, p}
        end)}
      normalized_process ->
        {:bang, normalized_process}
    end
  end

  def normalize({:none}), do: {:none}

これで、 parallel の直下には parallel は存在せず、もし bang があればその中には parallel が登場しないということが仮定できます。
したがって、 parallel からはその直下の receive/send と、直下の bang のさらに直下の receive/send だけを探すだけで良くなります。

  @spec evaluate_parallel(processes :: list(process)) :: list(process)
  defp evaluate_parallel([]), do: []

  defp evaluate_parallel([{:receive, channel, name, p11} = p1 | rest]) do
    case Enum.split_while(rest, fn
           {:send, ^channel, _, _} -> false
           {:bang, {:send, ^channel, _, _}} -> false
           _ -> true
         end) do
      {_, []} ->
        [p1 | evaluate_parallel(rest)]

      {not_matched, [{:send, ^channel, message, p2} | rest]} ->
        [substitute(p11, name, message) | not_matched] ++ [p2 | rest]

      {not_matched, [{:bang, {:send, ^channel, message, p21}} = p2 | rest]} ->
        [substitute(p11, name, message) | not_matched] ++ [p21, p2 | rest]
    end
  end

  defp evaluate_parallel([{:send, channel, message, p11} = p1 | rest]) do
    case Enum.split_while(rest, fn
           {:receive, ^channel, _, _} -> false
           {:bang, {:receive, ^channel, _, _}} -> false
           _ -> true
         end) do
      {_, []} ->
        [p1 | evaluate_parallel(rest)]

      {not_matched, [{:receive, ^channel, name, p2} | rest]} ->
        [p11 | not_matched] ++ [substitute(p2, message, name) | rest]

      {not_matched, [{:bang, {:receive, ^channel, name, p21}} = p2 | rest]} ->
        [p11 | not_matched] ++ [substitute(p21, message, name), p2 | rest]
    end
  end

  defp evaluate_parallel([{:bang, p11} = p1 | rest]) do
    case evaluate_parallel([p11 | rest]) do
      [^p11 | new_rest] -> [p1 | new_rest]
      new_rest -> [p1 | new_rest]
    end
  end

  defp evaluate_parallel([{:none} | rest]) do
    rest
  end
end

Gist にソースコード全体を置いときました。

動作確認

割と安直に作ったので色々とエッジケースで間違いはありそうですが、Wikipedia にもあった以下の例で試してみます(νを消したり、$0 \mid$ を先に消したりして少しいじっています):

$\overline{x} \langle z \rangle.0 \mid x(y). \overline{y}\langle x \rangle . x(y).0 \mid z(v) . \overline{v}\langle v \rangle. 0$
→ $0 \mid \overline{z}\langle x \rangle . x(y). 0 \mid z(v). \overline{v}\langle v \rangle .0 $
→ $\overline{z}\langle x \rangle . x(y). 0 \mid z(v). \overline{v}\langle v \rangle .0 $
→ $x(y). 0 \mid \overline{x}\langle x \rangle .0$
→ $0 \mid 0$
→ $0$

iex(1)> p = {:parallel, [{:send, :x, :z, {:none}}, {:receive, :x, :y, {:send, :y, :x, {:receive, :x, :y, {:none}}}}, {:receive, :z, :v, {:send, :v, :v, {:none}}}]}
{:parallel,
 [
   {:send, :x, :z, {:none}},
   {:receive, :x, :y, {:send, :y, :x, {:receive, :x, :y, {:none}}}},
   {:receive, :z, :v, {:send, :v, :v, {:none}}}
 ]}
iex(2)> Pi.evaluate(p)                      
{:parallel,
 [
   {:none},
   {:send, :z, :x, {:receive, :x, :y, {:none}}},
   {:receive, :z, :v, {:send, :v, :v, {:none}}}
 ]}
iex(3)> Pi.evaluate(v)
{:parallel,
 [
   {:send, :z, :x, {:receive, :x, :y, {:none}}},
   {:receive, :z, :v, {:send, :v, :v, {:none}}}
 ]}
iex(4)> Pi.evaluate(v)
{:parallel, [{:receive, :x, :y, {:none}}, {:send, :x, :x, {:none}}]}
iex(5)> Pi.evaluate(v)
{:parallel, [{:none}, {:none}]}
iex(6)> Pi.evaluate(v)
{:none}

Pi.normalize/1 はする必要が無いので省略しています。見た感じうまく評価できていそうです。

(Concurrency どこいった……?)

まとめ

π-計算というのが面白いので紹介したかった。

参考文献

9
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
0