Help us understand the problem. What is going on with this article?

Elixir, PoolboyでRiakのコネクションプールを作る

More than 3 years have passed since last update.

poolboyライブラリを使用して、Riakへのコネクションを複数立ち上げることが必要になったので、そのやり方をメモします。

プロジェクトを作って、mix.exsに必要なライブラリを記入します。

$ mix new riakpool --sup --module RiakPool

mix.exsファイル

defp deps do
  [
    {:poolboy, "~> 1.5"},
    {:riakc, "~> 2.1"}
  ]
end

lib/riakpool.exファイルにpoolboyを設定します。worker_moduleRiakPool.Workerをこれから作成します。

 defmodule RiakPool do
   use Application

   def start(_type, _args) do
     import Supervisor.Spec, warn: false

     poolboy_config = [
       {:name, {:local, :riak_pool}},
       {:worker_module, RiakPool.Worker},
       {:size, 2},
       {:max_overflow, 1}
     ]

     children = [
       :poolboy.child_spec(:riak_pool,
                             poolboy_config,
                             []
                           )
     ]
     opts = [strategy: :one_for_one, name: RiakPool.Supervisor]
     Supervisor.start_link(children, opts)
   end
 end

lib/riakpool/worker.exファイルにRiakPool.Workerモジュールを定義します。

 defmodule RiakPool.Worker do
   use GenServer

   def start_link([]) do
     # # ここにname: __MODULE__をつけることはできない、
     # プロセスは全部同じ名前になってしまうため,エラーが出ます。
     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     {:ok, %{}}
   end
 end

これで起動してみます。二個のプロセスが起動されていることが確認できます

$ iex -S mix
iex> :observer.start

スクリーンショット 2016-03-08 18.01.06.png

iex(2)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.148.0>
iex(3)> worker2 = :poolboy.checkout(:riak_pool)
#PID<0.147.0>
iex(4)> worker3 = :poolboy.checkout(:riak_pool)
#PID<0.181.0>
iex(5)> worker4 = :poolboy.checkout(:riak_pool)
** (exit) exited in: :gen_server.call(:riak_pool, {:checkout, #Reference<0.0.1.219>, true}, 5000)
    ** (EXIT) time out
    (stdlib) gen_server.erl:212: :gen_server.call/3
             src/poolboy.erl:55: :poolboy.checkout/3

worker3checkoutされている時点で、もう一つのprocess <0.181.0>がスーパーバイザーに追加されました。これは{:max_overflow, 1}poolboy_configに設定されているためです。sizemax_overflowの違いはsize数のプロセスは常駐している感じで、max_overflowが臨時に追加されて、checkinすると、消えてしまいます。このリンクを見ればわかります。

スクリーンショット 2016-03-08 18.02.20.png

でもこれはriakになんの関係もないので、立ち上がっているプロセスをriakに接続するようにRiakPool.Workerを修正します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do
   require Logger
   use GenServer

   def start_link([]) do

     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     # riakのipはchar listで渡す必要がある
     {:ok, master_pid} = 
:riakc_pb_socket.start_link('ip.of.your.riak',8087)
     {:ok, %{riak_pid: master_pid}}
   end
end

これで立ち上がっているプロセスは全部Riakにつながりました。

スクリーンショット 2016-03-09 18.07.17.png

これでcheckoutしたプロセスはRiakを操作できるようになりますので、Riakを操作する関数を追加します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do
   require Logger
   use GenServer

   def start_link([]) do
     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     {:ok, master_pid} = :riakc_pb_socket.start_link('210.140.87.185',8087)
     {:ok, %{riak_pid: master_pid}}
   end

   def put_obj(pid, obj) do
     GenServer.call(pid, {:put, obj})
   end

   def get_obj(pid, bucket, key) do
     GenServer.call(pid, {:get, bucket, key})
   end

   def delete_obj(pid, bucket, key) do
     GenServer.call(pid, {:delete, bucket, key})
   end

   def handle_call({:put, obj}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.put(master_pid, obj)
     {:reply, res, state}
   end

   def handle_call({:get, bucket, key}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.get(master_pid, bucket, key)
     {:reply, res, state}
   end

   def handle_call({:delete, bucket, key}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.delete(master_pid, bucket, key)
     {:reply, res, state}
   end
 end

これでget, put, deleteができるようになります。

iex(1)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.151.0>
iex(2)> object = :riakc_obj.new("my bucket", "my key", "my data")
{:riakc_obj, "my bucket", "my key", :undefined, [], :undefined, "my data"}
iex(3)> RiakPool.Worker.put_obj(worker1, object)
:ok
iex(4)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:ok,
 {:riakc_obj, "my bucket", "my key",
  <<107, 206, 97, 96, 96, 96, 204, 96, 202, 5, 82, 28, 147, 3, 54, 4, 134, 237, 176, 254, 152, 193, 148, 200, 152, 199, 202, 224, 154, 30, 114, 129, 47, 11, 0>>,
  [{{:dict, 2, 16, 16, 8, 80, 48,
     {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []},
     {{[], [], [], [], [], [], [], [], [], [],
       [["X-Riak-VTag", 52, 101, 88, 98, 112, 55, 69, 79, 69, 117, 104, 112, 83,
         77, 57, 51, 68, 100, 122, 106, 65, 117]], [], [],
       [["X-Riak-Last-Modified" | {1457, 515333, 613191}]], [], []}}},
    "my data"}], :undefined, :undefined}}
iex(5)> RiakPool.Worker.delete_obj(worker1, "my bucket", "my key")
:ok
iex(6)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:error, :notfound}
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした