Drew Olsonさんの2015年6月8日付のブログ記事Elixir Streamsの翻訳です。
Elixir言語のマニュアルにはストリームについて、"Any enumerable that generates items one by one during enumeration is called a stream."「列挙できるもの~うまい訳がないのでEnumerableってみんなそのまま言ってますね。「列挙値」と訳してるものもあるけど~のうち列挙している最中に項目を一つ一つ生成できるものはストリームと呼ばれる」とあります。
他の言語だと「遅延評価できる無限リスト」とでも言えばいいのかな。たいてい「普通に書くとこうですが遅延評価を使うとこういうこともできます」と教科書的な例が出てるだけで終わっちゃうんですがこの記事には面白い利用例があったので紹介します。
私見ですがErlangのiex、戻り値のデータの中身がチラ見えするのでこういう遅延評価機構が何をしようとしているのか考えるのに適していますね(他の言語だと型やIDだけ表示するものが多いので)。
しかしパイプ演算子よくできてるな…
私は前にElixirの明快さについて書きました1。私は、あるプログラミング言語がどのぐらい明快さを持っているかを見るポイントとしてコレクションに対して「先行操作(eager operation)」と「遅延操作(lazy operation)」を行うのにどのぐらい違いがあるか、を好んで使います。ElixirでEnum
モジュールを使うと先行操作をすることになります。コレクションは直ちに変換/マップ/列挙されます。一方Stream
モジュールを使うと遅延操作をすることになるわけです。
ストリーム(Stream)
モジュールはEnum
と同じく多数の操作を提供しています。しかしEnum
を使うとそれらの操作は即時実行されるのではなくこれから計算されることを記述することになります。うまい具合にストリームは全てEnumerable
プロトコルも実装しているのでEnum
に含まれる全ての関数を使うことができます。
enum
の中にあるどんな関数を使っても(先行的に計算を実行し結果を出すことで)ストリームを理解できます。いくつかの実例を見てみましょう。実例は概念を説明する助けになるように、iex
のセッションとElixirのフスクリプトァイルの内容が混じったものになります。
実例をいくつか
Stream.map
関数を使った極めてシンプルな例。
iex(1)> [1, 2, 3] |> Stream.map(&(&1 + 1))
#Stream<[enum: [1, 2, 3], funs: [#Function<45.29647706/1 in Stream.map/2>]]>
iex
セッションだと戻り値の意味がわかりやすいので使っています。見ての通り、Stream
構造体が戻ってきていますね。細かいところは今は重要ではなく「実際にデータに対して何か計算が即時に行われるのではなく将来適用される関数をこの構造体が保持している」という点だけ見ておいてください。
Enum.to_list
を使うと計算を実行させられます2。
iex(2)> [1, 2, 3] \
|> Stream.map(&(&1 + 1))\
|> Enum.to_list
[2, 3, 4]
他の例も見てみましょう。そうすれば古典的なコレクションに対してストリームが提供する優れた点がもっとよくわかるはずです。今度はレンジ(Range)
を操作してみましょう。Elixirではレンジはストリームとして実装されています。つまり非常に大きなレンジをすぐに生成しないままで扱えるということです。
iex(1)> 1..1000000 \
|> Stream.map(&("This is number #{&1}")) \
|> Enum.take(2)
["This is number 1", "This is number 2"]
これから理解すべき大事な点は1,000,000個ものアイテムからなるレンジの変換を表現するのにStream.map
を使っていますが、たったの2回の計算しか実行していないということです。Enum.take
関数は2つのアイテムしか要求していません。つまりストリームはこのレンジの最初の2つのアイテムにしか計算をしていないことになります。
これは非常に大きな(あるいは無限の大きさの)データのセットに対する計算の表現を取り扱いやすい方法で行うのに非常に有益です。
ストリームは合成可能です。つまり任意の段数の計算を効率的に大きなコレクションに対して行うことができます。
iex(1)> require Integer
nil
iex(2)> 1..1000000 \
|> Stream.filter(&Integer.is_even/1) \
|> Stream.map(&("This is number #{&1}")) \
|> Enum.take(2)
["This is number 2", "This is number 4"]
標準streamライブラリ
Elixirの標準ライブラリは最初からいくつかのストリームを提供しています。既にRange
は見ましたね。他のよく使われるストリームについても検討していきましょう。
File.stream!
関数はファイルの中の(デフォルトでは)行またはバイトをストリームにして提供します。これはファイルからデータを楽に取り出していける方法です。
"./my_file.txt" \
|> File.stream! \
|> Stream.map(&String.strip/1) \
|> Stream.with_index \
|> Stream.map(fn {line, i} -> "#{i}: #{line}" end) \
|> Enum.take(1) \
|> IO.inspect
# => ["0: first line"]
このプログラムはファイル内の各行の末尾から改行を取り除き、各行の先頭にインデックスを付け最初の行だけ取り出して表示します。
最初の1行しか要求していないのでファイルのサイズは極めて大きくてもよく、その場合でもこのプログラムは効率よく動作します。
他によく使われてストリームを返す関数としてはIO.stream
があります。ご期待通りこれは例えば標準入力などからのストリームのインターフェースを提供します。先ほどのプログラムをファイルではなく標準入力で動作するようにしてみましょう。
IO.stream(:stdio, :line) \
|> Stream.map(&String.strip/1) \
|> Stream.with_index \
|> Stream.map(fn {line, i} -> "#{i}: #{line}" end) \
|> Enum.take(1) \
|> IO.inspect
# 実際にはスクリプトファイルに落としてelixirから実行しないと動かない。こんな感じで。
# $ elixir foo.ex < my_file.txt
# ["0: first line"]
最後の例ですがGenEvent.stream
関数はGenEvent
マネージャから来るイベントのストリームを生成します。GenEvent
の詳細についてはこの投稿の範囲を超えるのでこの関数については名前を挙げるだけにします。もし興味のある方はElixirのドキュメントにもっと詳しく書いてありますのでそちらを参照してください。
ストリームを作る
これまでElixirの標準ライブラリにあるストリームをいくつか見てきました。Elixirはそれ以外にStream
モジュール内に、独自にストリームを作るためのコンストラクタ関数をいくつか提供しています。簡単なストリームのコンストラクタから始めて段階的に複雑なものに進みましょう。
Stream.repeatedly
はシンプルなストリームコンストラクタで与えられた関数を、ストリームに対して要素をくれと要求されるたびに呼び出すことで無限長のストリームを生成します。全て1
を返す無限長ストリームを書いてみましょう。
iex(1)> Stream.repeatedly(fn -> 1 end) |> Enum.take(5)
[1, 1, 1, 1, 1]
次はStream.iterate
です。この関数は初期値と「ジェネレータ」関数を引数に取ります。このジェネレータ関数は(初期値から始まる)ストリーム内の一つ前のアイテムを引数にして、次のアイテムを返すこと、となっています。「全ての正の整数」のストリームを書いてみましょう。
iex(1)> Stream.iterate(1, &(&1 + 1)) |> Enum.take(5)
[1, 2, 3, 4, 5]
では、Stream.unfold
について述べましょう。この関数は今までのものより複雑ですがより柔軟性の高いストリームを作ることができます。Unfoldは2つの引数を取ります。「アキュムレータ」のための初期値とジェネレータ関数です。このジェネレータ関数はアキュムレータの値を引数として呼び出され、戻り値として{next_element, new_accumulator}
を返すこと、となっています。next_element
はストリームの次のアイテムを示し、new_accumulator
は続くジェネレータ関数の呼び出し時に渡されます。Stream.unfold
を使ってフィボナッチ数列の無限長ストリームを書いてみましょう。
fibs = Stream.unfold({1, 1}, fn {a, b} ->
{a, {b, a + b}}
end)
fibs \
|> Enum.take(10) \
|> IO.inspect
# => [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
この例のアキュムレータは数列の中の2つの数字を表している点に注意してください。フィボナッチ数列は次の要素を得るのにこれら2つの数字の両方が必要だからです。
最後に紹介するコンストラクタ関数はStream.resource
です。この関数は外部リソース関連のストリームを作るのに適しています。「スタート関数」「次を呼び出す関数」そして「後始末用の関数」を与えて呼び出されます。これは実例を見ると理解しやすいでしょう。次でGithubのAPIのラッパーに使ってみましょう。
ストリームでAPIを作る
GithubのAPIで提供される多くのリソースはページ単位形式の結果を返します。我々のライブラリを使う人のためにページ単位化は簡単にしたいし効率よくもしたい。見えない状態でしかし与えられたページに結果が必要な場合だけ実行される。まずGithub APIとの通信を受け持つGitHubゲートウェイになるモジュールを作りましょう。HTTP通信にはHTTPoisonを使います。
defmodule Github.Gateway do
use HTTPoison.Base
@endpoint "https://github.com/api/v3"
def endpoint do
@endpoint
end
defp process_url(url) do
@endpoint <> url
end
defp process_request_headers(headers) do
headers ++ [
{"Authorization", "Basic #{:base64.encode(credentials)}"}
]
end
defp credentials do
"#{config[:access_token]}:x-oauth-basic"
end
defp config do
Application.get_env(:house_keeper, __MODULE__)
end
end
このゲートウェイモジュールは個人のアクセストークンは設定済みという前提で書かれています。
GithubからのレスポンスはこのAPIに関連する2つの属性を含んでいます:Link
ヘッダーとリクエストボディです。Link
ヘッダーは(もしあれば)結果の次のページについての情報を含みます。リクエストボディはJSONでシリアライズされた結果の集まりです。では与えられたURLに対するAPIのレスポンスのStream
を生成するモジュールを作りましょう。
defmodule Github.ResultStream do
alias Github.Gateway
def new(url) do
Stream.resource(
fn -> fetch_page(url) end,
&process_page/1,
fn _ -> end
)
end
defp fetch_page(url) do
response = Gateway.get!(url)
items = Poison.decode!(response.body)
links = parse_links(response.headers["Link"])
{items, links["next"]}
end
def parse_links(nil) do
%{}
end
def parse_links(links_string) do
links = String.split(links_string, ", ")
Enum.map(links, fn link ->
[_,name] = Regex.run(~r{rel="([a-z]+)"}, link)
[_,url] = Regex.run(~r{<([^>]+)>}, link)
short_url = String.replace(url, Gateway.endpoint, "")
{name, short_url}
end) |> Enum.into(%{})
end
defp process_page({nil, nil}) do
{:halt, nil}
end
defp process_page({nil, next_page_url}) do
next_page_url
|> fetch_page
|> process_page
end
defp process_page({items, next_page_url}) do
{items, {nil, next_page_url}}
end
end
コード量が多いので部分部分について見ていきます。
def new(url) do
Stream.resource(
fn -> fetch_page(url) end,
&process_page/1,
fn _ -> end
)
end
new
関数はStream.resource
関数によって作られたStream
を返します。「スタート関数」として最初のページを取得する関数を、「次を呼び出す関数」として、もし必要ならば新しいページを取得する関数を、そして空の「後始末用の関数」を与えています。「後始末用の関数」が空なのは特にクリーンアップの必要がないからです。
次はfetch_page
です。
defp fetch_page(url) do
response = Gateway.get!(url)
items = Poison.decode!(response.body)
links = parse_links(response.headers["Link"])
{items, links["next"]}
end
fetch_page
は「スタート関数」です。与えられたURLを取得するのに先に示したHTTPoison
ベースのゲートウェイを使います。JSONのボディをPoison
でパースしてlink name => url
のマップの中にLink
ヘッダーのパース結果を入れていきます。ここでリンクのパースについては特に重要ではありません。
最後に「次を呼び出す関数」であるprocess_page
を見ましょう。
defp process_page({nil, nil}) do
{:halt, nil}
end
defp process_page({nil, next_page_url}) do
next_page_url
|> fetch_page
|> process_page
end
defp process_page({items, next_page_url}) do
{items, {nil, next_page_url}}
end
process_page
関数は3つの節(clause)を持っています。どの節が実行されるかは関数に渡されるタプルによって決まります。これはパターンマッチングとして知られています。各節は最初の要素がストリームに追加されるべきアイテムのリストを含み、2番めの要素が次のprocess_page
を呼び出すためのアキュムレータの値からなるタプルを返す必要があります。各節をそれぞれ見ていきましょう。
defp process_page({nil, nil}) do
{:halt, nil}
end
最初の節ではもうアイテムも次のリンクもない場合を扱います。これはつまり最後のページに到達して全てのアイテムがストリームに追加されている、ということを意味します。ページ化を終了してstream.resource
にストリームが終了したことを知らせる特殊な値:halt
を返します。
defp process_page({nil, next_page_url}) do
next_page_url
|> fetch_page
|> process_page
end
2番めの節はページに含まれるアイテムを全部取り出し終わってたどるべき次のページヘのリンクがnilでない場合に起動します。このケースでは次のページを受け取って処理します。
defp process_page({items, next_page_url}) do
{items, {nil, next_page_url}}
end
3番目、つまり最後の節は「通常処理」のケースです。まだストリームに追加すべきアイテムが残っているケースに呼び出されます。タプルの最初の要素としてアイテムを返し、それをストリームに追加します。それからタプル3{nil, next_page_url}
をアキュムレータとして返します。それの意味するところは「まだアイテムが必要で受け取るべき次のURLがある」です。
さあ、これでGithub APIのレスポンスをページ化するのに必要なコードを全部見終わりました。この例は今まで他で見てきた例よりも少し複雑かもしれません。しかしGithub APIからの任意のレスポンスを非可視で遅延評価のページ化する機能を提供するコードとしてはとても小さいのです。
最後に、Github.ResultStream
モジュールを使ってユーザーがあるオーガナイゼーションのレポジトリを取得するためにナイスなAPIを公開しましょう。
defmodule Github do
alias Github.ResultStream
def repos(organization) do
ResultStream.new("/orgs/#{organization}/repos")
end
end
この新しいAPIを使って、elixir-lang
オーガナイゼーションの全てのレポジトリ名を取得することができます。
"elixir-lang"
|> Github.repos
|> Stream.map(fn repo -> repo["full_name"] end)
|> Enum.take(1)
# => ["elixir-lang/elixir"]
このコードは遅延実行されるので結果のうちの最初のページだけ取り出していることをお忘れなく。それで最初のレポジトリ名を得るために結果のうちの最初のページだけ使っています。
このEnumerable的なAPIは結果についてユーザーが熟知したやり方を行えるようにできています。例えばelixir-lang
とtryghost
の両方のオーガナイゼーションの全てのレポジトリの名前はStream.flat_map
を使って得ることができます。
["elixir-lang", "tryghost"]
|> Stream.flat_map(&Github.repos/1)
|> Stream.map(fn repo -> repo["full_name"] end)
|> Enum.take(1)
# => ["elixir-lang/elixir"]
繰り返しになりますが、これは遅延実行で効率のよいやり方なので必要なだけ結果をページ化します。すごいですね。
まとめ
この記事でElixirのStream
の能力がもたらすパワーと柔軟さがお分かりいただければよいなと思います。あなたのライブラリに活用してみましょう!
翻訳時のおまけ - Clojureと比較する
Clojureでfibonacci数列を遅延評価シーケンスで書いた例:
上の例もそうですが古典的なフィボナッチ数列は1から始まり、定義は
$F_n = F_{n-1} + F_{n-2}$
$F_0 = 1, F_1 = 1$
です。Clojureで書くとこんなかんじですね。
user=> (def fibs (lazy-cat [1 1] (map + (rest fibs) fibs)))
#'user/fibs
user=> (take 10 fibs)
(1 1 2 3 5 8 13 21 34 55)