この記事はElixir Advent Calendar 2019 19日目の記事です!
昨日は masatam81さんの「年末の大掃除がてらリファクタリングをしよう」 でした。そちらもぜひ!
まえおき
(飛ばして大丈夫です)
ActiveTsv
いきなりElixirじゃない話ですが、 ActiveTsv
ってご存知ですか?
ActiveRecordっぽくTSV/CSVが読めるActiveTsvを作った
https://github.com/ksss/active_tsv
ActiveRecord
として操作するけどデータの読み先はCSVやTSVですよ、ってやつです。これが意外と便利で、ちょっとした動作の確認やテストデータなどにもってこいだったりします。
そこでおもった。
Ecto
でCSVを扱えたらめちゃくちゃ良いのでは??テストデータとか流し放題じゃんww
→ Ecto.Adapters.CSV
つくるぞー!
EctoのAdapterってどうやって作るんだろう
ゆーてEctoといえばドキュメントが厚いことで有名じゃないですか
ドキュメント読んでちょちょいのちょいや。そう思っていた時期がありました。
Ecto、実はEctoを利用する側のドキュメントは豊富なんですが、Adapterを実装する側に関してはまじでドキュメントありません。
一応Module一覧を覗けば Ecto.Adapter
から始まるモジュール群がなんか関係ありそうだな…という程度のことがわかります。
https://hexdocs.pm/ecto/api-reference.html#modules
ゆーてEctoってすでにいろんなアダプターがあるじゃないですか
ぐぐると出てくるし。
EctoMnesia
とか。
MnesiaなんてCSVとやってることほとんど同じなんじゃないですか??フォークして書き換えたらつくれるじゃーん…?
Ecto
は今バージョン3系なんですよね。EctoMnesiaその他、いろんなアダプターがあったのは2系でした。
3系になってデータの変換・バリデーターである Ecto
と Ecto.SQL
が切り離され、単体で利用しやすくなりました。神アプデです。(Ectoガンガン使ってAPIのリクエストパラメーターバリデーションとかやるとちょっと辛さもありますが)
このことが災いして昔のAdapterは互換性ないし、参考にしようとしても手間になります。じゃあ3系に存在するアダプターは何があるかというと、
Ecto.SQL
系
…だけです。
本題
というわけでこの記事はEctoを読んでAdapterを書いてみた、という内容になります。
とりあえず 何から取りかかればいいんだ…?
→ Ecto.Adapter
を実装したModuleを作るところ から取り掛かれば良さそう。
ほとんどノーヒント状態ですが、明らかにひとつだけ名前空間が上位なので、Ecto.Adapterが重要そうに見えます。EctoのRepoの設定で「Adapterを指定する」部分も、流石に Ecto.Adapter
を実装しているモジュールの話だと考えるのが妥当そうです(ドキュメントに明記されてはなさげ…)
また、逆引き的ですが、Ectoを利用する上でよく呼び出す Ecto.Repo
の実装を見に行くと、後述の理由で実装すべきBehaviourが割り出せたりします。
更にいうと、地味にMySQLのアダプターを作ってみよう!みたいな公式ブログの記事があります。ただし Ecto.Adapter
類の実装の必要性に一瞬触れたかと思いきや「便利なEcto.Adapters.SQLを使いましょうね」と言い始めるのであんまり参考になりません。
というわけでまずは Ecto.Adapter
について確認していきます。
Ecto.Adapter
について
役割
Ecto.Adapterの実装に関しては2つの役割がありそう
- アダプターアプリケーションのツリーの起動
- (なぜか)型変換周り
アプリケーションツリーの起動
主にこの2つのコールバック。
@callback ensure_all_started(config :: Keyword.t(), type :: :application.restart_type()) ::
{:ok, [atom]} | {:error, atom}
@callback init(config :: Keyword.t()) :: {:ok, :supervisor.child_spec(), adapter_meta}
概ね、以下のことをすれば良さそう
-
ensure_all_started/2
- 依存しているライブラリがちゃんとロードしていることを確認する
-
init/1
- もし実装するAdapter内部にGenServerなどが建つのであればその
child_spec
を返す
- もし実装するAdapter内部にGenServerなどが建つのであればその
型変換周り
そのアダプターにおいて利用する、データ上の型とEcto上の型との変換を行う関数を指定する。
@callback loaders(primitive_type :: Ecto.Type.primitive(), ecto_type :: Ecto.Type.t()) ::
[(term -> {:ok, term} | :error) | Ecto.Type.t()]
@callback dumpers(primitive_type :: Ecto.Type.primitive(), ecto_type :: Ecto.Type.t()) ::
[(term -> {:ok, term} | :error) | Ecto.Type.t()]
loaders/2
のドキュメントから引用すると、
def loaders(:boolean, type), do: [&bool_decode/1, type]
def loaders(_primitive, type), do: [type]
defp bool_decode(0), do: {:ok, false}
defp bool_decode(1), do: {:ok, true}
このように第一引数で Ecto.Schema
で指定する型をうけ、変換が必要な値に関しては変換用の関数を添えて返す実装をすることになる。
例えば今回はCSVだが、各セルを読むと型は文字列型のみになる:
iex> File.read!("./test.csv") \
...> |> String.split("\n") \
...> |> NimbleCSV.RFC4180.parse_enumerable()
[["1", "10", "100"], ["2", "20", "200"], ["3", "30", "300"]]
なので、例えば :integer
と指定された値に対しては
@impl Ecto.Adapter
def loaders(:integer, type), do: [&integer_decode/1, type]
def integer_decode(value), do: value |> Integer.parse() |> do_integer_decode()
defp do_integer_decode(:error), do: :error
defp do_integer_decode({value, ""}), do: {:ok, value}
defp do_integer_decode({_, _}), do: :error
みたいな関数を指定してあげれば良さそう。
Adapterとbehaviours
ところで、普段Ectoを利用するときは
defmodule Foo.Repo do
use Ecto.Repo,
otp_app: :foo,
adapter: Ecto.Adapter.Postgrex
end
みたいな設定でRepoを作ると思いますが、実はこのとき :adapter
に指定するモジュールによって Foo.Repo
に生えるメソッドが異なってきます。
Ecto.Repo.__using__/1
は、use Ecto.Repo
の :adapter
に指定するモジュールが実装しているBehaviourに応じて挿入する内容が変わるようなマクロになっています。
実際の Ecto.Repo.__using__/1
を抜粋するとこんな感じです:
defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
@behaviour Ecto.Repo
{otp_app, adapter, behaviours} =
Ecto.Repo.Supervisor.compile_config(__MODULE__, opts)
...
## Transactions
if Ecto.Adapter.Transaction in behaviours do
def transaction(fun_or_multi, opts \\ []) do
Ecto.Repo.Transaction.transaction(__MODULE__, get_dynamic_repo(), fun_or_multi, opts)
end
def in_transaction? do
Ecto.Repo.Transaction.in_transaction?(get_dynamic_repo())
end
@spec rollback(term) :: no_return
def rollback(value) do
Ecto.Repo.Transaction.rollback(get_dynamic_repo(), value)
end
end
普段はすべて実装してある Ecto.SQL
系を利用するので知らなかったんですが、自分でAdapterを実装すると Repo.all
とかすらできなくて?!ってなりました。自前のAdapterが特定のBehaviourを実装していなかったことが原因です。
「 Ecto.Adapter.Transaction
を実装すると以下のメソッドが生えてくるよ」とかドキュメントに書いてあると嬉しいなぁ
逆にこのマクロを参照することにより、自分の作成するAdapterにおいてどのBehaviourを実装するべきかがわかると言えます。例えば今回でいうと(CSVは読み専用なため)WRITEの機能が一切いらないので、Ecto.Adapter.Schema
のBehaviourを実装する必要がない、ということになります。
ここらへんは上で触れたMySQLアダプター実装してみようブログ記事でも地味にさらっと触れられていました。該当箇所を適当に訳すとこんな感じ:
アダプタは最低でも
Ecto.Adapter
ビヘイビアだけでも実装していなくてはなりません. 残りのビヘイビアは任意です。なぜならクラウドサービスのようにトランザクションやCREATE/DROPに対応していないものもあるからです。
これ非常に便利ですよね。Twitterからデータ取得するだけのEctoAdapterとかも作れそうです。いるかはともかく。
Ecto.Adapter.Queryable
さて、ビヘイビアの中で今回必ず実装しなくてはならないのが Ecto.Adapter.Queryable
です。Ecto.Adapter.Queryable
を実装すると all
get
exists?
などのREAD系関数が利用できるようになります。
どんな実装をする必要があるのか読んでいきます。
① prepare/2
と execute/6
Ecto.Adapter.Queryable
のメインは3つのコールバック関数です。まるっと引用します。
@doc """
Commands invoked to prepare a query for `all`, `update_all` and `delete_all`.
The returned result is given to `execute/6`.
"""
@callback prepare(atom :: :all | :update_all | :delete_all, query :: Ecto.Query.t()) ::
{:cache, prepared} | {:nocache, prepared}
@doc """
Executes a previously prepared query.
It must return a tuple containing the number of entries and
the result set as a list of lists. The result set may also be
`nil` if a particular operation does not support them.
The `adapter_meta` field is a map containing some of the fields found
in the `Ecto.Query` struct.
"""
@callback execute(adapter_meta, query_meta, query_cache, params :: list(), options) ::
{integer, [[term]] | nil}
@doc """
Streams a previously prepared query.
It returns a stream of values.
The `adapter_meta` field is a map containing some of the fields found
in the `Ecto.Query` struct.
"""
@callback stream(adapter_meta, query_meta, query_cache, params :: list(), options) ::
Enumerable.t
これドキュメント部分省略してないんですが、結構ドキュメント薄くないですか・・・?
prepareの結果はどこにゆくのか
ドキュメントに気になる1文があります。
The returned result is given to
execute/6
.
どう渡されるんだ……?
とりあえずGrepして prepare
の現れるところを探します。
defmodule Ecto.Query.Planner do
# Normalizes a query and its parameters.
@moduledoc false
defp query_without_cache(query, operation, adapter, counter) do
{query, select} = normalize(query, operation, adapter, counter)
{cache, prepared} = adapter.prepare(operation, query) # ← ここ
{cache, select, prepared}
end
execute/6
見当たらないけど・・・??
というわけで周囲を見渡すと、 Ecto.Query.Planner
はこんなふうになっています
def query(query, operation, cache, adapter, counter) do
{query, params, key} = plan(query, operation, adapter)
query_with_cache(key, query, operation, cache, adapter, counter, params)
end
defp query_with_cache(key, query, operation, cache, adapter, counter, params) do
case query_lookup(key, query, operation, cache, adapter, counter) do
{_, select, prepared} -> # キャッシュを使わないとき
{build_meta(query, select), {:nocache, prepared}, params}
{_key, :cached, select, cached} -> # ETSキャッシュにあったとき
update = &cache_update(cache, key, &1)
reset = &cache_reset(cache, key, &1)
{build_meta(query, select), {:cached, update, reset, cached}, params}
{_key, :cache, select, prepared} -> # ETSキャッシュに入れたとき
update = &cache_update(cache, key, &1)
{build_meta(query, select), {:cache, update, prepared}, params}
end
end
defp query_lookup(:nocache, query, operation, _cache, adapter, counter) do
query_without_cache(query, operation, adapter, counter)
end
defp query_lookup(key, query, operation, cache, adapter, counter) do
case :ets.lookup(cache, key) do
[term] -> term
[] -> query_prepare(query, operation, adapter, counter, cache, key)
end
end
defp query_prepare(query, operation, adapter, counter, cache, key) do
case query_without_cache(query, operation, adapter, counter) do
{:cache, select, prepared} ->
cache_insert(cache, key, {key, :cache, select, prepared})
{:nocache, _, _} = nocache ->
nocache
end
end
呼び出しを整理すると、こんな感じで最終的に Ecto.Query.Planner.query/5
で呼ばれています。
query/5
query_with_cache/7
query_looup/6
query_without_cache/4 # ETSにない場合
adapter.prepare/2
query_prepare # ETSにある場合
query_without_cache/4
adapter.prepare/2
やっていることをまとめると次のような感じ:
- 指定したクエリに対してプランナーからキャッシュキーが提示される
- ETSテーブルをそのキーで探す。
- エントリがない場合は
-
{_, _, :nocache}
が返る -
query_without_cache
に行ってadapter.prepare
が呼ばれる。
-
- エントリがありそうな場合は中身があるかチェック
- ETSキャッシュがある場合
- キャッシュの中身を返す
- ETSにキャッシュがない場合
-
adapter.prepare
が呼ばれる
-
- ETSキャッシュがある場合
ところで実際 Ecto.Query.Planner.query/5
はどこで使われているのかというと、 Ecto.Adapter.Queryable.prepare_query/3
だったりします。元のモジュールに帰ってきました!
…で結局 execute/6
が呼ばれていそうな場所には出会っていないわけだが…?
というわけでもう一度grepすると、 execute/6
は Ecto.Repo.Queryable.execute/4
で呼ばれていることがわかります。
defmodule Ecto.Repo.Queryable do
defp execute(operation, name, query, opts) when is_list(opts) do
{adapter, %{cache: cache, repo: repo} = adapter_meta} = Ecto.Repo.Registry.lookup(name)
{query, opts} = repo.prepare_query(operation, query, opts)
query = attach_prefix(query, opts)
{query_meta, prepared, params} = Planner.query(query, operation, cache, adapter, 0)
case query_meta do
%{select: nil} ->
adapter.execute(adapter_meta, query_meta, prepared, params, opts)
%{select: select, sources: sources, preloads: preloads} ->
...
{count, rows} =
adapter.execute(adapter_meta, query_meta, prepared, params, opts)
ここを見るとたしかに直前に Ecto.Query.Planner.query/5
も呼ばれています。 The returned result is given to execute/6.
はここのことなのか…!
というわけでここで Ecto.Query.Planner.query/5
のSpecを書いてみます。(関係ないところは変数をそのままおいてます)
@spec query(query, operation, cache, adapter, counter) :: {meta, planned, params}
when planned: {:nocache, prepared}
| {:cached, update, reset, cached}
| {:cache, update, prepared},
meta: %{select: term, preloads: term, sources: term}
:cache
タプルや :cached
タプルは今回は割愛しますが、ETSに格納したあと更にクエリを上書き保存したりするときに :cached
とかに変更するようです。(その場面までは追えませんでした)
非常に回り道をしたけれど、 Ecto.Repo.Queryable
のスニペットを見ると、上のSpecの planned
全体がそのまま Adapter.Queryable.execute/6
の第3引数に入る。その中の prepared
を作る関数が Adapter.Queryable.prepare/2
ということになる。分かりづらすぎる…
結局何をする関数なのか
結局 Adapter.Queryable.prepare/2
が何をするのかというと
Adapter.Queryable.execute/6
で扱えるいい感じのクエリタプルとかを作る
ことだと解釈しました。
キャッシュを利用しない場合に限定すると、
(A)
@callback prepare(atom :: :all | :update_all | :delete_all, query :: Ecto.Query.t()) ::
{:cache, prepared} | {:nocache, prepared}
(B)
@spec query(query, operation, cache, adapter, counter) :: {meta, planned, params}
when planned: {:nocache, prepared}
| {:cached, update, reset, cached}
| {:cache, update, prepared}
(C)
@callback execute(adapter_meta, query_meta, query_cache, params :: list(), options) ::
{integer, [[term]] | nil}
(A)のprepared
が(B)のplanned
になって(C)にquery_cache
として渡されます ということだった。そう書いておいてほしかった~
めちゃくちゃ簡易な実装をするのであれば(=キャッシュを使わないのであれば) Adapter.Queryable.prepare/2
では何もしなくていいのかもしれないです。
prepare
execute
間でETSを挟むことから、おそらくクエリの準備自体にDBアクセスを挟むなどの場合に結果をキャッシュして高速化を図るための仕様なのだと思います。今回に限って言えば問題にならないポイントでした。
というわけで今回は Adapter.Queryable.prepare/2
に来た2引数をそのまま Adapter.Queryable.execute/6
に渡すことにしました。
(ところで execute/6
じゃなくて execute/5
では…?)
② execute/6
・ stream/6
長々と prepare/2
を追いかけましたが、対してこれは非常にシンプルで、クエリに応じたデータ列をかえせばいいだけです。ドキュメントも(query_cache
の正体さえわかっていれば)必要十分だと思います。
(※フラグです。後で引っかかります)
実装
長いコードジャンプの旅を終えて、ようやく実装を始めることができます。
かんたんに以下のような方針で実装を行いました。
-
NimbleCSV
- 比較的シンプルなCSVパーサ
- 1CSVファイルごとに1つ
GenServer
を建てる:Table
- CSVの中身をオンメモリで保持するため
- ファイルパスやCSV or TSV、ヘッダーの有無など固有の情報も保持する
- ↑をまとめるDynamicSupervisor:
Database
- テーブル名ごとに一意な子供を持つためにRegistryを利用する
TLDR:実装物↓
https://github.com/ndac-todoroki/ecto_csv
めちゃくちゃ雑なのは許してください。
大まかに動かすとこんな感じです。
defmodule Friends.Repo do
use Ecto.Repo,
otp_app: :friends,
adapter: EctoCsv.Adapter,
read_only: true
end
defmodule Friends.Person do
use Ecto.Schema
import EctoCsv.Database.Settings
@primary_key false
@table_settings path: "person.csv", format: :csv, name: "people"
schema "people" do
field :age, :integer, default: 0
field :name, :string
end
set_table!(@table_settings)
end
iex(1)> Friends.Person |> Friends.Repo.all()
[
%Friends.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "people">,
age: 17,
name: "Taro Otsuka"
},
%Friends.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "people">,
age: 18,
name: "Hanako Koji"
},
%Friends.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "people">,
age: 35,
name: "Masuo Fuguta"
}
]
うごいた!
実装で困ったこと
execute/6
・ stream/6
のドキュメントも辛かった
フラグ回収です。
execute/6
と stream/6
、まるで兄弟メソッド…というかStreamかそうじゃないかの違いしかないぜ!みたいな顔をしていますよね?
それぞれ返すべきEnumerable.tの形が違うんです。
execute/6
の場合
2要素のタプルを返しますが、1要素目が結果の総行数、2要素目が結果のリストです。結果は1行を要素のリストで返すので、実際にはリストのリストになります。
iex> execute(...)
{3, [[1, 1 1], [2, 2, 2], [3, 3, 3]]}
stream/6
の場合
execute/6
と同じ形の_タプルのStreamを返す必要があります_。しかし1行ずつです。上と同じデータを返すとこうなるはずです。
iex> stream(...) |> Enum.to_list()
[
{1, [[1, 1, 1]]},
{2, [[2, 2, 2]]},
{3, [[3, 3, 3]]}
]
ちなみに呼び出し部分では
stream
|> Stream.flat_map(fn {_, rows} -> rows end)
|> Stream.map(preprocessor)
|> Stream.map(postprocessor)
となっていて_第1要素が捨てられています_。ええぇ…
出来なかったこと
スキーマファイルのModule Attributeをいい感じに読めない
スキーマを書くときにこういう書き方をしますね
defmodule Person do
use Ecto.Schema
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
schema "people" do
field :name, :string
end
end
これ Ecto.Schema.schema/2
というマクロなんですが、この展開時に特定のModule Attributesについて読み込みを行うことでPrimaryKeyなどの設定がなされています。
ここにオレオレAdapterの際に利用するAttributeほしいじゃないですか!
defmodule Person do
use Ecto.Schema
use EctoCsv.Schema
@csv_file_path "~/Documents/people.csv"
@csv_file_type :csv
@csv_has_header false
schema "people" do
field :name, :string
end
end
Ecto.Schema.schema/2
に拡張性がなくて断念しました。最終的には set_table!
というマクロを呼ぶことでお茶を濁しています。
Schemaと結果行のマッピング
今回は :all
のみ、しかもリストをそのまま返すだけ、というとても雑な実装をしています。
今回Ecto.Schemaの内部までは見ていないんですが、おそらく EctoCsv.Adapter.Queryable.execute
の結果の各行の各要素をSchemaの構造体に詰めていると考えられます。(EctoCsv.Adapter.Queryable.execute
が返すのは値のリストだし、 schema do ... end
の中の順番を変えると構造体のキーが入れ替わることからも観察できる。)
なので本来はSchemaとデータの各行の対応付けをする必要があります。
例えば select
を行うためには EctoCsv.Adapter.Queryable.execute
が各データ行の対応する要素を削除する必要があるし、もしCSVにヘッダーがあるのならば、行の名前を利用してデータをソートしてあげることで、Schema定義を順不同で書いても問題なくなります。
EctoCsv.Adapter.Queryable.execute
ではこの他にも limit
where
などの処理を行う必要があります。
…が、正直あとは楽しい関数型データ処理をするだけなので楽にできるはず!
まとめ
- Ectoのドキュメント薄くて辛かった
- Adapterは
Repo
にほしい関数に合わせてBehaviourを実装していくといいです -
prepare
の結果はexecute
の第3引数にラップされた状態で渡ります - CSVを読むアダプターが書けました
- READだけなら比較的簡素に書くことが出来ます
皆さんもぜひEctoのAdapter書いてみてください!
明日は
enerickさんの担当です!お楽しみに!