LoginSignup
34

More than 5 years have passed since last update.

Embulkを使って、RDSのデータをBigQueryへロード

Last updated at Posted at 2017-06-21

はじめに(追加)

参考として、Embulkを簡単に始められるサンプルコードを作ってみましたので、
よかったら使ってみて下さい。

動機

  • 既にBigQueryを使っていた
  • データベースにあるデータもJOINして便利なデータを取得したかった(re:dashでやろうとしてた)
  • なら、データベースにある欲しいデータもBigQueryにあげてしまえば良いのでは?

Embulkとは

  • RDSのデータをBigQueryのようなDWHへ簡単にバルクインサート出来る代物
  • 色んなケースがあると思いますが、ユーザの行動データ(所謂ログのようなもの)を全てRDSに保存していて、データが多くなりすぎて、RDSでの解析が辛くなってきた時などに使うケースが多いぽいです
  • 本家: https://github.com/embulk/embulk
    • これを見るとわかりますが、RDSだけでなく、CSVやS3、Salesforce等をInputし、
    • RedisやHive、ElasticSearach、BigQueryにbulk loadしてくれます

テストでEmbulkする手順

  • 1. BigQuery側にテスト用のDatasetとTableを作る
  • 2. テスト用にEC2インスタンス(Embulkを動かすサーバ)を立ち上げる
  • 3. テスト用のRDSインスタンスを立ち上げる
  • 4. テスト用のRDSに、テストデータを大量に(1万件程)突っ込む
  • 5. embulkをインストールし、設定ファイルを書き、実際に動かしてみる

1. BigQuery側にテスト用のDatasetとTableを作る

まずはDatasetを作る

名前はtest_for_embulkとしました。

$ bq mk test_for_embulk
Dataset 'test-bigquery-project-166901:test_for_embulk' successfully created.
$ bq ls
     datasetId
 -----------------
  test2
  test_dataset
  test_for_embulk

※ test2とtest_datasetは以前使ったテスト用のdetasetなので無視して下さい

テーブルを作る

今回は、ユーザテーブルを想定し、下記のようなスキームのテーブルを作ります。

$ cat schema.json
[
  {"name": "user_id", "type": "integer", "mode": "required"},
  {"name": "user_name", "type": "string", "mode": "required"},
  {"name": "user_age", "type": "integer", "mode": "required"}
]

じゃあ実際に作っていきます。
bq mkで、スキーマに添って空のテーブルを作る事が可能です。

$ bq mk --schema schema.json -t test_for_embulk.user_data

Table 'test-bigquery-project-166901:test_for_embulk.user_data' successfully created.

$ bq show test_for_embulk.user_data
Table test-bigquery-project-166901:test_for_embulk.user_data

   Last modified                Schema                Total Rows   Total Bytes   Expiration   Labels
 ----------------- --------------------------------- ------------ ------------- ------------ --------
  21 Jun 18:34:23   |- user_id: integer (required)    0            0
                    |- user_name: string (required)
                    |- user_age: integer (required)

想定通りのテーブルが出来てます。

EC2インスタンスと、RDSを立ち上げる

本当は適当にterraformで立ち上げたかったのですが、意外とやること多いのと、無駄にハマったので、
今回は普通にコンソールからEC2インスタンスとRDSインスタンスを立ち上げました。

Terraformに関してはまた別途記事UPします。

で、EC2インスタンスからRDSに接続出来るようにセキュリティを空けておいて、3306で接続出来る事を確認しておきます。

4. テスト用のRDSに、テストデータを大量に(1万件程)突っ込む

まずはデータベースを作成しておきます。

mysql> CREATE DATABASE test_for_embbulk;
Query OK, 1 row affected (0.01 sec)

次に、テスト用のテーブルを用意します。

$ cat create_table.sql
CREATE TABLE user_data (
  user_id INT PRIMARY KEY AUTO_INCREMENT,
  user_name VARCHAR(45),
  user_age INT
);

こんな感じのテーブルにしました。
一応作成しておきます。

mysql> use test_for_embbulk
Database changed
mysql> source create_table.sql
Query OK, 0 rows affected (0.03 sec)

mysql> show create table user_data;
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table     | Create Table                                                                                                                                                                                                        |
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user_data | CREATE TABLE `user_data` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(45) DEFAULT NULL,
  `user_age` int(11) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 |
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

テストデータを生成するスクリプトを作っておく

ランダムな名前と年齢を生成します。
user_dataテーブルのidはauto_incrementなので無視してます。

# coding:utf-8
import random
import time

# ランダムな名前の組み合わせを生成
def generateRandomName():
    last = ["nakagawa", "aso", "mori", "yamamoto", "abe", "moritomo", "kakefu", "kuwata"]
    first = ["kesuke", "taro", "junichiro", "masumi", "kojiro", "sanjuro"]
    return random.choice(last) + random.choice(first)

# 出力するファイル名
OUTPUT_FILE = "test_insert.sql"

# 登録するサンプル数 - 1万
RECORD_COUNT = 10000

sqlCommands = "USE test_for_embbulk;\n"
sqlCommands += "INSERT INTO user_data (user_name, user_age) values \n"

# 登録するデータの数だけINSERT文を生成
for _ in range(RECORD_COUNT):
    # ランダムなデータからInsert文を生成
    sqlCommands += "('{}', '{}'),\n"\
                   .format(generateRandomName(), random.randint(1, 99))


# 生成したSQLコマンドをファイルに書き出す
f = open(OUTPUT_FILE, 'w')
f.write(sqlCommands)
f.close()

ちゃんと出来てるか見てみましょう。

$ python createSQL.py

$ head -10 test_insert.sql
USE test_for_embbulk;
INSERT INTO user_data (user_name, user_age) values
('nakagawakojiro', '81'),
('moritomosanjuro', '17'),
('nakagawakesuke', '44'),
('abetaro', '81'),
('asosanjuro', '18'),
('yamamototaro', '67'),
('kakefukojiro', '54'),
('kuwatajunichiro', '63'),

$ cat test_insert.sql | wc -l
10001

OKそうなのでぶち込みます。

mysql > source test_insert.sql

mysql > use test_for_embbulk
mysql> select * from user_data limit 10;
+---------+-----------------+----------+
| user_id | user_name       | user_age |
+---------+-----------------+----------+
|       1 | nakagawakojiro  |       81 |
|       2 | moritomosanjuro |       17 |
|       3 | nakagawakesuke  |       44 |
|       4 | abetaro         |       81 |
|       5 | asosanjuro      |       18 |
|       6 | yamamototaro    |       67 |
|       7 | kakefukojiro    |       54 |
|       8 | kuwatajunichiro |       63 |
|       9 | nakagawakojiro  |       21 |
|      10 | kuwatajunichiro |       32 |
+---------+-----------------+----------+
10 rows in set (0.00 sec)

よしよし。準備はOK

5. embulkをインストールし、設定ファイルを書き、実際に動かしてみる

ようやく本題。

  • Embulkのインストール
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"

$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
  • 必要なプラグインをインストール

今回はMySQLからインプットするプラグインと、
BigQueryにアウトプットするプラグインをインストールしておきます。

$ embulk gem install embulk-input-mysql
2017-06-21 14:10:46.049 +0000: Embulk v0.8.25
Fetching: embulk-input-mysql-0.8.3.gem (100%)
Successfully installed embulk-input-mysql-0.8.3
1 gem installed

$ embulk gem install embulk-output-bigquery
... 長いので省略 ...
25 gems installed

BQの読み込みに必要なサービスアカウントを作っておく

  • Google Cloud Platformのサービスアカウントから、アカウントを作成する
  • BigQueryの編集権限を与えておく(必要があると思う)

スクリーンショット_2017-06-21_23_16_47.png

ここで「新しい秘密鍵の提供」にチェックを入れておくと、作成完了後に自動的にDLされるので大事に保管します。
(※ 後で使います)

  • 設定ファイルを書いていきます(fluentdぽい感じ)
in:
  type: mysql
  host: xxxx # (ホスト名)
  user: xxxx # (DBのユーザ名)
  password: xxxx # (DBのPW)
  database: test_for_embbulk
  table: user_data
  select: user_id, user_name, user_age # (取得したいカラム)
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: xxx #先程ダウンロードしたjsonファイルのパスを指定
  path_prefix: /tmp/ # 一時ファイル作成場所
  file_ext: .csv.gz
  source_format: CSV
  project: test-bigquery-project-166901
  dataset: test_for_embulk
  auto_create_table: true # 自動でTBL生成(既にあったのでfalseにしてたのですが、うまく行かず、trueにしました)
  table: user_data
  # schema_file: schema.json
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
  - {type: gzip}

いざ実行

$ embulk run user_data.yml

実際は諸々途中経過が出ますが、私の環境だと1万レコード分で、1分くらいで終わりました。

確認


# ちゃんと1万件が入ってるを確認出来ました
$ bq query "select count(*) FROM [test-bigquery-project-166901:test_for_embulk.user_data];"
Waiting on bqjob_r252312d2758c6820_0000015ccb263ca3_1 ... (0s) Current status: DONE
+-------+
|  f0_  |
+-------+
| 10000 |
+-------+

# データもちゃんと入っていました
$ bq query "select user_id, user_name, user_age FROM [test-bigquery-project-166901:test_for_embulk.user_data] order by user_id limit 20"
Waiting on bqjob_r28aa5c45b85ea978_0000015ccb275650_1 ... (0s) Current status: DONE
+---------+-------------------+----------+
| user_id |     user_name     | user_age |
+---------+-------------------+----------+
|       1 | nakagawakojiro    |       81 |
|       2 | moritomosanjuro   |       17 |
|       3 | nakagawakesuke    |       44 |
|       4 | abetaro           |       81 |
|       5 | asosanjuro        |       18 |
|       6 | yamamototaro      |       67 |
|       7 | kakefukojiro      |       54 |
|       8 | kuwatajunichiro   |       63 |
|       9 | nakagawakojiro    |       21 |
|      10 | kuwatajunichiro   |       32 |
|      11 | moritaro          |       20 |
|      12 | moritomomasumi    |        4 |
|      13 | yamamotomasumi    |       28 |
|      14 | yamamotokesuke    |       65 |
|      15 | moritomosanjuro   |       24 |
|      16 | morisanjuro       |       14 |
|      17 | asokesuke         |       68 |
|      18 | nakagawamasumi    |       63 |
|      19 | abetaro           |       94 |
|      20 | yamamotojunichiro |       40 |
+---------+-------------------+----------+

総括

  • 今回は、MySQLのデータをEmbulkを使ってBigQueryにデータを転送してみました
  • Embulkは、MySQLとBigQueryだけでなく、あらゆるデータを並列で入れられる便利な代物です
  • 例えば、S3にある大量のデータを一時的にMySQLに入れる事も出来ますし
  • お客さんから貰ったCSVデータを一旦MySQLやRedisに入れる事も可能です
  • 非常に便利なツールで、プラグインもgemで簡単に追加出来るので色々試してみると可能性が広がりそうです

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34