Edited at

マスタ情報を持つMySQLテーブルをTreasureDataへ簡単に転送する方法

More than 3 years have passed since last update.

データ解析には、マスタ情報(商品マスタ/会員マスタなど)が必要になるケースもあります。

そのとき、様々なデータソースからTreasureDataへレコードを一括投入できる、td import:autoコマンドが便利です。


対応しているデータソース


  • csv : カンマ区切りのファイル

  • tsv : タブ区切りのファイル

  • json : 各行にレコードがJSON形式で書かれたファイル

  • msgpack : 各行にレコードがmsgpack形式で書かれたファイル

  • apache : apache combined形式のログファイル

  • mysql : MySQLへJDBCドライバ経由で接続してエクスポートできるコネクタ

  • regex : キャプチャ付き正規表現を指定する?

cf. http://docs.treasuredata.com/articles/bulk-import


基本的な使い方


CSVファイル

MySQLからある条件で抽出したものや、マスキングしたデータをCSV化してから取り込む時にも使えます。

CSVファイルであれば、次のように指定すれば簡単にTreasureDataへ取り込めます。

# もしデータベースを新規に作る場合には実行(既にある場合は省略する)

$ td db:create my_db

# 取り込む先のテーブルを作る
$ td table:create my_db my_tbl

# インポートする
$ td import:auto \
--format csv --column-header \
--time-column date_time \
--time-format "%Y-%m-%d %H:%M:%S" \
--auto-create my_db.my_tbl \
./data.csv

# もし時刻カラムが無ければ次のように指定する
# date +%s コマンドの実行結果である、その時のUNIXTIMEを利用します
$ td import:auto \
--format csv --column-header \
--time-value `date +%s` \
--auto-create my_db.my_tbl \
./data.csv

cf. http://docs.treasuredata.com/articles/bulk-import-from-csv


MySQLテーブル

接続先とデータベース、テーブルを指定すると、自動的にエクスポート・アップロードが行えます。

なお、時刻の列が無い場合には--time-value引数で固定的に指定します。

$ td db:create my_db

$ td table:create my_db my_tbl
$ td import:auto \
--auto-create my_db.my_tbl \
--format mysql \
--db-url jdbc:mysql://my_sql_host/my_sql_db \
--db-user my_user \
--db-password my_pass \
--time-column date_time \
my_sql_tbl

cf. http://docs.treasuredata.com/articles/bulk-import-from-mysql


Tips


--all-string

デフォルトでは、1つ目のレコードから自動判定された型を用いて処理が進められます。

もし1つめでは数値だが、2つめのレコードで文字列となるデータを用いる場合には問題が発生します。

TreasureDataに入れた後に動的にschema設定が出来るので、とりあえずこのオプションを指定しても支障は無いでしょう。

但しこのオプションを指定する弊害があり、msgpack化した際の容量が大きくなるため、遅くなるかもしれません。

もし気になる場合には、次のオプションを用いて型の指定を行うと良いです。

--column-types TYPE,TYPE,...     column types [string, int, long, double]

--column-type NAME:TYPE column type [string, int, long, double]. A pair of column name and type can be specified like 'age:int'


--parallel / --prepare-parallel

デフォルトではどちらも2ですが、CPUのコア数に応じて増やすと良いでしょう。

--parallel NUM                   upload in parallel (default: 2; max 8)

--prepare-parallel NUM prepare in parallel (default: 2; max 96)

--parallel option specifies how many threads to be used for uploading the data. If you observe bulk import tool is not saturating your network, please consider increasing the value of the --parallel option.

--prepare-parallel option specifies the number of threads to be used to compress the data locally. Normally this number should match the number of CPU cores on your machine.


--time-value

TreasureDataはtimeキーでパーティショニングしているため、これがばらけているとパフォーマンスが上がります。

しかし取り込むデータによってはタイムスタンプが無い時もあります。そんな時は現在の時刻をunixtimeで指定すると良いでしょう。

$ date +%s

1421832388

但し"0"だけは設定しないようにと次のようにドキュメントに記載されていました。


Please don’t specify ‘0’ if you don’t have time column. Treasure Data is partitioning the data by time by default (See Data Partitioning). It’s recommended to always specify the time column, or specify the current time.

http://docs.treasuredata.com/articles/bulk-import-tips-and-tricks



プロキシ経由で使う

HTTP_PROXY環境変数にセットしましょう。

# Windows:

$ set HTTP_PROXY=http://proxy_host:8080
# Other:
$ export HTTP_PROXY="proxy_host:8080"

Unix/Linux系であれば、このように実行のみの環境変数としても良いですね。

$ HTTP_PROXY="proxy_host:8080" td import


そのほかのTips

td import:autoを2度実行したら上書きはされず、重複して記録されます。

入れ替える場合には1度別名で作成した後に、td table:swapを用いと良いでしょう。

$ td table

Additional commands, type "td help COMMAND" for more details:

table:list [db] # Show list of tables
table:show <db> <table> # Describe information of a table
table:create <db> <table> # Create a table
table:delete <db> <table> # Delete a table
table:import <db> <table> <files...> # Parse and import files to a table
table:export <db> <table> # Dump logs in a table to the specified storage
table:swap <db> <table1> <table2> # Swap names of two tables
table:tail <db> <table> # Get recently imported logs
table:partial_delete <db> <table> # Delete logs from the table within the specified time range
table:expire <db> <table> <expire_days> # Expire data in table after specified number of days


併せて読みたい