Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
8
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

@ColdFreak

Elixir, PoolboyでRiakのコネクションプールを作る

poolboyライブラリを使用して、Riakへのコネクションを複数立ち上げることが必要になったので、そのやり方をメモします。

プロジェクトを作って、mix.exsに必要なライブラリを記入します。

$ mix new riakpool --sup --module RiakPool

mix.exsファイル

defp deps do
  [
    {:poolboy, "~> 1.5"},
    {:riakc, "~> 2.1"}
  ]
end

lib/riakpool.exファイルにpoolboyを設定します。worker_moduleRiakPool.Workerをこれから作成します。

 defmodule RiakPool do
   use Application

   def start(_type, _args) do
     import Supervisor.Spec, warn: false

     poolboy_config = [
       {:name, {:local, :riak_pool}},
       {:worker_module, RiakPool.Worker},
       {:size, 2},
       {:max_overflow, 1}
     ]

     children = [
       :poolboy.child_spec(:riak_pool,
                             poolboy_config,
                             []
                           )
     ]
     opts = [strategy: :one_for_one, name: RiakPool.Supervisor]
     Supervisor.start_link(children, opts)
   end
 end

lib/riakpool/worker.exファイルにRiakPool.Workerモジュールを定義します。

 defmodule RiakPool.Worker do
   use GenServer

   def start_link([]) do
     # # ここにname: __MODULE__をつけることはできない、
     # プロセスは全部同じ名前になってしまうため,エラーが出ます。
     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     {:ok, %{}}
   end
 end

これで起動してみます。二個のプロセスが起動されていることが確認できます

$ iex -S mix
iex> :observer.start

スクリーンショット 2016-03-08 18.01.06.png

iex(2)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.148.0>
iex(3)> worker2 = :poolboy.checkout(:riak_pool)
#PID<0.147.0>
iex(4)> worker3 = :poolboy.checkout(:riak_pool)
#PID<0.181.0>
iex(5)> worker4 = :poolboy.checkout(:riak_pool)
** (exit) exited in: :gen_server.call(:riak_pool, {:checkout, #Reference<0.0.1.219>, true}, 5000)
    ** (EXIT) time out
    (stdlib) gen_server.erl:212: :gen_server.call/3
             src/poolboy.erl:55: :poolboy.checkout/3

worker3checkoutされている時点で、もう一つのprocess <0.181.0>がスーパーバイザーに追加されました。これは{:max_overflow, 1}poolboy_configに設定されているためです。sizemax_overflowの違いはsize数のプロセスは常駐している感じで、max_overflowが臨時に追加されて、checkinすると、消えてしまいます。このリンクを見ればわかります。

スクリーンショット 2016-03-08 18.02.20.png

でもこれはriakになんの関係もないので、立ち上がっているプロセスをriakに接続するようにRiakPool.Workerを修正します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do
   require Logger
   use GenServer

   def start_link([]) do

     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     # riakのipはchar listで渡す必要がある
     {:ok, master_pid} = 
:riakc_pb_socket.start_link('ip.of.your.riak',8087)
     {:ok, %{riak_pid: master_pid}}
   end
end

これで立ち上がっているプロセスは全部Riakにつながりました。

スクリーンショット 2016-03-09 18.07.17.png

これでcheckoutしたプロセスはRiakを操作できるようになりますので、Riakを操作する関数を追加します。

lib/riakpool/worker.ex

 defmodule RiakPool.Worker do
   require Logger
   use GenServer

   def start_link([]) do
     {:ok, pid} = GenServer.start_link(__MODULE__, :ok)
     {:ok, pid}
   end

   def init(:ok) do
     {:ok, master_pid} = :riakc_pb_socket.start_link('210.140.87.185',8087)
     {:ok, %{riak_pid: master_pid}}
   end

   def put_obj(pid, obj) do
     GenServer.call(pid, {:put, obj})
   end

   def get_obj(pid, bucket, key) do
     GenServer.call(pid, {:get, bucket, key})
   end

   def delete_obj(pid, bucket, key) do
     GenServer.call(pid, {:delete, bucket, key})
   end

   def handle_call({:put, obj}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.put(master_pid, obj)
     {:reply, res, state}
   end

   def handle_call({:get, bucket, key}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.get(master_pid, bucket, key)
     {:reply, res, state}
   end

   def handle_call({:delete, bucket, key}, _from, %{riak_pid: master_pid} = state) do
     res = :riakc_pb_socket.delete(master_pid, bucket, key)
     {:reply, res, state}
   end
 end

これでget, put, deleteができるようになります。

iex(1)> worker1 = :poolboy.checkout(:riak_pool)
#PID<0.151.0>
iex(2)> object = :riakc_obj.new("my bucket", "my key", "my data")
{:riakc_obj, "my bucket", "my key", :undefined, [], :undefined, "my data"}
iex(3)> RiakPool.Worker.put_obj(worker1, object)
:ok
iex(4)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:ok,
 {:riakc_obj, "my bucket", "my key",
  <<107, 206, 97, 96, 96, 96, 204, 96, 202, 5, 82, 28, 147, 3, 54, 4, 134, 237, 176, 254, 152, 193, 148, 200, 152, 199, 202, 224, 154, 30, 114, 129, 47, 11, 0>>,
  [{{:dict, 2, 16, 16, 8, 80, 48,
     {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []},
     {{[], [], [], [], [], [], [], [], [], [],
       [["X-Riak-VTag", 52, 101, 88, 98, 112, 55, 69, 79, 69, 117, 104, 112, 83,
         77, 57, 51, 68, 100, 122, 106, 65, 117]], [], [],
       [["X-Riak-Last-Modified" | {1457, 515333, 613191}]], [], []}}},
    "my data"}], :undefined, :undefined}}
iex(5)> RiakPool.Worker.delete_obj(worker1, "my bucket", "my key")
:ok
iex(6)> RiakPool.Worker.get_obj(worker1, "my bucket", "my key")
{:error, :notfound}
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
8
Help us understand the problem. What are the problem?