poolboyライブラリを使用して、Riakへのコネクションを複数立ち上げることが必要になったので、そのやり方をメモします。
プロジェクトを作って、mix.exs
に必要なライブラリを記入します。
$ mix new riakpool --sup --module RiakPool
mix.exs
ファイル
defp deps do
[
{:poolboy, "~> 1.5"},
{:riakc, "~> 2.1"}
]
end
lib/riakpool.ex
ファイルにpoolboy
を設定します。worker_module
のRiakPool.Worker
をこれから作成します。
defmodule RiakPool do
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
poolboy_config = [
{:name, {:local, :riak_pool}},
{:worker_module, RiakPool.Worker},
{:size, 2},
{:max_overflow, 1}
]
children = [
:poolboy.child_spec(:riak_pool,
poolboy_config,
[]
)
]
opts = [strategy: :one_for_one, name: RiakPool.Supervisor]
Supervisor.start_link(children, opts)
end
end
lib/riakpool/worker.ex
ファイルにRiakPool.Worker
モジュールを定義します。
defmodule RiakPool.Worker do
use GenServer
def start_link([]) do
# # ここにname: __MODULE__をつけることはできない、
# プロセスは全部同じ名前になってしまうため,エラーが出ます。
{:ok, pid} = GenServer.start_link(__MODULE__, :ok)
{:ok, pid}
end
def init(:ok) do
{:ok, %{}}
end
end
これで起動してみます。二個のプロセスが起動されていることが確認できます
$ iex -S mix
iex> :observer.start
iex(2)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.148.0>
iex(3)> worker2 = :poolboy.checkout(:riak_pool)
#PID<0.147.0>
iex(4)> worker3 = :poolboy.checkout(:riak_pool)
#PID<0.181.0>
iex(5)> worker4 = :poolboy.checkout(:riak_pool)
** (exit) exited in: :gen_server.call(:riak_pool, {:checkout, #Reference<0.0.1.219>, true}, 5000)
** (EXIT) time out
(stdlib) gen_server.erl:212: :gen_server.call/3
src/poolboy.erl:55: :poolboy.checkout/3
worker3
がcheckout
されている時点で、もう一つのprocess <0.181.0>
がスーパーバイザーに追加されました。これは{:max_overflow, 1}
がpoolboy_config
に設定されているためです。size
とmax_overflow
の違いはsize
数のプロセスは常駐している感じで、max_overflow
が臨時に追加されて、checkin
すると、消えてしまいます。このリンクを見ればわかります。
でもこれはriakになんの関係もないので、立ち上がっているプロセスをriakに接続するようにRiakPool.Worker
を修正します。
lib/riakpool/worker.ex
defmodule RiakPool.Worker do
require Logger
use GenServer
def start_link([]) do
{:ok, pid} = GenServer.start_link(__MODULE__, :ok)
{:ok, pid}
end
def init(:ok) do
# riakのipはchar listで渡す必要がある
{:ok, master_pid} =
:riakc_pb_socket.start_link('ip.of.your.riak',8087)
{:ok, %{riak_pid: master_pid}}
end
end
これで立ち上がっているプロセスは全部Riakにつながりました。
これでcheckout
したプロセスはRiakを操作できるようになりますので、Riakを操作する関数を追加します。
lib/riakpool/worker.ex
defmodule RiakPool.Worker do
require Logger
use GenServer
def start_link([]) do
{:ok, pid} = GenServer.start_link(__MODULE__, :ok)
{:ok, pid}
end
def init(:ok) do
{:ok, master_pid} = :riakc_pb_socket.start_link('210.140.87.185',8087)
{:ok, %{riak_pid: master_pid}}
end
def put_obj(pid, obj) do
GenServer.call(pid, {:put, obj})
end
def get_obj(pid, bucket, key) do
GenServer.call(pid, {:get, bucket, key})
end
def delete_obj(pid, bucket, key) do
GenServer.call(pid, {:delete, bucket, key})
end
def handle_call({:put, obj}, _from, %{riak_pid: master_pid} = state) do
res = :riakc_pb_socket.put(master_pid, obj)
{:reply, res, state}
end
def handle_call({:get, bucket, key}, _from, %{riak_pid: master_pid} = state) do
res = :riakc_pb_socket.get(master_pid, bucket, key)
{:reply, res, state}
end
def handle_call({:delete, bucket, key}, _from, %{riak_pid: master_pid} = state) do
res = :riakc_pb_socket.delete(master_pid, bucket, key)
{:reply, res, state}
end
end
これでget
, put
, delete
ができるようになります。
iex(1)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.151.0>
iex(2)> object = :riakc_obj.new("my bucket", "my key", "my data")
{:riakc_obj, "my bucket", "my key", :undefined, [], :undefined, "my data"}
iex(3)> RiakPool.Worker.put_obj(worker1, object)
:ok
iex(4)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:ok,
{:riakc_obj, "my bucket", "my key",
<<107, 206, 97, 96, 96, 96, 204, 96, 202, 5, 82, 28, 147, 3, 54, 4, 134, 237, 176, 254, 152, 193, 148, 200, 152, 199, 202, 224, 154, 30, 114, 129, 47, 11, 0>>,
[{{:dict, 2, 16, 16, 8, 80, 48,
{[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []},
{{[], [], [], [], [], [], [], [], [], [],
[["X-Riak-VTag", 52, 101, 88, 98, 112, 55, 69, 79, 69, 117, 104, 112, 83,
77, 57, 51, 68, 100, 122, 106, 65, 117]], [], [],
[["X-Riak-Last-Modified" | {1457, 515333, 613191}]], [], []}}},
"my data"}], :undefined, :undefined}}
iex(5)> RiakPool.Worker.delete_obj(worker1, "my bucket", "my key")
:ok
iex(6)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:error, :notfound}