Data Platform Group という部署で機械学習システムの開発・運用をしている yubessy です。
TL; DR
- sqlcsv という RDB ↔ CSV のI/Oが手軽にできるツールを作ったよ
- 小規模なバッチ処理システムの構築に使えるよ
- INSERT文でカラムの順序や値の動的生成をしたいときに便利だよ
背景
私の部署では次のようなバッチ処理システムを複数運用しています。
- 新規ユーザに対しておすすめ求人リストを生成する
- 特定セグメントの求人について応募単価を推定する
新しいシステムの構築を簡単にするため、機械学習アルゴリズムやDB入出力などの共通部分はコンポーネントとして切り出しています。
これらのコンポーネント間ではCSVファイルでデータを受け渡しており、処理フローは次のようになることが多いです。
RDB → CSVファイル → 機械学習などの処理 → CSVファイル → RDB
この中で RDB ↔ CSV のI/Oに何を使うかちょっとした悩みどころでした。
既存ツール
扱うデータが数100MB程度と小規模なら、手軽なコマンドラインツール等があれば十分です。
しかし、次のように既存の手段はどれも一長一短がありました。
COPY, LOAD
MySQL, PosgreSQL にはデータをファイルからインポートするための COPY
や LOAD
といったコマンドが用意されています。
しかしこれらはINSERTのようなSQL標準構文と異なり、RDBごとにインタフェースが大きく異なります。
またリモートサーバからの入出力には対応していないことも多いです。
CSVKit
SELECT文の結果をCSVファイルにエクスポートする際には、CSVKit の sql2csv
コマンドが便利です。
ただしインポートに使える csvsql
コマンドは、 INSERT 文ではなくテーブル名を指定する方式のため、予めCSVファイルのカラム名や順序をテーブルに合わせて整形しておく必要があります。
またCSVKitは汎用的なCSV処理ツールとして作られており、 agate など様々なライブラリに依存していますが、これらは RDB ↔ CSV のI/Oには直接必要ないものも多いです。
Pandas
Pandasには read_sql
や to_sql
といった、 RDB <-> DataFrame のI/Oを実現する機能があります。
これらも便利なのですが、 to_sql
はCSVKitの csvsql
と同様テーブル名を指定する必要があります。
Embulk
データ基盤における大規模なETLではEmbulkを使っていますが、設定ファイルやプラグインの管理が少々複雑になります。
小規模なデータパイプラインはデータエンジニア以外が開発することもありキャッチアップに時間がかかるのは避けたいです。
つくったもの: sqlcsv
上記のツールのうち、CSVKitの sql2csv
, csvsql
コマンドの代替を想定してツールを作ってみました。
Python3 上で動作し、 Pip でインストールできます。
$ pip3 install sqlcsv
依存ライブラリは SQLAlchemy と Click です。
CSVの処理はPython標準ライブラリの csv パッケージを使っているため、Pandas等への依存はありません。
SQLite以外のDBドライバは別途インストールする必要があります。
# MySQL
$ pip3 install mysqlclient
# PostgreSQL
$ pip3 install psycopg2
使い方
たとえばMySQLに次のようなテーブルとデータがあるとして、
CREATE TABLE test(
id INT AUTO_INCREMENT PRIMARY KEY,
int_col INT,
float_col FLOAT,
varchar_col VARCHAR(255)
)
+----+---------+-----------+-------------+
| id | int_col | float_col | varchar_col |
+----+---------+-----------+-------------+
| 1 | 1 | 1 | aaa |
| 2 | 2 | 2 | bbb |
| 3 | NULL | NULL | NULL |
+----+---------+-----------+-------------+
SELECT文によるデータのエクスポートは次のように、
$ sqlcsv --db-url 'mysql://username:password@localhost:3306/dbname' select \
--sql 'SELECT * FROM test'
id,int_col,float_col,varchar_col
1,1,1.0,aaa
2,2,2.0,bbb
3,,,
INSERT文によるデータのインポートは次のようにできます。
$ cat input.csv
int_col,float_col,varchar_col
1,1.0,aaa
2,2.0,bbb
$ sqlcsv --db-url 'mysql://username:password@localhost:3306/dbname' insert \
--sql 'INSERT INTO test(int_col, float_col, varchar_col) VALUES (%s, %s, %s)' \
--types int,float,str --infle input.csv
DB接続URLは SQLCSV_DB_URL
という環境変数を設定していればそこから読み込むこともできます。
$ export SQLCSV_DB_URL='mysql://username:password@localhost:3306/dbname'
$ sqlcsv <subcommand> ...
sqlcsv insert
の特徴
sqlcsv select
の機能はCSVKitの sql2csv
とほぼ同じですがが、 sqlcsv insert
はCSVKitの csvsql
とは少し異なります。
特にテーブル名ではなくINSERT文を指定するため、RDBとCSVでカラム順序を替えたり、カラムの値を動的に生成したりできます。
$ cat input.csv
float_col,int_col
1.0,1
2.0,2
$ sqlcsv insert \
--sql 'INSERT INTO test(float_col, int_col, varchar_col) VALUES (%s, %s, NOW())' \
--types float,int --infle input.csv
--types
で各カラムの型を指定するのが少し面倒ですが、CSVのカラム型の自動推定は難しくバグの温床になりやすいので、あえて明示指定が必要なようにしています。
その他の機能
詳しくは https://github.com/yubessy/sqlcsv を御覧ください。
各オプションの説明を以下に書いておきます。
sqlcsv
-u, --db-url TEXT DB接続URL
-p, --pre-sql TEXT メイン処理の実行前に指定したSQLを実行
-P, --post-sql TEXT メイン処理の実行後に指定したSQLを実行
-X, --transaction 発行されるSQLを同一トランザクション内で実行
-H, --no-header select 時にヘッダ行を出力しない / insert 時にCSVファイルがヘッダを持たないものとして扱う
-T, --tab 入出力をTSVにする
# 以下は Python の csv のオプションと同様 (https://docs.python.jp/3/library/csv.html)
-d, --delimiter TEXT
-l, --lineterminator TEXT
-Q, --quoting [ALL|MINIMAL|NONNUMERIC|NONE]
-q, --quotechar TEXT
-e, --escapechar TEXT
-B, --no-doublequote
sqlcsv select
-s, --sql TEXT SELECTクエリ
-f, --sqlfile FILENAME SELECTクエリのファイル名
-o, --outfile FILENAME 出力ファイル名
sqlcsv insert
-s, --sql TEXT INSERTクエリ
-f, --sqlfile FILENAME INSERTクエリのファイル名
-i, --infile FILENAME 入力ファイル名
-t, --types TEXT CSVの各カラムの型
-n, --nullables TEXT CSVの各カラムの null (None) を許容するか
-c, --chunk-size INTEGER レコードを指定行ずつチャンクに分割してINSERTを実行
おわりに
社内でCSVKitを使っていた箇所を置き換えてみましたが、今のところ想定していたメリットを得られています。
RDB ↔ CSV のI/Oで手頃なツールをお探しの場合は試していただけると良いかもしれません。