ElixirDay 4

たのしいpoolboy

More than 1 year has passed since last update.


はじめに

poolboy とはひとことで言えば、Erlang製のワーカープロセス管理システムです。

あるワーカーをforkしまくりたい!かつそれをとっておきたい!しかもそれらを適切にローテーションして使いたい!みたいなケースにどストライクなやつです。

非常にわかりやすい例のひとつを挙げると、Ectoではコネクションプーリングに使われてたりしますね。

当記事では、Ecto非対応なデータストアの一つ、Redisのコネクションプーリングをしつつ、使う側にコネクションをあまり意識しなくて済むような実装をしてみましたので、その紹介をさせていただこうかと思います。


Elixirからpoolboyを使う

まずは公式Usage を見てみます。

worker = :poolboy.checkout(:pools) # #PID<0.487.0>

GenServer.call(worker, request) # ワーカープロセスで何かする
:poolboy.checkin(:pools, worker) # :ok

こんな感じで書き直せますかね。簡単ですね!

もちろんこのサンプルだけでは動きません!最初に目的のワーカープロセスを checkin してあげる必要があります。当たり前ですね。

プールする場所が落ちてたら困るなんて通り越して大パニックなので、これまた公式通りsupervisorのお世話になってみます。

def init([]) do

pool_options = [
{:name, {:local, :pools}},
{:worker_module, Worker.Module},
{:size, 5},
{:max_overflow, 10}
]
children = [
:poolboy.child_spec(:pools, pool_options, Application.get_env(:pools, worker_module))
]

supervise(children, strategy: :one_for_one)
end

わかりやすい! Worker.Module に当たるところに目的のモジュールを指定して、child_specで初期化パラメータを指定してあげれば、サクッと5プロセスがforkされてプールされている状態になります。

この状態であればUsageのコードが動きまくってしまうわけですね。素晴らしいですね。


Redisのコネクションプーリングをしてみる

やっと本題です。最終的な目標はこうです!!

def fugafuga do

redis_transaction do
Redis.Client.query(:set, "key", "fugafuga")
Redis.Client.query(:incr, "key") # 文字列がセットされてるのでエラーになる
Redis.Client.query(:set, "key", "hoge" |> String.reverse)
end
# 最終的に"key"にセットされているのは "egoh"
end
# このfugafuga関数の最終的な出力は、
# {:error, ["OK", "ERR value is not an integer or out of range", "OK"]}
# とします。:errorにしているのは、ひとつ失敗しているからです。


準備

ベースのRedis操作ライブラリとしてeredisを使います。


1. mix.exsのdepsにpoolboyとeredisを追加

いつものアイツなので割愛です。

Phoenixをベースにしてたりすると、Ectoのついでにpoolboyだけは付いてきますね。


2. config.exs(など)にeredisの接続設定と、poolboyの設定を入れておく


config/dev.exs

config :eredis,

host: 'localhost', # stringではなくchar listである必要があります!
port: 6379,
database: 0,
reconnect: :no_reconnect,
max_queue: :infinity

config :poolboy, :eredis_pool_options,
name: name: {:local, :eredis_pool},
worker_module: :eredis,
size: 5,
max_overflow: 10



3. eredisのプールをsupervisorに登録


lib/client.ex

defmodule Redis.Client do

use Supervisor

def start_link do
Supervisor.start_link(__MODULE__, [])
end

def init([]) do
children = [
:poolboy.child_spec(
:eredis_pool,
Application.get_env(:poolboy, :eredis_pool_options),
Application.get_all_env(:eredis)
)
]
supervise(children, strategy: :one_for_one)
end

def checkout, do: :poolboy.checkout(:eredis_pool)
def checkin(pid), do: :poolboy.checkin(:eredis_pool, pid)

def query(args) when is_list(args) do
:poolboy.transaction(:eredis_pool, &(:eredis.q(&1, args)))
end

def query(pid, args) when is_pid(pid) do
result = :eredis.q(pid, args)
# IO.inspect [pid, args, result]
result
end
end


冒頭のサンプルが拡張され、checkin/checkoutできたり、クエリーを発行できたりするようになっています。

この段階で、以下のコードが実行可能になっているはずです。


tekitou.ex

# この形だとプロセスとかあんまり考えなくていいので楽チン

Redis.Client.query(["set", "hoge", "fuga"]) # {:ok, "OK"}
Redis.Client.query(["get", "hoge"]) # {:ok, "fuga"}

# 自分でcheckout/checkinする運用もできる
pid = Redis.Client.checkout # #PID<0.417.0>
Redis.Client.query(pid, ["get", "hoge"]) # {:ok, "fuga"} / ちゃんとPID<0.417.0>を使っているはず
Redis.Client.checkin(pid) # :ok / 出したらしまう


poolboyは以上です!簡単すぎて最高ですね!!


poolboyでpidの存在を隠蔽したRedisクライアントライブラリが欲しい

完全に本題を逸脱しまして、Redisのコネクションプールをしつつ便利に使えるライブラリが欲しいという欲求を満たしていきましょう。

というのも、定番(?)のExredisにちょっと物足りなさを感じていまして1、それだったら新しくいい感じに使えるのを作っちゃおう!というのが本稿のモチベーションだったりするのです。


lib/transaction.ex

defmodule Redis.Transaction do

defp var_client, do: Macro.var(:client, nil)

defp parse({{:., meta1, [{:__aliases__, meta2, [:Redis, :Client]}, :query]}, meta3, args}) when is_list(args) do
{{:., meta1, [{:__aliases__, meta2, [:Redis, :Client]}, :query]}, meta3, [var_client, args]}
end
defp parse({atom, meta, ast}), do: {parse(atom), meta, parse(ast)}
defp parse({atom, ast}), do: {parse(atom), parse(ast)}

defp parse(asts) when is_list(asts) do
asts |> Enum.map(fn(ast) -> parse(ast) end)
end

defp parse(ast), do: ast

defmacro redis_transaction(block) do
ast = Macro.prewalk(Keyword.get(block, :do, nil), &parse/1)
quote do
unquote(var_client) = Redis.Client.checkout
Redis.Client.query(unquote(var_client), [:multi])
unquote(ast)
{:ok, result} = Redis.Client.query(unquote(var_client), [:exec])
Redis.Client.checkin(unquote(var_client))

errors = result |> Enum.filter(fn(v) -> Regex.match?(~r/\Aerr /i, v) end)
{if(length(errors) > 0, do: :error, else: :ok), result}
end
end
end


このモジュールをimportして以下のコードを実行すると、

def fugafuga do

redis_transaction do
Redis.Client.query(:set, "key", "fugafuga")
Redis.Client.query(:incr, "key")
Redis.Client.query(:set, "key", "hoge" |> String.reverse)
end
end

これが

def fugafuga do

client = Redis.Client.checkout
Redis.Client.query(client, [:multi])

Redis.Client.query(client, [:set, "key", "fugafuga"])
Redis.Client.query(client, [:incr, "key"])
Redis.Client.query(client, [:set, "key", "hoge" |> String.reverse])

{:ok, result} = Redis.Client.query(client, [:exec])
Redis.Client.checkin(client)

errors = result |> Enum.filter(fn(v) -> Regex.match?(~r/\Aerr /i, v) end)
{if(length(errors) > 0, do: :error, else: :ok), result}
end

こんな泥臭いコードに展開されてコンパイルされるわけですね〜。Macroすごい!!


まとめ

いかがでしたか?poolboyはデータストア系に限らず、forkして並べておきたいみたいな要件にはしっかり応えてくれるライブラリだと思います!

enjoy!! :thumbsup:

自分もせっかく作りこんだので、rubyでいうところのredis-objectsみたいなところを目指して頑張っていきたいなと考えています!!





  1. コネクションプールできないというのと、pipeline/multiがRedisコマンド丸出しであまりイケてない印象がある