LoginSignup
17
2

More than 3 years have passed since last update.

EctoでCSVを読めるようにする

Last updated at Posted at 2019-12-19

この記事はElixir Advent Calendar 2019 19日目の記事です!
昨日は masatam81さんの「年末の大掃除がてらリファクタリングをしよう」 でした。そちらもぜひ!

まえおき

(飛ばして大丈夫です)

ActiveTsv

いきなりElixirじゃない話ですが、 ActiveTsv ってご存知ですか?
ActiveRecordっぽくTSV/CSVが読めるActiveTsvを作った
https://github.com/ksss/active_tsv
ActiveRecord として操作するけどデータの読み先はCSVやTSVですよ、ってやつです。これが意外と便利で、ちょっとした動作の確認やテストデータなどにもってこいだったりします。

そこでおもった。

Ecto でCSVを扱えたらめちゃくちゃ良いのでは??テストデータとか流し放題じゃんww
Ecto.Adapters.CSV つくるぞー!

EctoのAdapterってどうやって作るんだろう :thinking:

ゆーてEctoといえばドキュメントが厚いことで有名じゃないですか

ドキュメント読んでちょちょいのちょいや。そう思っていた時期がありました。

Ecto、実はEctoを利用する側のドキュメントは豊富なんですが、Adapterを実装する側に関してはまじでドキュメントありません。
一応Module一覧を覗けば Ecto.Adapter から始まるモジュール群がなんか関係ありそうだな…という程度のことがわかります。
スクリーンショット 2019-12-19 7.39.01.png
https://hexdocs.pm/ecto/api-reference.html#modules

ゆーてEctoってすでにいろんなアダプターがあるじゃないですか

ぐぐると出てくるし。
EctoMnesia とか。
MnesiaなんてCSVとやってることほとんど同じなんじゃないですか??フォークして書き換えたらつくれるじゃーん…?

Ecto は今バージョン3系なんですよね。EctoMnesiaその他、いろんなアダプターがあったのは2系でした。
3系になってデータの変換・バリデーターである EctoEcto.SQL が切り離され、単体で利用しやすくなりました。神アプデです。(Ectoガンガン使ってAPIのリクエストパラメーターバリデーションとかやるとちょっと辛さもありますが)
このことが災いして昔のAdapterは互換性ないし、参考にしようとしても手間になります。じゃあ3系に存在するアダプターは何があるかというと、

Ecto.SQL

…だけです。

本題

というわけでこの記事はEctoを読んでAdapterを書いてみた、という内容になります。

とりあえず 何から取りかかればいいんだ…?

Ecto.Adapter を実装したModuleを作るところ から取り掛かれば良さそう。

ほとんどノーヒント状態ですが、明らかにひとつだけ名前空間が上位なので、Ecto.Adapterが重要そうに見えます。EctoのRepoの設定で「Adapterを指定する」部分も、流石に Ecto.Adapter を実装しているモジュールの話だと考えるのが妥当そうです(ドキュメントに明記されてはなさげ…)

また、逆引き的ですが、Ectoを利用する上でよく呼び出す Ecto.Repo の実装を見に行くと、後述の理由で実装すべきBehaviourが割り出せたりします。

更にいうと、地味にMySQLのアダプターを作ってみよう!みたいな公式ブログの記事があります。ただし Ecto.Adapter 類の実装の必要性に一瞬触れたかと思いきや「便利なEcto.Adapters.SQLを使いましょうね」と言い始めるのであんまり参考になりません。

というわけでまずは Ecto.Adapter について確認していきます。

Ecto.Adapter について

役割

Ecto.Adapterの実装に関しては2つの役割がありそう

  • アダプターアプリケーションのツリーの起動
  • (なぜか)型変換周り
アプリケーションツリーの起動

主にこの2つのコールバック。

@callback ensure_all_started(config :: Keyword.t(), type :: :application.restart_type()) ::
            {:ok, [atom]} | {:error, atom}

@callback init(config :: Keyword.t()) :: {:ok, :supervisor.child_spec(), adapter_meta}

概ね、以下のことをすれば良さそう

  • ensure_all_started/2
    • 依存しているライブラリがちゃんとロードしていることを確認する
  • init/1
    • もし実装するAdapter内部にGenServerなどが建つのであればその child_spec を返す
型変換周り

そのアダプターにおいて利用する、データ上の型とEcto上の型との変換を行う関数を指定する。

@callback loaders(primitive_type :: Ecto.Type.primitive(), ecto_type :: Ecto.Type.t()) ::
            [(term -> {:ok, term} | :error) | Ecto.Type.t()]

@callback dumpers(primitive_type :: Ecto.Type.primitive(), ecto_type :: Ecto.Type.t()) ::
            [(term -> {:ok, term} | :error) | Ecto.Type.t()]

loaders/2 のドキュメントから引用すると、

def loaders(:boolean, type), do: [&bool_decode/1, type]
def loaders(_primitive, type), do: [type]
defp bool_decode(0), do: {:ok, false}
defp bool_decode(1), do: {:ok, true}

このように第一引数で Ecto.Schema で指定する型をうけ、変換が必要な値に関しては変換用の関数を添えて返す実装をすることになる。

例えば今回はCSVだが、各セルを読むと型は文字列型のみになる:

iex> File.read!("./test.csv") \
...> |> String.split("\n") \
...> |> NimbleCSV.RFC4180.parse_enumerable() 
[["1", "10", "100"], ["2", "20", "200"], ["3", "30", "300"]]

なので、例えば :integer と指定された値に対しては

@impl Ecto.Adapter
def loaders(:integer, type), do: [&integer_decode/1, type]

def integer_decode(value), do: value |> Integer.parse() |> do_integer_decode()

defp do_integer_decode(:error), do: :error
defp do_integer_decode({value, ""}), do: {:ok, value}
defp do_integer_decode({_, _}), do: :error

みたいな関数を指定してあげれば良さそう。

Adapterとbehaviours

ところで、普段Ectoを利用するときは

defmodule Foo.Repo do
  use Ecto.Repo,
    otp_app: :foo,
    adapter: Ecto.Adapter.Postgrex
end

みたいな設定でRepoを作ると思いますが、実はこのとき :adapter に指定するモジュールによって Foo.Repo に生えるメソッドが異なってきます。

Ecto.Repo.__using__/1 は、use Ecto.Repo:adapter に指定するモジュールが実装しているBehaviourに応じて挿入する内容が変わるようなマクロになっています。

実際の Ecto.Repo.__using__/1 を抜粋するとこんな感じです:

defmacro __using__(opts) do
  quote bind_quoted: [opts: opts] do
    @behaviour Ecto.Repo

    {otp_app, adapter, behaviours} =
      Ecto.Repo.Supervisor.compile_config(__MODULE__, opts)

  ...

    ## Transactions
    if Ecto.Adapter.Transaction in behaviours do
      def transaction(fun_or_multi, opts \\ []) do
        Ecto.Repo.Transaction.transaction(__MODULE__, get_dynamic_repo(), fun_or_multi, opts)
      end

      def in_transaction? do
        Ecto.Repo.Transaction.in_transaction?(get_dynamic_repo())
      end

      @spec rollback(term) :: no_return
      def rollback(value) do
        Ecto.Repo.Transaction.rollback(get_dynamic_repo(), value)
      end
    end

普段はすべて実装してある Ecto.SQL 系を利用するので知らなかったんですが、自分でAdapterを実装すると Repo.all とかすらできなくて?!ってなりました。自前のAdapterが特定のBehaviourを実装していなかったことが原因です。

Ecto.Adapter.Transaction を実装すると以下のメソッドが生えてくるよ」とかドキュメントに書いてあると嬉しいなぁ

逆にこのマクロを参照することにより、自分の作成するAdapterにおいてどのBehaviourを実装するべきかがわかると言えます。例えば今回でいうと(CSVは読み専用なため)WRITEの機能が一切いらないので、Ecto.Adapter.Schema のBehaviourを実装する必要がない、ということになります。

ここらへんは上で触れたMySQLアダプター実装してみようブログ記事でも地味にさらっと触れられていました。該当箇所を適当に訳すとこんな感じ:

アダプタは最低でも Ecto.Adapter ビヘイビアだけでも実装していなくてはなりません. 残りのビヘイビアは任意です。なぜならクラウドサービスのようにトランザクションやCREATE/DROPに対応していないものもあるからです。

これ非常に便利ですよね。Twitterからデータ取得するだけのEctoAdapterとかも作れそうです。いるかはともかく。

Ecto.Adapter.Queryable

さて、ビヘイビアの中で今回必ず実装しなくてはならないのが Ecto.Adapter.Queryable です。Ecto.Adapter.Queryable を実装すると all get exists? などのREAD系関数が利用できるようになります。
どんな実装をする必要があるのか読んでいきます。

prepare/2execute/6

Ecto.Adapter.Queryable のメインは3つのコールバック関数です。まるっと引用します。

ecto/lib/ecto/adapter/queryable.ex
@doc """
Commands invoked to prepare a query for `all`, `update_all` and `delete_all`.

The returned result is given to `execute/6`.
"""
@callback prepare(atom :: :all | :update_all | :delete_all, query :: Ecto.Query.t()) ::
            {:cache, prepared} | {:nocache, prepared}

@doc """
Executes a previously prepared query.

It must return a tuple containing the number of entries and
the result set as a list of lists. The result set may also be
`nil` if a particular operation does not support them.

The `adapter_meta` field is a map containing some of the fields found
in the `Ecto.Query` struct.
"""
@callback execute(adapter_meta, query_meta, query_cache, params :: list(), options) ::
            {integer, [[term]] | nil}

@doc """
Streams a previously prepared query.

It returns a stream of values.

The `adapter_meta` field is a map containing some of the fields found
in the `Ecto.Query` struct.
"""
@callback stream(adapter_meta, query_meta, query_cache, params :: list(), options) ::
            Enumerable.t

これドキュメント部分省略してないんですが、結構ドキュメント薄くないですか・・・?

prepareの結果はどこにゆくのか

ドキュメントに気になる1文があります。

The returned result is given to execute/6.

どう渡されるんだ……?
とりあえずGrepして prepare の現れるところを探します。

ecto/lib/ecto/query/planner.ex
defmodule Ecto.Query.Planner do
  # Normalizes a query and its parameters.
  @moduledoc false

  defp query_without_cache(query, operation, adapter, counter) do
    {query, select} = normalize(query, operation, adapter, counter)
    {cache, prepared} = adapter.prepare(operation, query)    # ← ここ
    {cache, select, prepared}
  end

execute/6 見当たらないけど・・・??
というわけで周囲を見渡すと、 Ecto.Query.Planner はこんなふうになっています

ecto/lib/ecto/query/planner.ex
def query(query, operation, cache, adapter, counter) do
  {query, params, key} = plan(query, operation, adapter)
  query_with_cache(key, query, operation, cache, adapter, counter, params)
end

defp query_with_cache(key, query, operation, cache, adapter, counter, params) do
  case query_lookup(key, query, operation, cache, adapter, counter) do
    {_, select, prepared} ->             # キャッシュを使わないとき
      {build_meta(query, select), {:nocache, prepared}, params}
    {_key, :cached, select, cached} ->   # ETSキャッシュにあったとき
      update = &cache_update(cache, key, &1)
      reset = &cache_reset(cache, key, &1)
      {build_meta(query, select), {:cached, update, reset, cached}, params}
    {_key, :cache, select, prepared} ->  # ETSキャッシュに入れたとき
      update = &cache_update(cache, key, &1)
      {build_meta(query, select), {:cache, update, prepared}, params}
  end
end

defp query_lookup(:nocache, query, operation, _cache, adapter, counter) do
  query_without_cache(query, operation, adapter, counter)
end

defp query_lookup(key, query, operation, cache, adapter, counter) do
  case :ets.lookup(cache, key) do
    [term] -> term
    [] -> query_prepare(query, operation, adapter, counter, cache, key)
  end
end

defp query_prepare(query, operation, adapter, counter, cache, key) do
  case query_without_cache(query, operation, adapter, counter) do
    {:cache, select, prepared} ->
      cache_insert(cache, key, {key, :cache, select, prepared})
    {:nocache, _, _} = nocache ->
      nocache
  end
end

呼び出しを整理すると、こんな感じで最終的に Ecto.Query.Planner.query/5 で呼ばれています。

query/5
  query_with_cache/7
    query_looup/6
      query_without_cache/4  # ETSにない場合
        adapter.prepare/2
      query_prepare          # ETSにある場合
        query_without_cache/4
          adapter.prepare/2

やっていることをまとめると次のような感じ:

  • 指定したクエリに対してプランナーからキャッシュキーが提示される
  • ETSテーブルをそのキーで探す。
  • エントリがない場合は
    • {_, _, :nocache} が返る
    • query_without_cache に行って adapter.prepare が呼ばれる。
  • エントリがありそうな場合は中身があるかチェック
    • ETSキャッシュがある場合
      • キャッシュの中身を返す
    • ETSにキャッシュがない場合
      • adapter.prepare が呼ばれる

ところで実際 Ecto.Query.Planner.query/5 はどこで使われているのかというと、 Ecto.Adapter.Queryable.prepare_query/3 だったりします。元のモジュールに帰ってきました!

…で結局 execute/6 が呼ばれていそうな場所には出会っていないわけだが…?:thinking:

というわけでもう一度grepすると、 execute/6Ecto.Repo.Queryable.execute/4 で呼ばれていることがわかります。

ecto/lib/ecto/repo/queryable.ex
defmodule Ecto.Repo.Queryable do
  defp execute(operation, name, query, opts) when is_list(opts) do
    {adapter, %{cache: cache, repo: repo} = adapter_meta} = Ecto.Repo.Registry.lookup(name)
    {query, opts} = repo.prepare_query(operation, query, opts)
    query = attach_prefix(query, opts)
    {query_meta, prepared, params} = Planner.query(query, operation, cache, adapter, 0)

    case query_meta do
      %{select: nil} ->
        adapter.execute(adapter_meta, query_meta, prepared, params, opts)

      %{select: select, sources: sources, preloads: preloads} ->
        ...
        {count, rows} =
          adapter.execute(adapter_meta, query_meta, prepared, params, opts)

ここを見るとたしかに直前に Ecto.Query.Planner.query/5 も呼ばれています。 The returned result is given to execute/6. はここのことなのか…!
というわけでここで Ecto.Query.Planner.query/5 のSpecを書いてみます。(関係ないところは変数をそのままおいてます)

@spec query(query, operation, cache, adapter, counter) :: {meta, planned, params}
  when planned: {:nocache, prepared}
                | {:cached, update, reset, cached}
                | {:cache, update, prepared},
       meta: %{select: term, preloads: term, sources: term}

:cache タプルや :cached タプルは今回は割愛しますが、ETSに格納したあと更にクエリを上書き保存したりするときに :cached とかに変更するようです。(その場面までは追えませんでした)

非常に回り道をしたけれど、 Ecto.Repo.Queryable のスニペットを見ると、上のSpecの planned 全体がそのまま Adapter.Queryable.execute/6 の第3引数に入る。その中の prepared を作る関数が Adapter.Queryable.prepare/2 ということになる。分かりづらすぎる…

結局何をする関数なのか

結局 Adapter.Queryable.prepare/2 が何をするのかというと

Adapter.Queryable.execute/6 で扱えるいい感じのクエリタプルとかを作る

ことだと解釈しました。
キャッシュを利用しない場合に限定すると、

(A)
@callback prepare(atom :: :all | :update_all | :delete_all, query :: Ecto.Query.t()) ::
            {:cache, prepared} | {:nocache, prepared}

(B)
@spec query(query, operation, cache, adapter, counter) :: {meta, planned, params}
  when planned: {:nocache, prepared}
                | {:cached, update, reset, cached}
                | {:cache, update, prepared}

(C)
@callback execute(adapter_meta, query_meta, query_cache, params :: list(), options) ::
            {integer, [[term]] | nil}

(A)のpreparedが(B)のplannedになって(C)にquery_cacheとして渡されます ということだった。そう書いておいてほしかった~

めちゃくちゃ簡易な実装をするのであれば(=キャッシュを使わないのであれば) Adapter.Queryable.prepare/2 では何もしなくていいのかもしれないです。

prepare execute 間でETSを挟むことから、おそらくクエリの準備自体にDBアクセスを挟むなどの場合に結果をキャッシュして高速化を図るための仕様なのだと思います。今回に限って言えば問題にならないポイントでした。

というわけで今回は Adapter.Queryable.prepare/2 に来た2引数をそのまま Adapter.Queryable.execute/6 に渡すことにしました。

(ところで execute/6 じゃなくて execute/5 では…?)

execute/6stream/6

長々と prepare/2 を追いかけましたが、対してこれは非常にシンプルで、クエリに応じたデータ列をかえせばいいだけです。ドキュメントも(query_cache の正体さえわかっていれば)必要十分だと思います。
(※フラグです。後で引っかかります)

実装

長いコードジャンプの旅を終えて、ようやく実装を始めることができます。
かんたんに以下のような方針で実装を行いました。

  • NimbleCSV
    • 比較的シンプルなCSVパーサ
  • 1CSVファイルごとに1つ GenServer を建てる: Table
    • CSVの中身をオンメモリで保持するため
    • ファイルパスやCSV or TSV、ヘッダーの有無など固有の情報も保持する
  • ↑をまとめるDynamicSupervisor: Database

TLDR:実装物↓
https://github.com/ndac-todoroki/ecto_csv
めちゃくちゃ雑なのは許してください。

大まかに動かすとこんな感じです。

defmodule Friends.Repo do
  use Ecto.Repo,
    otp_app: :friends,
    adapter: EctoCsv.Adapter,
    read_only: true
end

defmodule Friends.Person do
  use Ecto.Schema
  import EctoCsv.Database.Settings

  @primary_key false
  @table_settings path: "person.csv", format: :csv, name: "people"
  schema "people" do

    field :age, :integer, default: 0
    field :name, :string
  end
  set_table!(@table_settings)
end

iex(1)> Friends.Person |> Friends.Repo.all()
[
  %Friends.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "people">,
    age: 17,
    name: "Taro Otsuka"
  },
  %Friends.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "people">,
    age: 18,
    name: "Hanako Koji"
  },
  %Friends.Person{
    __meta__: #Ecto.Schema.Metadata<:loaded, "people">,
    age: 35,
    name: "Masuo Fuguta"
  }
]

うごいた!

実装で困ったこと

execute/6stream/6 のドキュメントも辛かった

フラグ回収です。
execute/6stream/6 、まるで兄弟メソッド…というかStreamかそうじゃないかの違いしかないぜ!みたいな顔をしていますよね?
それぞれ返すべきEnumerable.tの形が違うんです。

execute/6 の場合

2要素のタプルを返しますが、1要素目が結果の総行数、2要素目が結果のリストです。結果は1行を要素のリストで返すので、実際にはリストのリストになります。

iex> execute(...)
{3, [[1, 1 1], [2, 2, 2], [3, 3, 3]]}
stream/6 の場合

execute/6 と同じ形のタプルのStreamを返す必要があります。しかし1行ずつです。上と同じデータを返すとこうなるはずです。

iex> stream(...) |> Enum.to_list()
[
  {1, [[1, 1, 1]]},
  {2, [[2, 2, 2]]},
  {3, [[3, 3, 3]]}
]

ちなみに呼び出し部分では

stream
|> Stream.flat_map(fn {_, rows} -> rows end)
|> Stream.map(preprocessor)
|> Stream.map(postprocessor)

となっていて第1要素が捨てられています。ええぇ…

出来なかったこと

スキーマファイルのModule Attributeをいい感じに読めない

スキーマを書くときにこういう書き方をしますね

defmodule Person do
  use Ecto.Schema

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "people" do
    field :name, :string
  end
end

これ Ecto.Schema.schema/2 というマクロなんですが、この展開時に特定のModule Attributesについて読み込みを行うことでPrimaryKeyなどの設定がなされています。

ここにオレオレAdapterの際に利用するAttributeほしいじゃないですか!

defmodule Person do
  use Ecto.Schema
  use EctoCsv.Schema

  @csv_file_path "~/Documents/people.csv"
  @csv_file_type :csv
  @csv_has_header false
  schema "people" do
    field :name, :string
  end
end

Ecto.Schema.schema/2 に拡張性がなくて断念しました。最終的には set_table! というマクロを呼ぶことでお茶を濁しています。

Schemaと結果行のマッピング

今回は :all のみ、しかもリストをそのまま返すだけ、というとても雑な実装をしています。

今回Ecto.Schemaの内部までは見ていないんですが、おそらく EctoCsv.Adapter.Queryable.execute の結果の各行の各要素をSchemaの構造体に詰めていると考えられます。(EctoCsv.Adapter.Queryable.execute が返すのは値のリストだし、 schema do ... end の中の順番を変えると構造体のキーが入れ替わることからも観察できる。)

なので本来はSchemaとデータの各行の対応付けをする必要があります。

例えば select を行うためには EctoCsv.Adapter.Queryable.execute が各データ行の対応する要素を削除する必要があるし、もしCSVにヘッダーがあるのならば、行の名前を利用してデータをソートしてあげることで、Schema定義を順不同で書いても問題なくなります。

EctoCsv.Adapter.Queryable.execute ではこの他にも limit where などの処理を行う必要があります。
…が、正直あとは楽しい関数型データ処理をするだけなので楽にできるはず!

まとめ

  • Ectoのドキュメント薄くて辛かった
  • Adapterは Repo にほしい関数に合わせてBehaviourを実装していくといいです
  • prepare の結果は execute の第3引数にラップされた状態で渡ります
  • CSVを読むアダプターが書けました
    • READだけなら比較的簡素に書くことが出来ます

皆さんもぜひEctoのAdapter書いてみてください!

明日は

enerickさんの担当です!お楽しみに!

17
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
2