LoginSignup
70
58

More than 5 years have passed since last update.

[翻訳] Elixirのストリーム

Last updated at Posted at 2015-07-14

Drew Olsonさんの2015年6月8日付のブログ記事Elixir Streamsの翻訳です。
Elixir言語のマニュアルにはストリームについて、"Any enumerable that generates items one by one during enumeration is called a stream."「列挙できるもの~うまい訳がないのでEnumerableってみんなそのまま言ってますね。「列挙値」と訳してるものもあるけど~のうち列挙している最中に項目を一つ一つ生成できるものはストリームと呼ばれる」とあります。
他の言語だと「遅延評価できる無限リスト」とでも言えばいいのかな。たいてい「普通に書くとこうですが遅延評価を使うとこういうこともできます」と教科書的な例が出てるだけで終わっちゃうんですがこの記事には面白い利用例があったので紹介します。
私見ですがErlangのiex、戻り値のデータの中身がチラ見えするのでこういう遅延評価機構が何をしようとしているのか考えるのに適していますね(他の言語だと型やIDだけ表示するものが多いので)。
しかしパイプ演算子よくできてるな…


私は前にElixirの明快さについて書きました1。私は、あるプログラミング言語がどのぐらい明快さを持っているかを見るポイントとしてコレクションに対して「先行操作(eager operation)」と「遅延操作(lazy operation)」を行うのにどのぐらい違いがあるか、を好んで使います。ElixirEnumモジュールを使うと先行操作をすることになります。コレクションは直ちに変換/マップ/列挙されます。一方Streamモジュールを使うと遅延操作をすることになるわけです。
ストリーム(Stream)モジュールはEnumと同じく多数の操作を提供しています。しかしEnumを使うとそれらの操作は即時実行されるのではなくこれから計算されることを記述することになります。うまい具合にストリームは全てEnumerableプロトコルも実装しているのでEnumに含まれる全ての関数を使うことができます。

enumの中にあるどんな関数を使っても(先行的に計算を実行し結果を出すことで)ストリームを理解できます。いくつかの実例を見てみましょう。実例は概念を説明する助けになるように、iexのセッションとElixirのフスクリプトァイルの内容が混じったものになります。

実例をいくつか

Stream.map関数を使った極めてシンプルな例。

iex(1)> [1, 2, 3] |> Stream.map(&(&1 + 1))  
#Stream<[enum: [1, 2, 3], funs: [#Function<45.29647706/1 in Stream.map/2>]]>

iexセッションだと戻り値の意味がわかりやすいので使っています。見ての通り、Stream構造体が戻ってきていますね。細かいところは今は重要ではなく「実際にデータに対して何か計算が即時に行われるのではなく将来適用される関数をこの構造体が保持している」という点だけ見ておいてください。

Enum.to_listを使うと計算を実行させられます2

iex(2)> [1, 2, 3] \
|> Stream.map(&(&1 + 1))\
|> Enum.to_list
[2, 3, 4]

他の例も見てみましょう。そうすれば古典的なコレクションに対してストリームが提供する優れた点がもっとよくわかるはずです。今度はレンジ(Range)を操作してみましょう。Elixirではレンジはストリームとして実装されています。つまり非常に大きなレンジをすぐに生成しないままで扱えるということです。

iex(1)> 1..1000000  \
|> Stream.map(&("This is number #{&1}")) \
|> Enum.take(2)
["This is number 1", "This is number 2"]

これから理解すべき大事な点は1,000,000個ものアイテムからなるレンジの変換を表現するのにStream.mapを使っていますが、たったの2回の計算しか実行していないということです。Enum.take関数は2つのアイテムしか要求していません。つまりストリームはこのレンジの最初の2つのアイテムにしか計算をしていないことになります。

これは非常に大きな(あるいは無限の大きさの)データのセットに対する計算の表現を取り扱いやすい方法で行うのに非常に有益です。

ストリームは合成可能です。つまり任意の段数の計算を効率的に大きなコレクションに対して行うことができます。

iex(1)> require Integer
nil
iex(2)> 1..1000000 \
|> Stream.filter(&Integer.is_even/1) \
|> Stream.map(&("This is number #{&1}")) \
|> Enum.take(2)
["This is number 2", "This is number 4"]

標準streamライブラリ

Elixirの標準ライブラリは最初からいくつかのストリームを提供しています。既にRangeは見ましたね。他のよく使われるストリームについても検討していきましょう。

File.stream!関数はファイルの中の(デフォルトでは)行またはバイトをストリームにして提供します。これはファイルからデータを楽に取り出していける方法です。

"./my_file.txt" \
|> File.stream! \
|> Stream.map(&String.strip/1) \
|> Stream.with_index \
|> Stream.map(fn {line, i} -> "#{i}: #{line}" end) \
|> Enum.take(1) \
|> IO.inspect

# => ["0: first line"]

このプログラムはファイル内の各行の末尾から改行を取り除き、各行の先頭にインデックスを付け最初の行だけ取り出して表示します。
最初の1行しか要求していないのでファイルのサイズは極めて大きくてもよく、その場合でもこのプログラムは効率よく動作します。

他によく使われてストリームを返す関数としてはIO.streamがあります。ご期待通りこれは例えば標準入力などからのストリームのインターフェースを提供します。先ほどのプログラムをファイルではなく標準入力で動作するようにしてみましょう。

IO.stream(:stdio, :line) \
|> Stream.map(&String.strip/1) \
|> Stream.with_index \
|> Stream.map(fn {line, i} -> "#{i}: #{line}" end) \
|> Enum.take(1) \
|> IO.inspect

# 実際にはスクリプトファイルに落としてelixirから実行しないと動かない。こんな感じで。
# $ elixir foo.ex < my_file.txt
# ["0: first line"]

最後の例ですがGenEvent.stream関数はGenEventマネージャから来るイベントのストリームを生成します。GenEventの詳細についてはこの投稿の範囲を超えるのでこの関数については名前を挙げるだけにします。もし興味のある方はElixirのドキュメントにもっと詳しく書いてありますのでそちらを参照してください。

ストリームを作る

これまでElixirの標準ライブラリにあるストリームをいくつか見てきました。Elixirはそれ以外にStreamモジュール内に、独自にストリームを作るためのコンストラクタ関数をいくつか提供しています。簡単なストリームのコンストラクタから始めて段階的に複雑なものに進みましょう。

Stream.repeatedlyはシンプルなストリームコンストラクタで与えられた関数を、ストリームに対して要素をくれと要求されるたびに呼び出すことで無限長のストリームを生成します。全て1を返す無限長ストリームを書いてみましょう。

iex(1)> Stream.repeatedly(fn -> 1 end) |> Enum.take(5)  
[1, 1, 1, 1, 1]

次はStream.iterateです。この関数は初期値と「ジェネレータ」関数を引数に取ります。このジェネレータ関数は(初期値から始まる)ストリーム内の一つ前のアイテムを引数にして、次のアイテムを返すこと、となっています。「全ての正の整数」のストリームを書いてみましょう。

iex(1)> Stream.iterate(1, &(&1 + 1)) |> Enum.take(5)  
[1, 2, 3, 4, 5]

では、Stream.unfoldについて述べましょう。この関数は今までのものより複雑ですがより柔軟性の高いストリームを作ることができます。Unfoldは2つの引数を取ります。「アキュムレータ」のための初期値とジェネレータ関数です。このジェネレータ関数はアキュムレータの値を引数として呼び出され、戻り値として{next_element, new_accumulator}を返すこと、となっています。next_elementはストリームの次のアイテムを示し、new_accumulatorは続くジェネレータ関数の呼び出し時に渡されます。Stream.unfoldを使ってフィボナッチ数列の無限長ストリームを書いてみましょう。

fibs = Stream.unfold({1, 1}, fn {a, b} ->  
  {a, {b, a + b}}
end)

fibs \
|> Enum.take(10) \
|> IO.inspect

# => [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

この例のアキュムレータは数列の中の2つの数字を表している点に注意してください。フィボナッチ数列は次の要素を得るのにこれら2つの数字の両方が必要だからです。

最後に紹介するコンストラクタ関数はStream.resourceです。この関数は外部リソース関連のストリームを作るのに適しています。「スタート関数」「次を呼び出す関数」そして「後始末用の関数」を与えて呼び出されます。これは実例を見ると理解しやすいでしょう。次でGithubのAPIのラッパーに使ってみましょう。

ストリームでAPIを作る

GithubのAPIで提供される多くのリソースはページ単位形式の結果を返します。我々のライブラリを使う人のためにページ単位化は簡単にしたいし効率よくもしたい。見えない状態でしかし与えられたページに結果が必要な場合だけ実行される。まずGithub APIとの通信を受け持つGitHubゲートウェイになるモジュールを作りましょう。HTTP通信にはHTTPoisonを使います。

defmodule Github.Gateway do  
  use HTTPoison.Base

  @endpoint "https://github.com/api/v3"

  def endpoint do
    @endpoint
  end

  defp process_url(url) do
    @endpoint <> url
  end

  defp process_request_headers(headers) do
    headers ++ [
      {"Authorization", "Basic #{:base64.encode(credentials)}"}
    ]
  end

  defp credentials do
    "#{config[:access_token]}:x-oauth-basic"
  end

  defp config do
    Application.get_env(:house_keeper, __MODULE__)
  end
end  

このゲートウェイモジュールは個人のアクセストークンは設定済みという前提で書かれています。

GithubからのレスポンスはこのAPIに関連する2つの属性を含んでいます:Linkヘッダーとリクエストボディです。Linkヘッダーは(もしあれば)結果の次のページについての情報を含みます。リクエストボディはJSONでシリアライズされた結果の集まりです。では与えられたURLに対するAPIのレスポンスのStreamを生成するモジュールを作りましょう。

defmodule Github.ResultStream do  
  alias Github.Gateway

  def new(url) do
    Stream.resource(
      fn -> fetch_page(url) end,
      &process_page/1,
      fn _ -> end
    )
  end

  defp fetch_page(url) do
    response = Gateway.get!(url)
    items = Poison.decode!(response.body)
    links = parse_links(response.headers["Link"])

    {items, links["next"]}
  end

  def parse_links(nil) do
    %{}
  end

  def parse_links(links_string) do
    links = String.split(links_string, ", ")

    Enum.map(links, fn link ->
      [_,name] = Regex.run(~r{rel="([a-z]+)"}, link)
      [_,url] = Regex.run(~r{<([^>]+)>}, link)
      short_url = String.replace(url, Gateway.endpoint, "")

      {name, short_url}
    end) |> Enum.into(%{})
  end

  defp process_page({nil, nil}) do
    {:halt, nil}
  end

  defp process_page({nil, next_page_url}) do
    next_page_url
    |> fetch_page
    |> process_page
  end

  defp process_page({items, next_page_url}) do
    {items, {nil, next_page_url}}
  end
end  

コード量が多いので部分部分について見ていきます。

  def new(url) do
    Stream.resource(
      fn -> fetch_page(url) end,
      &process_page/1,
      fn _ -> end
    )
  end

new関数はStream.resource関数によって作られたStreamを返します。「スタート関数」として最初のページを取得する関数を、「次を呼び出す関数」として、もし必要ならば新しいページを取得する関数を、そして空の「後始末用の関数」を与えています。「後始末用の関数」が空なのは特にクリーンアップの必要がないからです。

次はfetch_pageです。

  defp fetch_page(url) do
    response = Gateway.get!(url)
    items = Poison.decode!(response.body)
    links = parse_links(response.headers["Link"])

    {items, links["next"]}
  end

fetch_pageは「スタート関数」です。与えられたURLを取得するのに先に示したHTTPoisonベースのゲートウェイを使います。JSONのボディをPoisonでパースしてlink name => urlのマップの中にLinkヘッダーのパース結果を入れていきます。ここでリンクのパースについては特に重要ではありません。

最後に「次を呼び出す関数」であるprocess_pageを見ましょう。

  defp process_page({nil, nil}) do
    {:halt, nil}
  end

  defp process_page({nil, next_page_url}) do
    next_page_url
    |> fetch_page
    |> process_page
  end

  defp process_page({items, next_page_url}) do
    {items, {nil, next_page_url}}
  end

process_page関数は3つの節(clause)を持っています。どの節が実行されるかは関数に渡されるタプルによって決まります。これはパターンマッチングとして知られています。各節は最初の要素がストリームに追加されるべきアイテムのリストを含み、2番めの要素が次のprocess_pageを呼び出すためのアキュムレータの値からなるタプルを返す必要があります。各節をそれぞれ見ていきましょう。

  defp process_page({nil, nil}) do
    {:halt, nil}
  end

最初の節ではもうアイテムも次のリンクもない場合を扱います。これはつまり最後のページに到達して全てのアイテムがストリームに追加されている、ということを意味します。ページ化を終了してstream.resourceにストリームが終了したことを知らせる特殊な値:haltを返します。

  defp process_page({nil, next_page_url}) do
    next_page_url
    |> fetch_page
    |> process_page
  end

2番めの節はページに含まれるアイテムを全部取り出し終わってたどるべき次のページヘのリンクがnilでない場合に起動します。このケースでは次のページを受け取って処理します。

  defp process_page({items, next_page_url}) do
    {items, {nil, next_page_url}}
  end

3番目、つまり最後の節は「通常処理」のケースです。まだストリームに追加すべきアイテムが残っているケースに呼び出されます。タプルの最初の要素としてアイテムを返し、それをストリームに追加します。それからタプル3{nil, next_page_url}をアキュムレータとして返します。それの意味するところは「まだアイテムが必要で受け取るべき次のURLがある」です。

さあ、これでGithub APIのレスポンスをページ化するのに必要なコードを全部見終わりました。この例は今まで他で見てきた例よりも少し複雑かもしれません。しかしGithub APIからの任意のレスポンスを非可視で遅延評価のページ化する機能を提供するコードとしてはとても小さいのです。

最後に、Github.ResultStreamモジュールを使ってユーザーがあるオーガナイゼーションのレポジトリを取得するためにナイスなAPIを公開しましょう。

defmodule Github do  
  alias Github.ResultStream

  def repos(organization) do
    ResultStream.new("/orgs/#{organization}/repos")
  end
end  

この新しいAPIを使って、elixir-langオーガナイゼーションの全てのレポジトリ名を取得することができます。

"elixir-lang"
|> Github.repos
|> Stream.map(fn repo -> repo["full_name"] end)
|> Enum.take(1)

# => ["elixir-lang/elixir"]

このコードは遅延実行されるので結果のうちの最初のページだけ取り出していることをお忘れなく。それで最初のレポジトリ名を得るために結果のうちの最初のページだけ使っています。

このEnumerable的なAPIは結果についてユーザーが熟知したやり方を行えるようにできています。例えばelixir-langtryghostの両方のオーガナイゼーションの全てのレポジトリの名前はStream.flat_mapを使って得ることができます。

["elixir-lang", "tryghost"]
|> Stream.flat_map(&Github.repos/1)
|> Stream.map(fn repo -> repo["full_name"] end)
|> Enum.take(1)

# => ["elixir-lang/elixir"]

繰り返しになりますが、これは遅延実行で効率のよいやり方なので必要なだけ結果をページ化します。すごいですね。

まとめ

この記事でElixirStreamの能力がもたらすパワーと柔軟さがお分かりいただければよいなと思います。あなたのライブラリに活用してみましょう!


翻訳時のおまけ - Clojureと比較する

Clojureでfibonacci数列を遅延評価シーケンスで書いた例:
上の例もそうですが古典的なフィボナッチ数列は1から始まり、定義は

$F_n = F_{n-1} + F_{n-2}$
$F_0 = 1, F_1 = 1$

です。Clojureで書くとこんなかんじですね。

user=> (def fibs (lazy-cat [1 1] (map + (rest fibs) fibs)))
#'user/fibs
user=> (take 10 fibs)
(1 1 2 3 5 8 13 21 34 55)

  1. 気が向いたら訳すかも 

  2. オリジナル記事では見やすくするために改行を入れているのでそのままコピペするとエラーになります。この訳では行末に \ を入れて継続行にしました。 

  3. 戻り値のタプルの2番めの要素がアキュムレータでした。 

70
58
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
70
58