Edited at

Embulkを使って、RDSのデータをBigQueryへロード

More than 1 year has passed since last update.


はじめに(追加)

参考として、Embulkを簡単に始められるサンプルコードを作ってみましたので、

よかったら使ってみて下さい。

https://github.com/rikiMuneoka/embulk_sample


動機


  • 既にBigQueryを使っていた

  • データベースにあるデータもJOINして便利なデータを取得したかった(re:dashでやろうとしてた)



  • なら、データベースにある欲しいデータもBigQueryにあげてしまえば良いのでは?


Embulkとは


  • RDSのデータをBigQueryのようなDWHへ簡単にバルクインサート出来る代物

  • 色んなケースがあると思いますが、ユーザの行動データ(所謂ログのようなもの)を全てRDSに保存していて、データが多くなりすぎて、RDSでの解析が辛くなってきた時などに使うケースが多いぽいです

  • 本家: https://github.com/embulk/embulk


    • これを見るとわかりますが、RDSだけでなく、CSVやS3、Salesforce等をInputし、

    • RedisやHive、ElasticSearach、BigQueryにbulk loadしてくれます




テストでEmbulkする手順



  • 1. BigQuery側にテスト用のDatasetとTableを作る


  • 2. テスト用にEC2インスタンス(Embulkを動かすサーバ)を立ち上げる


  • 3. テスト用のRDSインスタンスを立ち上げる


  • 4. テスト用のRDSに、テストデータを大量に(1万件程)突っ込む


  • 5. embulkをインストールし、設定ファイルを書き、実際に動かしてみる


1. BigQuery側にテスト用のDatasetとTableを作る


まずはDatasetを作る

名前はtest_for_embulkとしました。

$ bq mk test_for_embulk

Dataset 'test-bigquery-project-166901:test_for_embulk' successfully created.
$ bq ls
datasetId
-----------------
test2
test_dataset
test_for_embulk

※ test2とtest_datasetは以前使ったテスト用のdetasetなので無視して下さい


テーブルを作る

今回は、ユーザテーブルを想定し、下記のようなスキームのテーブルを作ります。

$ cat schema.json

[
{"name": "user_id", "type": "integer", "mode": "required"},
{"name": "user_name", "type": "string", "mode": "required"},
{"name": "user_age", "type": "integer", "mode": "required"}
]

じゃあ実際に作っていきます。

bq mkで、スキーマに添って空のテーブルを作る事が可能です。

$ bq mk --schema schema.json -t test_for_embulk.user_data

Table 'test-bigquery-project-166901:test_for_embulk.user_data' successfully created.

$ bq show test_for_embulk.user_data
Table test-bigquery-project-166901:test_for_embulk.user_data

Last modified Schema Total Rows Total Bytes Expiration Labels
----------------- --------------------------------- ------------ ------------- ------------ --------
21 Jun 18:34:23 |- user_id: integer (required) 0 0
|- user_name: string (required)
|- user_age: integer (required)

想定通りのテーブルが出来てます。


EC2インスタンスと、RDSを立ち上げる

本当は適当にterraformで立ち上げたかったのですが、意外とやること多いのと、無駄にハマったので、

今回は普通にコンソールからEC2インスタンスとRDSインスタンスを立ち上げました。

Terraformに関してはまた別途記事UPします。

で、EC2インスタンスからRDSに接続出来るようにセキュリティを空けておいて、3306で接続出来る事を確認しておきます。


4. テスト用のRDSに、テストデータを大量に(1万件程)突っ込む

まずはデータベースを作成しておきます。

mysql> CREATE DATABASE test_for_embbulk;

Query OK, 1 row affected (0.01 sec)

次に、テスト用のテーブルを用意します。

$ cat create_table.sql

CREATE TABLE user_data (
user_id INT PRIMARY KEY AUTO_INCREMENT,
user_name VARCHAR(45),
user_age INT
);

こんな感じのテーブルにしました。

一応作成しておきます。

mysql> use test_for_embbulk

Database changed
mysql> source create_table.sql
Query OK, 0 rows affected (0.03 sec)

mysql> show create table user_data;
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user_data | CREATE TABLE `user_data` (
`user_id` int(11) NOT NULL AUTO_INCREMENT,
`user_name` varchar(45) DEFAULT NULL,
`user_age` int(11) DEFAULT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 |
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)


テストデータを生成するスクリプトを作っておく

ランダムな名前と年齢を生成します。

user_dataテーブルのidはauto_incrementなので無視してます。

# coding:utf-8

import random
import time

# ランダムな名前の組み合わせを生成
def generateRandomName():
last = ["nakagawa", "aso", "mori", "yamamoto", "abe", "moritomo", "kakefu", "kuwata"]
first = ["kesuke", "taro", "junichiro", "masumi", "kojiro", "sanjuro"]
return random.choice(last) + random.choice(first)

# 出力するファイル名
OUTPUT_FILE = "test_insert.sql"

# 登録するサンプル数 - 1万
RECORD_COUNT = 10000

sqlCommands = "USE test_for_embbulk;\n"
sqlCommands += "INSERT INTO user_data (user_name, user_age) values \n"

# 登録するデータの数だけINSERT文を生成
for _ in range(RECORD_COUNT):
# ランダムなデータからInsert文を生成
sqlCommands += "('{}', '{}'),\n"\
.format(generateRandomName(), random.randint(1, 99))

# 生成したSQLコマンドをファイルに書き出す
f = open(OUTPUT_FILE, 'w')
f.write(sqlCommands)
f.close()

ちゃんと出来てるか見てみましょう。

$ python createSQL.py

$ head -10 test_insert.sql
USE test_for_embbulk;
INSERT INTO user_data (user_name, user_age) values
('nakagawakojiro', '81'),
('moritomosanjuro', '17'),
('nakagawakesuke', '44'),
('abetaro', '81'),
('asosanjuro', '18'),
('yamamototaro', '67'),
('kakefukojiro', '54'),
('kuwatajunichiro', '63'),

$ cat test_insert.sql | wc -l
10001

OKそうなのでぶち込みます。

mysql > source test_insert.sql

mysql > use test_for_embbulk
mysql> select * from user_data limit 10;
+---------+-----------------+----------+
| user_id | user_name | user_age |
+---------+-----------------+----------+
| 1 | nakagawakojiro | 81 |
| 2 | moritomosanjuro | 17 |
| 3 | nakagawakesuke | 44 |
| 4 | abetaro | 81 |
| 5 | asosanjuro | 18 |
| 6 | yamamototaro | 67 |
| 7 | kakefukojiro | 54 |
| 8 | kuwatajunichiro | 63 |
| 9 | nakagawakojiro | 21 |
| 10 | kuwatajunichiro | 32 |
+---------+-----------------+----------+
10 rows in set (0.00 sec)

よしよし。準備はOK


5. embulkをインストールし、設定ファイルを書き、実際に動かしてみる

ようやく本題。


  • Embulkのインストール

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"

$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc


  • 必要なプラグインをインストール

今回はMySQLからインプットするプラグインと、

BigQueryにアウトプットするプラグインをインストールしておきます。

$ embulk gem install embulk-input-mysql

2017-06-21 14:10:46.049 +0000: Embulk v0.8.25
Fetching: embulk-input-mysql-0.8.3.gem (100%)
Successfully installed embulk-input-mysql-0.8.3
1 gem installed

$ embulk gem install embulk-output-bigquery
... 長いので省略 ...
25 gems installed


BQの読み込みに必要なサービスアカウントを作っておく


  • Google Cloud Platformのサービスアカウントから、アカウントを作成する

  • BigQueryの編集権限を与えておく(必要があると思う)

ここで「新しい秘密鍵の提供」にチェックを入れておくと、作成完了後に自動的にDLされるので大事に保管します。

(※ 後で使います)


  • 設定ファイルを書いていきます(fluentdぽい感じ)

in:

type: mysql
host: xxxx # (ホスト名)
user: xxxx # (DBのユーザ名)
password: xxxx # (DBのPW)
database: test_for_embbulk
table: user_data
select: user_id, user_name, user_age # (取得したいカラム)
out:
type: bigquery
auth_method: json_key
json_keyfile: xxx #先程ダウンロードしたjsonファイルのパスを指定
path_prefix: /tmp/ # 一時ファイル作成場所
file_ext: .csv.gz
source_format: CSV
project: test-bigquery-project-166901
dataset: test_for_embulk
auto_create_table: true # 自動でTBL生成(既にあったのでfalseにしてたのですが、うまく行かず、trueにしました)
table: user_data
# schema_file: schema.json
formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
encoders:
- {type: gzip}


いざ実行

$ embulk run user_data.yml

実際は諸々途中経過が出ますが、私の環境だと1万レコード分で、1分くらいで終わりました。


確認


# ちゃんと1万件が入ってるを確認出来ました
$ bq query "select count(*) FROM [test-bigquery-project-166901:test_for_embulk.user_data];"
Waiting on bqjob_r252312d2758c6820_0000015ccb263ca3_1 ... (0s) Current status: DONE
+-------+
| f0_ |
+-------+
| 10000 |
+-------+

# データもちゃんと入っていました
$ bq query "select user_id, user_name, user_age FROM [test-bigquery-project-166901:test_for_embulk.user_data] order by user_id limit 20"
Waiting on bqjob_r28aa5c45b85ea978_0000015ccb275650_1 ... (0s) Current status: DONE
+---------+-------------------+----------+
| user_id | user_name | user_age |
+---------+-------------------+----------+
| 1 | nakagawakojiro | 81 |
| 2 | moritomosanjuro | 17 |
| 3 | nakagawakesuke | 44 |
| 4 | abetaro | 81 |
| 5 | asosanjuro | 18 |
| 6 | yamamototaro | 67 |
| 7 | kakefukojiro | 54 |
| 8 | kuwatajunichiro | 63 |
| 9 | nakagawakojiro | 21 |
| 10 | kuwatajunichiro | 32 |
| 11 | moritaro | 20 |
| 12 | moritomomasumi | 4 |
| 13 | yamamotomasumi | 28 |
| 14 | yamamotokesuke | 65 |
| 15 | moritomosanjuro | 24 |
| 16 | morisanjuro | 14 |
| 17 | asokesuke | 68 |
| 18 | nakagawamasumi | 63 |
| 19 | abetaro | 94 |
| 20 | yamamotojunichiro | 40 |
+---------+-------------------+----------+


総括


  • 今回は、MySQLのデータをEmbulkを使ってBigQueryにデータを転送してみました

  • Embulkは、MySQLとBigQueryだけでなく、あらゆるデータを並列で入れられる便利な代物です

  • 例えば、S3にある大量のデータを一時的にMySQLに入れる事も出来ますし

  • お客さんから貰ったCSVデータを一旦MySQLやRedisに入れる事も可能です

  • 非常に便利なツールで、プラグインもgemで簡単に追加出来るので色々試してみると可能性が広がりそうです