(この記事は、「fukuoka.ex(その2) Elixir Advent Calendar 2017」の9日目、および「Data Platform Advent Calendar 2017」の3日目です)
昨日は、@piacere_ex さんの「Elixirでデータ分析入門#1:様々なデータをインプットする」でした
#はじめに
Elixirで実際にプロダクト開発した経験からサンプルコードを交えて解説する本連載。
今回は、これまで連載で使用例を見てきた「Ecto.Multi」でのトランザクション実行でも、単品での実行でも、再利用可能なDB操作関数を作ってみます
以下コラムで行った実装をサンプルとして使用します
- 本連載第1回ElixirでSI開発入門 #1 Ectoで悲観的ロックで実装した例を元にUser、Stockの照会を再利用可能な実装として分割する
以下の環境で実装しました
- Elixir v1.6.1
- Phoenix v1.3.2
- Ecto v2.2.10
- PostgreSQL v10.2
本連載の記事はこちら
|> ElixirでSI開発入門 #1 Ectoで悲観的ロック
|> ElixirでSI開発入門 #2 Ectoで楽観的ロック
|> ElixirでSI開発入門 #3 主キーが"id "じゃない既存DBへの接続
|> [ElixirでSI開発入門 #4 本番パスワードを環境変数に持たせる]
(https://qiita.com/tuchiro/items/4ccba7e210c596c383af)
|> ElixirでSI開発入門 #5 Ectoで自由にSQLを書いて実行する(参照編)
|> ElixirでSI開発入門 #6 Ectoで自由にSQLを書いて実行する(更新編)
|> ElixirでSI開発入門 #7 Multiで使う関数を再利用可能な粒度に分割する
お礼:各種ランキングに69回のランクインを達成しました
4/27から、30日間に渡り、毎日お届けしている「季節外れのfukuoka.ex Elixir Advent Calendar」と「季節外れのfukuoka.ex(その2) Elixir Advent Calender」ですが、Qiitaトップページトレンドランキングに8回入賞、Elixirウィークリーランキングでは5週連続で1/2/3フィニッシュを飾り、各種ランキング通算で、トータル69回ものランクイン(前週比+60.1%)を達成しています
みなさまの暖かい応援に励まされ、合計452件ものQiita「いいね」(前週差+103件)もいただき、fukuoka.exアドバイザーズとfukuoka.exキャストの一同、ますます季節外れのfukuoka.ex Advent Calendar、頑張っていきます
Multi.run()で指定する関数の第一引数の中身を確認する
今までの連載では、Multi.run()で指定する関数(≒パイプ中のMulti.run()だと第三引数で指定)に渡される第一引数は、特に使う場面も無かったため、意識してきませんでしたが、単品での実行でも再利用可能とするため、以下コードでデバッグしてみます
defmodule MultiSample do
alias EctoMultiSample.Repo
alias Ecto.Multi
def multi_sample1(result, arrt) do
IO.puts("-----multi_sample1------------------")
IO.inspect(result)
IO.puts("-----------------------------------")
{:ok, "result_of_multi_sample1"}
end
def multi_sample2(result, arrt) do
IO.puts("-----multi_sample2------------------")
IO.inspect(result)
IO.puts("-----------------------------------")
{:ok, result} = nested_multi_sample(result, arrt)
end
def multi_sample3(result, arrt) do
IO.puts("-----multi_sample3------------------")
IO.inspect(result)
IO.puts("-----------------------------------")
{:ok, "result_of_multi_sample3"}
end
def nested_multi_sample(result, arrt) do
IO.puts("-----nested_multi_sample------------------")
IO.inspect(result)
IO.puts("-----------------------------------")
{:ok, "result_of_nested_multi_sample"}
end
def multi_function() do
Multi.new
|> Multi.run(:multi_sample1_atom, MultiSample, :multi_sample1, ["attr"])
|> Multi.run(:multi_sample2_atom, MultiSample, :multi_sample2, ["attr"])
|> Multi.run(:multi_sample3_atom, MultiSample, :multi_sample3, ["attr"])
|> Repo.transaction()
end
end
multi_sample1 ~ nested_multi_sampleまでの各関数は、Multi.run()で指定する関数で、第一引数をデバッグします。
Multi.run()で指定する関数のルール通り、
- 第一引数にマップ(Multi.run()では指定しない)
- 第二引数(以降)は関数に渡す任意の引数(パイプ中のMulti.run()では第四引数にリストで指定するもの)
- 戻り値は、{ok:, 戻り値}のタプル
の3つの要件を満たします。
さらに、multi_sample2では、内部でnested_multi_sampleを実行しています。
これらの関数を、一つのトランザクション処理として実行するのが、最後のmulti_function()です。
実行結果は下記となります。
iex(12)> MultiSample.multi_function()
-----multi_sample1------------------
[debug] QUERY OK db=0.2ms
begin []
%{}
-----------------------------------
-----multi_sample2------------------
%{multi_sample1_atom: "result_of_multi_sample1"}
-----------------------------------
-----nested_multi_sample------------------
%{multi_sample1_atom: "result_of_multi_sample1"}
-----------------------------------
-----multi_sample3------------------
%{
multi_sample1_atom: "result_of_multi_sample1",
multi_sample2_atom: "result_of_nested_multi_sample"
}
-----------------------------------
[debug] QUERY OK db=0.3ms
commit []
{:ok,
%{
multi_sample1_atom: "result_of_multi_sample1",
multi_sample2_atom: "result_of_nested_multi_sample",
multi_sample3_atom: "result_of_multi_sample3"
}}
resultには、前の関数で実行された戻り値が、パイプ中のMulti.run()で第一引数に指定したアトムをキーとするマップとして、順次詰められていることが分かります。
また、Multi.run()の関数として直接指定せず、関数の中から実行したnested_multi_sample()も、同一トランザクションで実行されていることが分かります。
これらから言えることは、
① Multi.run()で指定する関数を実行した際、第一引数には、前の関数群の実行結果がマップで渡される
② 前の関数の結果を取得したい場合は、第一引数で渡された実行結果マップから、アトムを使って取得できるので、以降の関数内でも利用できる
③ Multi.run()用に実装した関数であれば、Multi.run()用の関数の中で実行することもできる
ということです。
さて、Multi.run()用関数の、3つの要件である、
- 第一引数に前の関数の実行結果マップ
- 第二引数(以降)は関数に渡す任意の引数
- 戻り値は、{ok:, 戻り値}のタプル
を満たしていることに追加して、戻り値を工夫すれば、更なる再利用ができる気がしますが、ここで一つ問題があります。
これまで見てきたように、同一トランザクションで select for updateとinsert/updateを実行するには、
Multi.new
|> Multi.run(:select, Hoge, :get_changeset,[attr])
|> Multi.update(:update, hoge_changeset)
|> Repo.transaction()
という感じで実装すれば良い気がしますが、Ecto.Multiのサンプルを見ていたいただくとお分かりの通り、Multi.update()に引数として渡すhoge_changesetを渡す術がわかりません。
※近日発売のProgramming Ectoに載っているといいのですが・・・
しょうがないので、
「もういい、更新もMulti.run()で実行するから!」
と拗ねてみした。
ただ、それだと面白くないので、更新関数は引数にchangesetのリストを受け取り、複数行を同一トランザクションで更新できる関数として実装します。
実装手順
再利用可能な関数を実装する
実装しました。
まず、第1回で実装したAccountsモジュールにEcto.Multi用の照会・更新関数を追加します。
〜前略〜
defmodule EctoMultiSample.Accounts do
def get_user_with_lock(_result, user_id) do
[user] = User
|> where([u], u.id == ^user_id)
|> lock("FOR UPDATE")
|> Repo.all()
{:ok, user}
end
def update_users(result, user_changeset_list_atom) do
user_changeset_list = result[user_changeset_list_atom]
users = user_changeset_list
|> Enum.map(fn(user_changeset) ->
{:ok, user} = user_changeset
|> Repo.update()
user
end )
{:ok, users}
end
end
get_user_with_lockはEctoMultiSample.CartAction.buy()の実装からUserのselect for update部分を切り出した関数です。
update_usersは引数をリストにし、通常のupdate_userをEnum.mapで複数回実行するようにした関数です。
同じく、第1回で実装したItemsモジュールにもEcto.Multi用の照会・更新関数を追加します。
実装内容は、Accountsと同様で、対象がStockモデルになっただけです。
defmodule EctoMultiSample.Items do
〜前略〜
def get_stock_with_lock(_result,stock_id) do
[stock] = Stock
|> where([s], s.id == ^stock_id)
|> lock("FOR UPDATE")
|> Repo.all()
{:ok, stock}
end
def update_stocks(result, stock_changeset_list_atom) do
stock_changeset_list = result[stock_changeset_list_atom]
stocks = stock_changeset_list
|> Enum.map(fn(stock_changeset) ->
{:ok, stock} = stock_changeset
|> Repo.update()
end )
{:ok, stocks}
end
end
さらに、メインの決済処理を実装したEctoMultiSample.CartActionに以下の関数を追加します。
- ユーザーの所持金を減算するだけの関数
- 在庫の数量を減算するだけの関数
- これまで実装した再利用可能なMulti用関数を同一トランザクションで実行する関数
defmodule EctoMultiSample.CartAction do
〜前略〜
def decrease_user_credit(result, user_atom, stock_atom, amount) do
user = result[user_atom]
stock = result[stock_atom]
# 所持金を引く
uesr_credit = user.credit - ( amount * stock.price )
# ChangeSetを更新
user_changeset = User.changeset(user, %{credit: uesr_credit})
{:ok, user_changeset}
end
def decrease_stock_amount(result, stock_atom, amount) do
stock = result[stock_atom]
# 在庫を減らす
stock_amount = stock.amount - amount
# ChangeSetを更新
stock_changeset = Stock.changeset(stock, %{amount: stock_amount})
{:ok, stock_changeset}
end
def submit_multi_reusable(order) do
result = Ecto.Multi.new
|> Multi.run( :get_user, EctoMultiSample.Accounts, :get_user_with_lock, [order.user_id] )
|> Multi.run( :get_stock, EctoMultiSample.Items, :get_stock_with_lock, [order.stock_id] )
|> Multi.run( :decrease_user_credit, EctoMultiSample.CartAction, :decrease_user_credit, [:get_user, :get_stock, order.amount] )
|> Multi.run( :decrease_stock_amount, EctoMultiSample.CartAction, :decrease_stock_amount, [:get_stock, order.amount] )
|> Multi.run( :set_user, fn %{decrease_user_credit: user_changeset} -> {:ok, [user_changeset]} end)
|> Multi.run( :set_stock, fn %{decrease_stock_amount: stock_changeset} -> {:ok, [stock_changeset]} end)
|> Multi.run( :update_users, EctoMultiSample.Accounts, :update_users, [:set_user] )
|> Multi.run( :update_stocks, EctoMultiSample.Items, :update_stocks, [:set_stock] )
|> Repo.transaction()
end
end
decrease_user_credit()とdecrease_user_credit()は、EctoMultiSample.CartAction.buy()の実装から、ユーザの所持金を減算してuser_changesetを作る部分と、在庫の数量を減算してstock_changesetを作る部分を抜き出したものです。
ここまでの全ての関数は
- 第一引数に前の関数の実行結果マップ
- 第二引数(以降)は関数に渡す任意の引数
- 戻り値は、{ok:, 戻り値}のタプル
の3つの要件を満たしいていることに注目してください。
さらに共通のルールとして、
- resultから必要な情報を取得する関数の場合は、第二引数以降に第一引数のresulrtから、どのアトムで使用したい情報を取ってくるかを指定する
ようにしています。
これは、Multi.runのルール上、第二引数に指定したアトムをキーに、第一引数のマップ(ここではresult)にセットされた値を取り出す必要がありますが、制約としてアトムは、Multiトランザクション中で一意である必要がある為、
Mutli.new()
|> Multi.run(:user, Hoge, :get_user, [])
|> Multi.run(:user, Hoge, :update_user, [attr]) # <-- result.userでユーザを取得
|> Multi.run(:update_stock, Hoge, :update_stock, []) # <-- result.userで更新後のユーザを取得して在庫を更新
といった実装ができない為です。
こうなると、後発の処理は先行する処理がなんという名前(atom)で実行されるかを把握しておく必要がある為、それを引数で指定できなければ再利用に限界があります。
また、submit_multi_reusable()ではこれらを連続でMulti.run()でMulti構造体を構築して、Repo.transaction()で一気に実行しています。
前半の照会関数は単数のchangesetを戻り値としますが、
後続の更新処理はリストで処理するように実装した為、間に単数のchangesetを[changeset]として1要素しかないリストに変換する無名関数を実行しています。
|> Multi.run( :set_user, fn %{decrease_user_credit: user_changeset} -> {:ok, [user_changeset]} end)
|> Multi.run( :set_stock, fn %{decrease_stock_amount: stock_changeset} -> {:ok, [stock_changeset]} end)
「わざわざ、バカなことを・・・」というご意見もあるかと思いますが、Multi対応の関数をわざわざ実装したのに、単数の処理にしか使えないのはもったいないので、折角のMulti実装なのだから複数レコードを処理したいという考えからです。
これであれば、オンラインで1レコードだけ処理したい場合も、バッチ処理で複数レコードを一度に処理したい場合も同じ関数を再利用することができます。
今回のサンプルはプレーンなupdate処理なのピンと来ない方もいるかもしれませんが、これは例えば、
- 日中のオンライン処理で個々の取引の決済する
- 夜間のバッチで一括自動決済する
といった関係で同じ決済処理を実装する必要がある場合、同じ関数を使って更新部分実装する事ができるため、決済処理の仕様が変わった場合も2重の修正をする必要がなくなります。
※もちろんこの関係を成立させるためには上記二つの決済処理は"本質的に同じ処理"であり、異なるのは処理方式の差よるトランザクションの粒度のみである必要があります。
実行してみる
出来上がったコード実行してみましょう。
> iex -S mix
iex(1)> order = %{amount: 2, stock_id: 1, user_id: 1}
iex(1)> EctoMultiSample.CartAction.submit_multi_reusable(order)
下記のSQLログから第1回と同じく同一トランザクションで実行されている事がわかります。
2018-05-28 08:30:02.680 PDT [3270] LOG: execute POSTGREX_BEGIN: BEGIN
2018-05-28 08:30:02.682 PDT [3270] LOG: execute ecto_4: SELECT u0."id", u0."age", u0."credit", u0."mail", u0."name", u0."inserted_at", u0."updated_at" FROM "users" AS u0 WHERE (u0."id" = $1) FOR UPDATE
2018-05-28 08:30:02.682 PDT [3270] DETAIL: parameters: $1 = '1'
2018-05-28 08:30:02.684 PDT [3270] LOG: execute ecto_36: SELECT s0."id", s0."amount", s0."category", s0."name", s0."price", s0."inserted_at", s0."updated_at" FROM "stocks" AS s0 WHERE (s0."id" = $1) FOR UPDATE
2018-05-28 08:30:02.684 PDT [3270] DETAIL: parameters: $1 = '1'
2018-05-28 08:30:02.690 PDT [3270] LOG: execute <unnamed>: UPDATE "users" SET "credit" = $1, "updated_at" = $2 WHERE "id" = $3
2018-05-28 08:30:02.690 PDT [3270] DETAIL: parameters: $1 = '660', $2 = '2018-05-28 15:30:02.688885', $3 = '1'
2018-05-28 08:30:02.693 PDT [3270] LOG: execute <unnamed>: UPDATE "stocks" SET "amount" = $1, "updated_at" = $2 WHERE "id" = $3
2018-05-28 08:30:02.693 PDT [3270] DETAIL: parameters: $1 = '98', $2 = '2018-05-28 15:30:02.691928', $3 = '1'
2018-05-28 08:30:02.694 PDT [3270] LOG: execute POSTGREX_COMMIT: COMMIT
まとめ
Multi.run()で指定する関数について、まとめると、以下3点となります
① Multi.run()で指定する関数を実行した際、第一引数には、前の関数群の実行結果がマップで渡される
② 前の関数の結果を取得したい場合は、第一引数で渡された実行結果マップから、アトムを使って取得できるので、以降の関数内でも利用できる
③ Multi.run()用に実装した関数であれば、Multi.run()用の関数の中で実行することもできる
もう一度、最後の実行部分を見てみましょう。
result = Ecto.Multi.new
|> Multi.run( :get_user, EctoMultiSample.Accounts, :get_user_with_lock, [order.user_id] )
|> Multi.run( :get_stock, EctoMultiSample.Items, :get_stock_with_lock, [order.stock_id] )
|> Multi.run( :decrease_user_credit, EctoMultiSample.CartAction, :decrease_user_credit, [:get_user, :get_stock, order.amount] )
|> Multi.run( :decrease_stock_amount, EctoMultiSample.CartAction, :decrease_stock_amount, [:get_stock, order.amount] )
|> Multi.run( :set_user, fn %{decrease_user_credit: user_changeset} -> {:ok, [user_changeset]} end)
|> Multi.run( :set_stock, fn %{decrease_stock_amount: stock_changeset} -> {:ok, [stock_changeset]} end)
|> Multi.run( :update_users, EctoMultiSample.Accounts, :update_users, [:set_user] )
|> Multi.run( :update_stocks, EctoMultiSample.Items, :update_stocks, [:set_stock] )
|> Repo.transaction()
この実装であれば、関数を増やしてパイプの実行内容を入れ替えるだけで、色々な処理が組み替えられると思いませんか?
いやぁ、Elixirって本当にいいもんですね~
明日は、@twinbee さんの「Elixirから簡単にRustを呼び出せるRustler#4 SHIFTJISを変換する」です
★★★ 満員御礼!Elixir MeetUpを6月末に開催します ★★★
※応募多数により、増枠しました
「fukuoka.ex#11:DB/データサイエンスにコネクトするElixir」を6/22(金)19時に開催します
私もこの連載で取り上げてない、開発ネタを出す予定ですのでぜひ奮ってご応募ください。