Elixir はプロセス指向なだけあって、カジュアルに並列処理を使えます。
今回はこの並列処理を活用して、複数の勉強会サイトの WebAPI に対して一気にリクエストを投げる、いわゆるメタサーチ的な API を作ってみたいと思います。
Phoenix アプリケーションでの外部 API リクエストについてはコチラを参照してください。
事前準備: 各勉強会サイトの API 理解する
ATND, Connpass, Zusaar, Doorkeeper のいずれも、アクセスキー無し(!)で利用できる WebAPI が用意されています。
大変ありがたいことに、これらの API については sharow さんが以下の記事にとても良くまとめてくださっていますので、こちらを参考にしたいと思います。
勉強会サイトのAPI比較 http://qiita.com/sharow/items/508bc876ffb564d2d1ec
Phoenix アプリケーションをセットアップする
metasearch_sample
というアプリケーションを作り、mix.exs に以下のように追記しましょう。
$ mix phoenix.new metasearch_sample
defmodule MetasearchSample.Mixfile do
...
defp deps do
[{:phoenix, "~> 0.17"},
{:phoenix_ecto, "~> 1.1"},
{:postgrex, ">= 0.0.0"},
{:phoenix_html, "~> 2.1"},
{:phoenix_live_reload, "~> 1.0", only: :dev},
{:cowboy, "~> 1.0"},
# HTTPoison を追記
{:httpoison, "~> 0.7.2"}]
end
end
忘れずにライブラリのダウンロードを行います。
$ mix deps.get
ルートを追加する
せっかくなので、「並列処理をするもの」と「直列処理をするもの」の両方を作ってみましょう。
/api/parallel/study?q=xxx
, /api/serial/study?q=xxx
という形式でリクエストするようにします。
GET のリクエストパラメータはルーティングに記載する必要はないので、ひとまずはパスのみの設定となります。
defmodule MetasearchSample.Router do
...
scope "/api", MetasearchSample do
pipe_through :api
get "/parallel/study", PageController, :parallel_search
get "/serial/study", PageController, :serial_search
end
...
end
直列処理のほうから実装する
さっそくコントローラの処理を実装していきましょう。
まずは直列処理から。
流れとしては、「各 API にリクエスト」 -> 「レスポンスをリストに蓄積」 -> 「各レスポンスをマージしたものを返却」となります。
defmodule MetasearchSample.PageController do
...
def create_requests(q) do
[
{:atnd, "http://api.atnd.org/events/?keyword=#{q}&format=json"},
{:connpass, "http://connpass.com/api/v1/event/?keyword=#{q}"},
{:doorkeeper, "http://api.doorkeeper.jp/events/?q=#{q}"},
{:zusaar, "http://www.zusaar.com/api/event/?keyword=#{q}"}
]
end
def get_response(request) do
{site, url} = request
HTTPoison.start
result = HTTPoison.get! url
case result do
%{status_code: 200, body: body} -> {site, Poison.decode!(body)}
%{status_code: code} -> {site, nil}
end
end
def parse_response(response) do
case response do
{:atnd, json} -> Map.get(json, "events") |> Enum.map(&Map.get(&1, "event"))
{:connpass, json} -> Map.get(json, "events")
{:doorkeeper, json} -> json |> Enum.map(&(Map.get(&1, "event")))
{:zusaar, json} -> Map.get(json, "event")
end
end
def serial_search(conn, %{"q" => q}) do
results = create_requests(q)
|> Enum.map(&get_response/1)
|> Enum.map(&parse_response/1)
|> List.flatten
json conn, results
end
...
end
結構頑張って Elixir っぽく書いてみました。
入り口は serial_search/2
で、ここでリクエストパラメータ q
を受け取っています。
この関数自体は怒涛のパイプライン演算子で終わるわけですが、あまり身構えず、1つずつみていきましょう。
まず、受け取った q
を create_requests/1
に渡します。この関数では受け取ったキーワード文字列をもとに各 API へのリクエスト URL を生成します。
ポイントは、リクエスト URL とともに API を識別するためのアトムを添えていることです。この段階でアトムをつけることで、後で case 構文を使うときに役立ちます。
ちなみに関数名の前に & をつける構文は関数のキャプチャというものです。
次に、Enum.map
を使って create_requests/1
から返ってきたリストの各項目を1つずつ get_response/1
に渡します。get_response/1
では受け取ったリクエスト情報をもとに HTTP リクエストを行い、返却された JSON をこれまたアトムを添えて返却します。
この時点( Enum.map(&get_response/1)
)の戻り値は以下のようなイメージです。
[
{:atnd, {...ATND API のレスポンス...} },
{:connpass, {...Connpas API のレスポンス...} },
{:doorkeeper, {...Doorkeeper API のレスポンス...} },
{:zusaar, {...Zusaar API のレスポンス...} },
]
次に、同じく Enum.map
を使って上記の各項目を1つずつ parse_response/1
に渡します。parse_response/1
では受け取った API のレスポンスを、API に応じて解析します。各 API によって返却が微妙に異なるので、その差分を吸収する必要があるわけです。(といっても event 情報の抽出しかしていないのですが)
この API 種別の分岐をするにあたり、最初につけたアトムが活用されるわけですね。
最後に、仕上がってきたレスポンスの塊を List.flatten
で乱暴にフラット化します。
List.flatten
の変換は、以下のようなイメージになります。
// before
[
// API ごとの event リストがそれぞれ存在している状態
[ { ... }, ... ], [ { ... }, ... ], [ ... ], [ ... ]
]
// after
[
// 各 API の event が同じ階層にある状態
{ ... }, { ... }, ...
}
以上で直列部分の実装は完了です。
並列部分のほうを実装する
なんだか前置きが長くなってしまいましたが、ここからが本題です。
Elixir で並列処理を書く場合は Task というモジュールを使います。
早速これを使って実装してみましょう。先ほどの実装に parallel_search/2
を足します。
defmodule MetasearchSample.PageController do
...
def parallel_search(conn, %{"q" => q}) do
results = create_requests(q)
|> Enum.map(&Task.async(fn -> get_response(&1) end))
|> Enum.map(&Task.await(&1, 10_000))
|> Enum.map(&parse_response/1)
|> List.flatten
json conn, results
end
end
なんと、直列のほうとあまり変わりませんね!
最初の Enum.map
の中に Task.async
を入れたのと、その後に Task.await
を入れただけです。
Task.async
はその名の通り、リストの各項目に対する処理を並列化してくれるものです。カジュアルですね。
Task.await
は並列で進んでいる各処理が全て終わるまで待ちます。第2引数は最大待ち時間(タイムアウト)で、今回は 10秒 に設定してあります。
この例からわかるように、Enum.map
によるリスト処理を前提で書いていくと、並列化がしやすいわけですね。
なお、今回は最大4並列なので気にする必要はないのですが、並列数が多くマシン負荷が高くなるような場合は、ある程度並列数を絞るというような考慮を入れたほうが良いそうです。
以上で並列部分の実装は完了です。
API を叩いてみる
早速作成した API を叩いてみましょう。
せっかくなのでいっぱい HIT するようなワードで検索したいところです。
うーん、html とかでしょうか。
$ curl -kL "http://localhost:4000/api/serial/study?q=html" -o /dev/null -w "%{time_total}\n" 2> /dev/null
1.690
$ curl -kL "http://localhost:4000/api/parallel/study?q=html" -o /dev/null -w "%{time_total}\n" 2> /dev/null
0.786
お、並列と直列で露骨に差が出ていますね!
ちなみに各勉強会サイト API のレスポンスはこんな感じです。
(上記リクエストとは別に叩き直したものです)
$ curl -kL "http://api.atnd.org/events/?keyword=html&format=json" -o /dev/null -w "%{time_total}\n" 2> /dev/null
0.183
$ curl -kL "http://connpass.com/api/v1/event/?keyword=html" -o /dev/null -w "%{time_total}\n" 2> /dev/null
0.190
$ curl -kL "http://api.doorkeeper.jp/events/?q=html" -o /dev/null -w "%{time_total}\n" 2> /dev/null
0.961
$ curl -kL "http://www.zusaar.com/api/event/?keyword=html" -o /dev/null -w "%{time_total}\n" 2> /dev/null
0.242
ある程度誤差はありますが、
- 直列 -> 各 API のレスポンス速度を足したもの
- 並列 -> 一番 API のレスポンス速度
という感じになっていると思います。
感想
- パイプライン演算子は直感的でわかりやすい
- 関数型っぽく書くと、本当に変数が出てこない
- 緯度経度によるメタサーチを作っても面白いかも
- 本当は、リクエスト結果をパースして struct に詰めたかった
- 思いの外長くなってしまったので...割愛
- Elixir の勉強会、全然ないですね