はじめに
前回、MacにEmbulkコマンドのインストールまでをしました。
今回は、Embulkを使用してMySQLからElasticsearchへのデータ転送をしてみます。
環境構築
Mac上にDocker環境を構築しておきます。
- MySQL:5.7
- Elasticsearch:7.9.0
- Kibana:7.9.0
docker-compose
参考までにサンプルを用意しました。
version: '3.1'
services:
# MySQL
db:
image: mysql:5.7
container_name: my-example-mysql57
restart: always
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_ALLOW_EMPTY_PASSWORD: 1
TZ: "UTC"
volumes:
- ./mysql/init:/docker-entrypoint-initdb.d
- ./mysql/my.cnf:/etc/mysql/conf.d/my.cnf
- ./docker/mysql/data:/var/lib/mysql
- ./docker/mysql/log:/var/log/mysql
ports:
- 3357:3306
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.9.0
container_name: my-example-elasticsearch79
volumes:
- ./docker/elasticsearch/data:/usr/share/elasticsearch/data
ports:
- 9279:9200
expose:
- 9300
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- http.host=0.0.0.0
- transport.host=127.0.0.1
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana-oss:7.9.0
container_name: my-example-kibana79
ports:
- 5679:5601
volumes:
my-example-elasticsearch79-data:
driver: local
転送元データの準備
検証のため簡単なテーブルとデータを用意しました。
CREATE DATABASE myembulk DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
grant select,update,insert,delete on myembulk.* to root@localhost identified by 'root';
flush privileges;
use myembulk;
CREATE TABLE myblog (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
title varchar(255) NOT NULL,
description text,
status tinyint(3) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO myblog VALUES (1,'ブログタイトル1','ブログ本文1',1);
INSERT INTO myblog VALUES (2,'ブログタイトル2','ブログ本文2',1);
INSERT INTO myblog VALUES (3,'ブログタイトル3','ブログ本文3',1);
INSERT INTO myblog VALUES (4,'ブログタイトル4','ブログ本文4',1);
INSERT INTO myblog VALUES (5,'ブログタイトル5','ブログ本文5',1);
INSERT INTO myblog VALUES (6,'ブログタイトル6(削除済)','ブログ本文6',0);
Dockerコンテナを起動
こんな感じでできていれば、成功です。
$ docker-compose up -d
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------------------------------
my-example-elasticsearch79 /tini -- /usr/local/bin/do ... Up 0.0.0.0:9279->9200/tcp, 9300/tcp
my-example-kibana79 /usr/local/bin/dumb-init - ... Up 0.0.0.0:5679->5601/tcp
my-example-mysql57 docker-entrypoint.sh mysqld Up 0.0.0.0:3357->3306/tcp, 33060/tcp
これで事前に必要な準備は完了です。次は本題のEmbulkです。
Embulk
プラグインのインストール
データ転送元はMySQL、転送先がElasticsearchになるので、プラグインを入れておいてください。
$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-elasticsearch
環境変数の設定
今回はdirenvを使用して.envrc
ファイルに以下のように記載しましたが、この限りではありません。
# MySQL
export DB_ADDRESS=127.0.0.1
export DB_PORT=3357
export DB_SCHEMA=myembulk
export DB_USER=root
export DB_PASSWORD=root
# Elasticsearch
export ES_ADDRESS=127.0.0.1
export ES_PORT=9279
Embulkの設定
Liquidテンプレートを使用して、MySQLからElasticsearchへデータを転送するEmbulkの設定ファイルを書きます。
先ほど設定した環境変数は、{{ env.hoge }}
という表現になっていますね。
{% capture today %}{{ 'now' | date: '%Y%m%d' }}{% endcapture %}
in:
type: mysql
host: {{ env.DB_ADDRESS }}
port: {{ env.DB_PORT }}
user: {{ env.DB_USER }}
password: {{ env.DB_PASSWORD }}
database: {{ env.DB_SCHEMA }}
table: myblog
select: "id, title, description, status"
where: "status = 1"
default_column_options:
TIMESTAMP: { type: string, timestamp_format: "%Y-%m-%d %H:%M:%S"}
out:
type: elasticsearch
mode: insert
nodes:
- {host: {{ env.ES_ADDRESS }}, port: {{ env.ES_PORT }}}
index: myblog_{{ today }}
index_type: myblog
Embulkで用意したファイルはこれだけです。
Embulkの実行
$ embulk run mysql2elasticsearch.yml.liquid
エラーにならなければ、成功です。
データ転送先の確認
先ほどの、docker-composeを使用したのであれば、以下のURLでKibanaから確認できます。
$ open http://localhost:5679
さいごに
今回はDockerで最小限のデータ転送を試しました。
思ったより簡単にEmbulkの導入ができました。(Docker準備の方が時間がかかった・・)
実際のプロダクトでは、環境変数の他にセキュリティやネットワークの認証などの設定が増えそうですが、それでもEmbulk自体はシンプルな作りになると思います。