Posted at

Elixir, PoolboyでRiakのコネクションプールを作る

More than 3 years have passed since last update.

poolboyライブラリを使用して、Riakへのコネクションを複数立ち上げることが必要になったので、そのやり方をメモします。

プロジェクトを作って、mix.exsに必要なライブラリを記入します。

$ mix new riakpool --sup --module RiakPool

mix.exsファイル

defp deps do

[
{:poolboy, "~> 1.5"},
{:riakc, "~> 2.1"}
]
end

lib/riakpool.exファイルにpoolboyを設定します。worker_moduleRiakPool.Workerをこれから作成します。

 defmodule RiakPool do

use Application

def start(_type, _args) do
import Supervisor.Spec, warn: false

poolboy_config = [
{:name, {:local, :riak_pool}},
{:worker_module, RiakPool.Worker},
{:size, 2},
{:max_overflow, 1}
]

children = [
:poolboy.child_spec(:riak_pool,
poolboy_config,
[]
)
]
opts = [strategy: :one_for_one, name: RiakPool.Supervisor]
Supervisor.start_link(children, opts)
end
end

lib/riakpool/worker.exファイルにRiakPool.Workerモジュールを定義します。

 defmodule RiakPool.Worker do

use GenServer

def start_link([]) do
# # ここにname: __MODULE__をつけることはできない、
# プロセスは全部同じ名前になってしまうため,エラーが出ます。
{:ok, pid} = GenServer.start_link(__MODULE__, :ok)
{:ok, pid}
end

def init(:ok) do
{:ok, %{}}
end
end

これで起動してみます。二個のプロセスが起動されていることが確認できます

$ iex -S mix

iex> :observer.start

iex(2)> worker1 = :poolboy.checkout(:riak_pool)

#PID<0.148.0>
iex(3)> worker2 = :poolboy.checkout(:riak_pool)
#PID<0.147.0>
iex(4)> worker3 = :poolboy.checkout(:riak_pool)
#PID<0.181.0>
iex(5)> worker4 = :poolboy.checkout(:riak_pool)
** (exit) exited in: :gen_server.call(:riak_pool, {:checkout, #Reference<0.0.1.219>, true}, 5000)
** (EXIT) time out
(stdlib) gen_server.erl:212: :gen_server.call/3
src/poolboy.erl:55: :poolboy.checkout/3

worker3checkoutされている時点で、もう一つのprocess <0.181.0>がスーパーバイザーに追加されました。これは{:max_overflow, 1}poolboy_configに設定されているためです。sizemax_overflowの違いはsize数のプロセスは常駐している感じで、max_overflowが臨時に追加されて、checkinすると、消えてしまいます。このリンクを見ればわかります。

でもこれはriakになんの関係もないので、立ち上がっているプロセスをriakに接続するようにRiakPool.Workerを修正します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do

require Logger
use GenServer

def start_link([]) do

{:ok, pid} = GenServer.start_link(__MODULE__, :ok)
{:ok, pid}
end

def init(:ok) do
# riakのipはchar listで渡す必要がある
{:ok, master_pid} =
:riakc_pb_socket.start_link('ip.of.your.riak',8087)
{:ok, %{riak_pid: master_pid}}
end
end

これで立ち上がっているプロセスは全部Riakにつながりました。

これでcheckoutしたプロセスはRiakを操作できるようになりますので、Riakを操作する関数を追加します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do

require Logger
use GenServer

def start_link([]) do
{:ok, pid} = GenServer.start_link(__MODULE__, :ok)
{:ok, pid}
end

def init(:ok) do
{:ok, master_pid} = :riakc_pb_socket.start_link('210.140.87.185',8087)
{:ok, %{riak_pid: master_pid}}
end

def put_obj(pid, obj) do
GenServer.call(pid, {:put, obj})
end

def get_obj(pid, bucket, key) do
GenServer.call(pid, {:get, bucket, key})
end

def delete_obj(pid, bucket, key) do
GenServer.call(pid, {:delete, bucket, key})
end

def handle_call({:put, obj}, _from, %{riak_pid: master_pid} = state) do
res = :riakc_pb_socket.put(master_pid, obj)
{:reply, res, state}
end

def handle_call({:get, bucket, key}, _from, %{riak_pid: master_pid} = state) do
res = :riakc_pb_socket.get(master_pid, bucket, key)
{:reply, res, state}
end

def handle_call({:delete, bucket, key}, _from, %{riak_pid: master_pid} = state) do
res = :riakc_pb_socket.delete(master_pid, bucket, key)
{:reply, res, state}
end
end

これでget, put, deleteができるようになります。

iex(1)> worker1 = :poolboy.checkout(:riak_pool)

#PID<0.151.0>
iex(2)> object = :riakc_obj.new("my bucket", "my key", "my data")
{:riakc_obj, "my bucket", "my key", :undefined, [], :undefined, "my data"}
iex(3)> RiakPool.Worker.put_obj(worker1, object)
:ok
iex(4)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:ok,
{:riakc_obj, "my bucket", "my key",
<<107, 206, 97, 96, 96, 96, 204, 96, 202, 5, 82, 28, 147, 3, 54, 4, 134, 237, 176, 254, 152, 193, 148, 200, 152, 199, 202, 224, 154, 30, 114, 129, 47, 11, 0>>,
[{{:dict, 2, 16, 16, 8, 80, 48,
{[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []},
{{[], [], [], [], [], [], [], [], [], [],
[["X-Riak-VTag", 52, 101, 88, 98, 112, 55, 69, 79, 69, 117, 104, 112, 83,
77, 57, 51, 68, 100, 122, 106, 65, 117]], [], [],
[["X-Riak-Last-Modified" | {1457, 515333, 613191}]], [], []}}},
"my data"}], :undefined, :undefined}}
iex(5)> RiakPool.Worker.delete_obj(worker1, "my bucket", "my key")
:ok
iex(6)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:error, :notfound}