LoginSignup
8

More than 5 years have passed since last update.

Elixir, PoolboyでRiakのコネクションプールを作る

Posted at

poolboyライブラリを使用して、Riakへのコネクションを複数立ち上げることが必要になったので、そのやり方をメモします。

プロジェクトを作って、mix.exsに必要なライブラリを記入します。

$ mix new riakpool --sup --module RiakPool

mix.exsファイル

defp deps do
  [
    {:poolboy, "~> 1.5"},
    {:riakc, "~> 2.1"}
  ]
end

lib/riakpool.exファイルにpoolboyを設定します。worker_moduleRiakPool.Workerをこれから作成します。

 defmodule RiakPool do
   use Application

   def start(_type, _args) do
     import Supervisor.Spec, warn: false

     poolboy_config = [
       {:name, {:local, :riak_pool}},
       {:worker_module, RiakPool.Worker},
       {:size, 2},
       {:max_overflow, 1}
     ]

     children = [
       :poolboy.child_spec(:riak_pool,
                             poolboy_config,
                             []
                           )
     ]
     opts = [strategy: :one_for_one, name: RiakPool.Supervisor]
     Supervisor.start_link(children, opts)
   end
 end

lib/riakpool/worker.exファイルにRiakPool.Workerモジュールを定義します。

 defmodule RiakPool.Worker do
   use GenServer

   def start_link([]) do
     # # ここにname: __MODULE__をつけることはできない、
     # プロセスは全部同じ名前になってしまうため,エラーが出ます。
     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     {:ok, %{}}
   end
 end

これで起動してみます。二個のプロセスが起動されていることが確認できます

$ iex -S mix
iex> :observer.start

スクリーンショット 2016-03-08 18.01.06.png

iex(2)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.148.0>
iex(3)> worker2 = :poolboy.checkout(:riak_pool)
#PID<0.147.0>
iex(4)> worker3 = :poolboy.checkout(:riak_pool)
#PID<0.181.0>
iex(5)> worker4 = :poolboy.checkout(:riak_pool)
** (exit) exited in: :gen_server.call(:riak_pool, {:checkout, #Reference<0.0.1.219>, true}, 5000)
    ** (EXIT) time out
    (stdlib) gen_server.erl:212: :gen_server.call/3
             src/poolboy.erl:55: :poolboy.checkout/3

worker3checkoutされている時点で、もう一つのprocess <0.181.0>がスーパーバイザーに追加されました。これは{:max_overflow, 1}poolboy_configに設定されているためです。sizemax_overflowの違いはsize数のプロセスは常駐している感じで、max_overflowが臨時に追加されて、checkinすると、消えてしまいます。このリンクを見ればわかります。

スクリーンショット 2016-03-08 18.02.20.png

でもこれはriakになんの関係もないので、立ち上がっているプロセスをriakに接続するようにRiakPool.Workerを修正します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do
   require Logger
   use GenServer

   def start_link([]) do

     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     # riakのipはchar listで渡す必要がある
     {:ok, master_pid} = 
:riakc_pb_socket.start_link('ip.of.your.riak',8087)
     {:ok, %{riak_pid: master_pid}}
   end
end

これで立ち上がっているプロセスは全部Riakにつながりました。

スクリーンショット 2016-03-09 18.07.17.png

これでcheckoutしたプロセスはRiakを操作できるようになりますので、Riakを操作する関数を追加します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do
   require Logger
   use GenServer

   def start_link([]) do
     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     {:ok, master_pid} = :riakc_pb_socket.start_link('210.140.87.185',8087)
     {:ok, %{riak_pid: master_pid}}
   end

   def put_obj(pid, obj) do
     GenServer.call(pid, {:put, obj})
   end

   def get_obj(pid, bucket, key) do
     GenServer.call(pid, {:get, bucket, key})
   end

   def delete_obj(pid, bucket, key) do
     GenServer.call(pid, {:delete, bucket, key})
   end

   def handle_call({:put, obj}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.put(master_pid, obj)
     {:reply, res, state}
   end

   def handle_call({:get, bucket, key}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.get(master_pid, bucket, key)
     {:reply, res, state}
   end

   def handle_call({:delete, bucket, key}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.delete(master_pid, bucket, key)
     {:reply, res, state}
   end
 end

これでget, put, deleteができるようになります。

iex(1)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.151.0>
iex(2)> object = :riakc_obj.new("my bucket", "my key", "my data")
{:riakc_obj, "my bucket", "my key", :undefined, [], :undefined, "my data"}
iex(3)> RiakPool.Worker.put_obj(worker1, object)
:ok
iex(4)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:ok,
 {:riakc_obj, "my bucket", "my key",
  <<107, 206, 97, 96, 96, 96, 204, 96, 202, 5, 82, 28, 147, 3, 54, 4, 134, 237, 176, 254, 152, 193, 148, 200, 152, 199, 202, 224, 154, 30, 114, 129, 47, 11, 0>>,
  [{{:dict, 2, 16, 16, 8, 80, 48,
     {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []},
     {{[], [], [], [], [], [], [], [], [], [],
       [["X-Riak-VTag", 52, 101, 88, 98, 112, 55, 69, 79, 69, 117, 104, 112, 83,
         77, 57, 51, 68, 100, 122, 106, 65, 117]], [], [],
       [["X-Riak-Last-Modified" | {1457, 515333, 613191}]], [], []}}},
    "my data"}], :undefined, :undefined}}
iex(5)> RiakPool.Worker.delete_obj(worker1, "my bucket", "my key")
:ok
iex(6)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:error, :notfound}

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8