はじめに
Fluentdのバッチ版Embulkを試してみました。
大容量のデータ転送で困っていた時に役立った記憶があります。
概要としては以下のような感じ。
Embulk(エンバルク)とは、プラグ可能なマルチソースバルクデータローダーです。バルク処理に特化したプラグインベースのデータローダーで、大規模データセットのバルク転送を実施します。「データベース」「DWH」「NoSQL」「ファイル形式」「クラウドデータストア」などのデータ転送を強力にサポートします。
参考:https://www.ossnews.jp/oss_info/Embulk
基本説明も以下に載せておきます。
Embulk は、ストリーミング型ログ収集フレームワーク「fluentd」のバッチ版のようなデータ転送ツールです。特に、「1発実行」「日次バッチ処理」「定期バッチ処理」などのバルク処理用途に向いています。
転送元の「ファイル」「データベース」などからデータを吸い出し、転送先の「ストレージ」「データベース」などにロードするためのシンプルな仕組みを提供します。
プラグイン型アーキテクチャを採用しており、RubyやJavaでシンプルなコードを書くことで、さまざまな「データベース」「ファイルフォーマット」「ストレージ」に対して柔軟に対応できます。
参考:https://www.ossnews.jp/oss_info/Embulk
試してみること
- ローカル環境にEmbulkコンテナとMySQLコンテナを建てる。
- MySQLのデータをEmbulkを使って、リモート環境のBigQueryに転送する。
リモート環境
便宜上、先に転送先のBigQueryの説明だけ書いておきます。
BigQuery
テーブルはあえて用意しないで、プロジェクトとデータセットだけ作成しておきます。
※転送先にテーブルが無い場合、embulkが自動でテーブル作成&データ投入してくれる。
※以下写真参考。
EmbulkからBigQueryに転送するための権限を持ったサービスアカウントを作成する。
※以下写真参考。
作成したサービスアカウントの秘密鍵を作成ダウンロードする。
※以下写真参考。
この秘密鍵の入ったJsonファイルをEmbulk転送時に使うので、わかるところに保存しておいてください。
ローカル環境
docker-composeを使ってEmbulkとMySQLのコンテナ環境を作成する。
以下、ローカル環境全体のディレクトリ構成です。
$ tree
.
├── docker-compose.yml
├── embulk
│ ├── Dockerfile
│ └── service_account_iam_key.json
└── mysql
├── Dockerfile
├── ddl
│ ├── init.sql
│ ├── insert_departments.sql
│ └── insert_employee.sql
└── my.cnf
以下、docker-compose.yml、Embulk、MySQLの3つの環境設定について説明をしていきます。
docker-compose.yml
環境設定は以下の通り。
強いてポイントがあるとすれば、networkを作成することでホスト名でコンテナ間通信できるようにしている所くらいです。
version: '3'
services:
dbserver:
build: ./mysql
container_name: mysql_etl
volumes:
- ./mysql/my.cnf:/etc/mysql/my.cnf
- ./mysql/ddl:/docker-entrypoint-initdb.d
environment:
MYSQL_DATABASE: training
MYSQL_USER: fuka_user
MYSQL_PASSWORD: fuka_pass
MYSQL_ROOT_PASSWORD: root_pass
MYSQL_PORT: 3306
ports:
- 3306:3306
tty: true
embulk:
build: ./embulk
container_name: embulk_etl
volumes:
- ./embulk:/home
tty: true
networks:
etl_networks:
Embulkコンテナ
Dockerfile
環境設定は以下の通り。
FROM openjdk:8
# パッケージ管理システムのアップデート
RUN apt-get -y update && apt-get -y upgrade
# localeの設定
RUN apt-get -y install locales && \
localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
# timezone (Asia/Tokyo)の設定
ENV TZ JST-9
# vim以外にも使いそうなもの一応インストール
RUN apt-get install -y vim git zip unzip less wget
# MySQLコンテナ接続確認するときのためにMySQL-clientをインストール
RUN apt-get install -y default-mysql-client
# embulkをインストール(https://www.embulk.org/)
RUN curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
RUN chmod +x ~/.embulk/bin/embulk
RUN echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
RUN . ~/.bashrc
# MySQL取得とBigQuery投入用のプラグインを入れておく
RUN ~/.embulk/bin/embulk gem install embulk-input-mysql \
&& ~/.embulk/bin/embulk gem install embulk-output-bigquery
WORKDIR /root
公開鍵の配置
BigQueryにアクセスするためのservice_accountの秘密鍵を、Dockerfileと同じ階層に配置する。
名前は「service_account_iam_key.json」としました。
├── embulk
│ ├── Dockerfile
│ └── service_account_iam_key.json
MySQLコンテナ
Dockerfile
環境設定は以下の通り。
FROM mysql:5.7
ADD ./my.cnf /etc/mysql/my.cnf
RUN chmod 644 /etc/mysql/my.cnf
ddl
以下の通り。
init.sql
DROP SCHEMA IF EXISTS training;
CREATE SCHEMA training;
USE training;
DROP TABLE IF EXISTS departments;
CREATE TABLE departments (
department_id int primary key NOT NULL AUTO_INCREMENT,
department_name varchar(20)
) ENGINE=INNODB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS employees;
CREATE TABLE employees (
employee_id int primary key NOT NULL AUTO_INCREMENT,
department_id int,
name varchar(20),
age int,
CONSTRAINT fk_department_id
FOREIGN KEY (department_id)
REFERENCES departments (department_id)
ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE=INNODB DEFAULT CHARSET=utf8;
insert_departments.sql
INSERT INTO departments (department_name) VALUES ("総務");
INSERT INTO departments (department_name) VALUES ("営業");
INSERT INTO departments (department_name) VALUES ("開発");
insert_employee.sql
INSERT INTO employees (department_id, name, age) VALUES (1, "佐藤", 40);
INSERT INTO employees (department_id, name, age) VALUES (2, "加藤", 30);
INSERT INTO employees (department_id, name, age) VALUES (3, "田中", 25);
INSERT INTO employees (department_id, name, age) VALUES (3, "中村", 20);
my.cnf
特別な設定はしてません。
[mysqld]
character-set-server=utf8
[mysql]
default-character-set=utf8
[mysqld_safe]
log_error=/var/log/mysqld.log
pid_file=/var/run/mysqld/mysqld.pid
[client]
default-character-set=utf8
動作検証
Dockerコンテナの起動
以下の手順で起動しました。
$ docker-compose build --no-cache
$ docker-compose up -d
$ docker exec -it embulk_etl /bin/bash
ネットワーク接続チェック Embulkコンテナ→MySQLコンテナ
以下のように接続確認します。
# ping -c 3 172.28.0.3
PING 172.28.0.3 (172.28.0.3) 56(84) bytes of data.
64 bytes from 172.28.0.3: icmp_seq=1 ttl=64 time=2.49 ms
64 bytes from 172.28.0.3: icmp_seq=2 ttl=64 time=0.195 ms
64 bytes from 172.28.0.3: icmp_seq=3 ttl=64 time=0.129 ms
--- 172.28.0.3 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 15ms
rtt min/avg/max/mdev = 0.129/0.939/2.493/1.099 ms
# ping -c 3 mysql_etl
PING mysql_etl (172.28.0.3) 56(84) bytes of data.
64 bytes from mysql_etl.mysql_docker_for_embulk_default (172.28.0.3): icmp_seq=1 ttl=64 time=0.327 ms
64 bytes from mysql_etl.mysql_docker_for_embulk_default (172.28.0.3): icmp_seq=2 ttl=64 time=0.132 ms
64 bytes from mysql_etl.mysql_docker_for_embulk_default (172.28.0.3): icmp_seq=3 ttl=64 time=0.128 ms
--- mysql_etl ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 22ms
rtt min/avg/max/mdev = 0.128/0.195/0.327/0.094 ms
# mysql -h mysql_etl -u fuka_user training -p
Enter password:
MySQL [training]>
ネットワーク接続チェック Embulkコンテナ→BigQuery
bqインストールします。(参考:https://cloud.google.com/sdk/docs/install#deb)
# echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
# curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
# apt-get update && apt-get install google-cloud-sdk
# RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && apt-get update -y && apt-get install google-cloud-sdk -y
以下のように接続確認します。
# gcloud auth login
Go to the following link in your browser:
https://accounts.google.com/o/oauth2/auth?response_type=省略~
Enter verification code:
# bq ls
表示されれば接続確認ok
embulk実行
設定ファイル準備
- departmentsテーブル用
内容は以下の通りです。
# vim conf/embulk_guess_departments.yml
in:
type: mysql
user: fuka_user
password: fuka_pass
host: mysql_etl
database: training
query: select * from departments;
out:
type: bigquery
auth_method: json_key
json_keyfile: /home/service_account_iam_key.json
location: asia-northeast1
project: training-project-314502
dataset: test_embulk
table: departments
mode: append
auto_create_table: true
- employeesテーブル用
内容は以下の通りです。
# vim conf/embulk_guess_employees.yml
in:
type: mysql
user: fuka_user
password: fuka_pass
host: mysql_etl
database: training
query: select * from employees;
out:
type: bigquery
auth_method: json_key
json_keyfile: /home/service_account_iam_key.json
location: asia-northeast1
project: training-project-314502
dataset: test_embulk
table: employees
mode: append
auto_create_table: true
embulk実行
- departmentsテーブルの転送
実行手順は以下の通りです。
# embulk guess conf/embulk_guess_departments.yml -o conf/embulk_load_departments.yml
# embulk preview -G conf/embulk_load_departments.yml
~省略~
*************************** 1 ***************************
department_id ( long) : 1
department_name (string) : 総務
*************************** 2 ***************************
department_id ( long) : 2
department_name (string) : 営業
*************************** 3 ***************************
department_id ( long) : 3
department_name (string) : 開発
# embulk run conf/embulk_load_departments.yml
- employeesテーブルの転送
実行手順は以下の通りです。
# embulk guess conf/embulk_guess_employees.yml -o conf/embulk_load_employees.yml
# embulk preview -G conf/embulk_load_employees.yml
~省略~
*************************** 1 ***************************
employee_id ( long) : 1
department_id ( long) : 1
name (string) : 佐藤
age ( long) : 40
*************************** 2 ***************************
employee_id ( long) : 2
department_id ( long) : 2
name (string) : 加藤
age ( long) : 30
*************************** 3 ***************************
employee_id ( long) : 3
department_id ( long) : 3
name (string) : 田中
age ( long) : 25
*************************** 4 ***************************
employee_id ( long) : 4
department_id ( long) : 3
name (string) : 中村
age ( long) : 20
# embulk run conf/embulk_load_employees.yml
テーブルチェック
BigQueryを確認してみるとテーブルが作成され、データが投入されていることが確認できます!
コードレスでデータ転送できるので良い気がしてます・・!