Erlang
Elixir
OTP

Elixir で Stream を使わないで無限リストを実装してみた

More than 3 years have passed since last update.


前置き

Elixir には、標準で無限リストを扱う Stream というモジュールが用意されています。

これは Erlang には用意されていないものです。

どう言う仕組みで実装しているのだろう、とちょっとだけ調べてみました。

その結果。

別に Stream モジュールに用意されている構造体や関数群を利用しなくても、無限リスト(ストリーム)を作成することは可能だと言うことが判明1

ということでその仕組みを、色々実験してみた結果とともに、ちょと紹介してみます。


環境


  • 環境1:


    • Mac OSX 10.9.5

    • Erlang 7.0.2/OTP 18

    • Elixir 1.0.5 / 1.1.0



  • 環境2:



  • 環境3:


    • glot.io

    • Erlang 6.2/OTP 17

    • Elixir 1.0.2




Enumerable プロトコル

Elixir の列挙の仕組みの肝は、Enumerable プロトコル。

その中で特に重要なのが、Enumerable.reduce/3 関数。第1引数(collection)が、第2・第3引数(acc, fun)を 適切な処理 をして 適切な戻り値 を返すように、このプロトコルを実装すれば、オリジナルのモジュールでも列挙可能になり、Enum モジュールや Stream モジュールに用意されている各関数(例:Enum.take/2, Stream.filter/2 等)が適用できるようになります。

「適切な処理」とか「適切な戻り値」って何やねん。というと。

まず。第2引数 acc

こちらは、以下の3種類のいずれかが来るという想定。それぞれについて「どう処理すべきか」が規定されています。



  • {:cont, <term>}

    … 列挙処理を継続する。


  • {:halt, <term>}

    … 列挙処理を(強制)終了する。


  • {:suspend, <term>}

    … 列挙処理を(一時)中断する。

続いて。第3引数 fun

これは2引数関数で、以下のような動作をするようなものが渡ってきます:


  • 第1引数:列挙されるべき次の値(が渡されることを想定)。

  • 第2引数:アキュムレータ。先ほどの acc 引数で渡ってきた tuple の2番目の要素 <term> が渡されることを想定。

  • 戻り値:上述の acc と同じ形式の3種類の tuple のいずれか。

また返すべき戻り値の種類は、以下の tuple のいずれか:



  • {:done, <term>}

    … 列挙が(正常)終了したことを表す。


  • {:halted, <term>}

    … 列挙が(強制)終了したことを表す。


  • {:suspended, <term>, <continuation>}

    … 列挙が一時中断したことを表す。


    • 第3要素 <continuation>:処理を再開するための関数。引数は1つ、元の引数 acc と同じ形式の tuple、戻り値はいまここで説明している戻り値のいずれか。



そして。

Enumerable.reduce/3 が実装すべき処理とその戻り値の概要は、まとめると以下の通り:


  • 第二引数が {:halt, <term>} の場合:



    • {:halted, <term>} を返せば OK(<term> はそのまま引き渡せばOK、以下同)。



  • 第二引数が {:suspend, <term>} の場合:



    • {:suspended, <term>, <continuation>} の形式の tuple を返す。

      <continuation> は適切に設定する必要がある(詳細例後述)。



  • 第二引数が {:cont, <term>} の場合:


    • 列挙すべき次の値を算出(or 取得)。

    • もう次の値がない(最後まで列挙した後)→ {:done, <term>} を返せば OK。

    • 次の値が存在(=:v):


      • 第3引数 fun に、v<term> を適用(例:fun.(v, term)

      • ↑の結果を第2引数 acc として、fun をそのまま第3引数として、reduce 処理を再帰呼び出しする。





こんな感じで、オリジナルモジュール(オリジナル構造体)に対して defimpl を利用して reduce を実装すれば OK2 です。

ですが。

これをちょっとだけ手軽に実現する方法が、実は用意されていました。


関数(Function)は Enumerable

Elixir のソースを眺めてみると、実は FunctionEnumerable を実装するよう記述されています。

defimpl Enumerable, for: Function … (v1.0.5) / (v1.1.0-rc.0)

その実装はとても単純:


enum.ex(抜粋)

  def reduce(function, acc, fun) when is_function(function, 2),

do: function.(acc, fun)

つまり。

2引数関数として(認識されるように)実装して、上述の acc および fun の2つの引数を上述の通りに 適切な処理 をして 適切な戻り値 を返すようにしさえすれば OK。ということ3

定石的には、処理の本体はプライベート関数で用意して、&other_fn(〜,&1,&2) みたいな書き方で返すようにすれば OK。

{:suspended, <term>, <continuation>} の形式の戻り値の、第3要素 <continuation> も、その関数(自身)を &other_fn(〜,&1,fun) のように記述すれば良いし、acc{:cont, <term>} の形式の時も、再帰的な処理はそのままその関数自体を再帰呼び出しすれば OK。


具体例

ということで、関数実装を利用した無限リストをいくつか実装してみました4


sequences.ex

defmodule Sequences do

@doc ~S"""
Natural Integers (Non-Negative Integers).

## Examples

# take 1..10
iex> Sequences.nats |> Enum.take(10)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
"""
def nats do
&nats(1, &1, &2)
end

defp nats(_, {:halt, acc}, _fun), do: {:halted, acc}
defp nats(n, {:suspend, acc}, fun), do: {:suspended, acc, &nats(n, &1, fun)}
defp nats(n, {:cont, acc}, fun), do: nats(n + 1, fun.(n, acc), fun)

@doc ~S"""
Fibonacci Sequence ([1, 1, 2, 3, ...]).

## Examples

# take first 10 Fibonacci Numbers.
iex> Sequences.fibs |> Enum.take(10)
[1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

# The 100th Fibonacci Number
iex> Sequences.fibs |> Enum.at(99)
354224848179261915075
"""
def fibs do
&fibs({1, 1}, &1, &2)
end

defp fibs(_, {:halt, acc}, _fun), do: {:halted, acc}
defp fibs(v, {:suspend, acc}, fun), do: {:suspended, acc, &fibs(v, &1, fun)}
defp fibs({a, b}, {:cont, acc}, fun), do: fibs({b, a + b}, fun.(a, acc), fun)

@doc ~S"""
Primes.

## Examples

# First 20 primes
iex> Sequences.primes |> Enum.take(20)
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71]

# The 10000th prime
iex> Sequences.primes |> Enum.at(9999)
104729
"""
def primes do
&primes(2, &1, &2)
end

defp primes(_, {:halt, acc}, _fun), do: {:halted, acc}
defp primes(v, {:suspend, acc}, fun) do
{:suspended, acc, &primes(v, &1, fun)}
end
defp primes(2, {:cont, acc}, fun) do
primes(3, fun.(2, acc), fun)
end
defp primes(3, {:cont, acc}, fun) do
primes({%{}, 5, 2, nil}, fun.(3, acc), fun)
end
defp primes({h, p, d, nil}, {:cont, acc}, fun) do
m = next_m(h, p, p, 4, true)
n = p + d
next_h = h |> Map.put(m, p)
primes({next_h, n, 6 - d, Map.get(next_h, n)}, fun.(p, acc), fun)
end
defp primes({h, q, d, b}, {:cont, _} = acc, fun) do
k = if rem(q + b, 3) == 0, do: 2, else: 4
m = next_m(h, b, q, k, true)
n = q + d
next_h = h |> Map.put(m, b) |> Map.delete(q)
primes({next_h, n, 6 - d, Map.get(next_h, n)}, acc, fun)
end

defp next_m(_, _, m, _, false), do: m
defp next_m(h, n, pre_m, k, true) do
m = pre_m + n * k
next_m(h, n, m, 6 - k, Map.has_key?(h, m))
end
end


ideone.com での実行例

glot.io での実行例(サイトを表示したら下方の[Run]ボタンをクリックしてください)。

ちなみに素数列挙は、過去に Ruby で実装した 汎用的でけっこう速い素数無限列挙5 を Elixir に翻訳しました。結構速いです6

あとここでは Enum.take/2Enum.at/2 の使用例しか載せていませんが、もちろん Stream モジュールの各関数も適用可能。


発展としてもう一例

さらに。

Stream モジュールに用意されている、ストリーム生成・変換・フィルタリング系の各種関数も、関数実装だけで実現できます。

まー車輪の再発明ほどナンセンスなものはないのであまり深入りはしませんが、1つだけ。drop 関数の実装例だけ挙げておきます7


drop.ex

defmodule Drop do

@doc ~S"""
Drop Function.
Similar to `Stream.drop`, but STRICTLY drops first `n` elements (n > 0).

## Examples

iex> Sequences.fibs |> Drop.drop(5) |> Enum.take(5)
[8, 13, 21, 34, 55]
"""
def drop(enum, count) do
fun = fn
(v, 0) -> {:suspend, v}
(_, n) -> {:cont, n - 1}
end
case Enumerable.reduce(enum, {:cont, count - 1}, fun) do
{:halted, _} -> []
{:done, _} -> []
{:suspended, _, next_fun} -> &do_drop(next_fun, &1, &2)
end
end

defp do_drop(_, {:halt, acc}, _fun), do: {:halted, acc}
defp do_drop(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_drop(next, &1, fun)}
end
defp do_drop(next, {:cont, acc}, fun) do
case next.({:cont, 0}) do
{:halted, _} -> {:halted, acc}
{:done, _} -> {:done, acc}
{:suspended, v, next_fun} -> do_drop(next_fun, fun.(v, acc), fun)
end
end
end


ideone.com での実行例

glot.io での実行例(サイトを表示したら下方の[Run]ボタンをクリックしてください)。

ポイントは、適切な関数を用意して Enumerable.reduce を走らせて、メインの処理は先ほどと同様に別のプライベート関数に委譲する、という感じ。

手元では、このほかに map とか filter とか言った Stream モジュールにある関数とか、span, split_at のような Haskell の Data.List モジュールにあるような関数(と見た目の結果が同じになるようなもの8)も試験実装したコードが転がってます。


まとめ

コツをつかめば、Stream に依存せずにストリーム(無限リスト)やそれを処理する関数を自作するのは、そんなに難しくない。

ただ、考えなきゃならないことが多い(毎回同じような処理を記述しなければならないとか)ので、やっぱり Stream モジュールを有効活用した方がきっと幸せになれると思います。

…ところでいつも思うのですが、果たしてこれ、需要あるのだろうか (^-^;

ま、気にしたら負け。アウトプットしないと先に進めない9ので残しておきます。





  1. もちろん、Stream モジュールは、ストリームの生成や変形・フィルタリングを効率よくまたは簡潔に記述できるよう、よく考えられてまとめられたものだと思います素晴らしい! 



  2. 実際には count/1, member?/2 も実装しなければならないのですが、取り敢えず {:error, __MODULE__} を返すようにしてしまえば取り敢えず OK。もちろん「要素数」とか「要素が存在するかどうか」がちゃんと分かっているときは適切に実装すべきでしょうけれど。 



  3. そもそも、Stream の仕組みを見るためにiexで、取り敢えず s0=Stream.iterate(1,&(&1+1)) とかしてみたときに、Stream 構造体ではなく Function が返ってきて「何コレ?」と思ったのが、この記事をまとめようと思ったきっかけ。 



  4. 例によって個人的な趣味に走ってるような気がしますが気にしないでください。 



  5. オリジナル: http://log.ttsky.net/article/19515687.html 



  6. Map を利用しており、OTPの maps モジュールにほぼ依存しているのですが、OTP17 だとかなり遅いようです。手元の環境を Erlang/OTP18 にアップグレードしたら爆速になりました。【追記】glot.io はちょっともっさりすると思ったら調べたら OTP17 でした。 



  7. なぜ drop を選んだのかというと。Stream.drop/2 が完全な遅延処理をするようになっており、毎回「指定した n 個を無視してからそれ以降を列挙」する(n が充分に大きいと毎回少し処理を待たされる)実装になっているので。「宣言時に最初の n 個を実際に除去し、列挙時には (n+1) 番目以降しか扱わない」バージョンが欲しいな、と思ったので(そのように実装してみました)。 



  8. Haskell は遅延評価(など)によって、リストの各要素も「本当に必要になるときまで算出されない」仕組みになっている(しかも一度算出されれば再計算されることはない)のですが、そこまでエミュレートするのはちょっと無理。なので、一部正格評価(先行評価)で要素を算出してから残りは drop のようにその都度算出するストリームとして準備、という方法です。 



  9. Elixir の(言語仕様レベルの)勉強はもうこの辺にしておく予定。あとしばらくは、ちょっと本職に近い方の勉強に戻る予定。勉強会の発表資料も作らなきゃだし。