はじめに
embulkとはデータの転送を効率よく行う仕組みです。自分でバッチプログラムを組んでDBへデータを転送するよりもはるかに効率的に行うことができます。今回はkotlinからembulkを操作してDB(MySQL)上にデータを転送してみたいと思います。また、これらをDockerコンテナ上で行いたいと思います。
embulkコンテナにはembulkを操作するjar(kotlinのコンパイルはローカルで行います)とembulkのconfigファイル, 転送データのCSVファイルを配置します。MySQLもDockerコンテナにします。
環境
kotlin : 1.3.70
gradle : 6.3
MySQL(Docker) : 5.7
Java(Docker) : Java8 Alpine Linux
embulkコンテナの構成ファイルは、以下の通りです。
/work/embulk.jar
(転送を実行する本体)
/work/config/config.yml
(embulk設定ファイル)
/work/config/test.csv
(転送するデータ)
プラグインのダウンロードとビルド
embulkでDBへの転送を行うためにはプラグインが必要となります。
単純にembulkをインストールしてshell上で実行する時は、embulk gem install xxx
でプラグインをインストールしますが、今回のようにkotlinから操作する場合には、プラグインのダウンロードとビルドを行う必要があります。
git clone git@github.com:embulk/embulk-output-jdbc.git
ビルドして必要なjarを取ってきます。
mkdir plugin
cd embulk-output-jdbc
./gradlew gem
cp embulk-output-mysql/default_jdbc_driver/mysql-connector-java-5.1.44.jar ../pliugin
cp embulk-output-mysql/build/libs/embulk-output-mysql-0.8.7.jar ../plugin
cp embulk-output-jdbc/build/libs/embulk-output-jdbc-0.8.7.jar ../plugin
embulkを操作するkotlin
ルートに戻ってgradleプロジェクトを作成します。
gradle init
build.gradleは以下のようになっております。
mainClassの指定は適宜行なってください。
plugins {
// Apply the Kotlin JVM plugin to add support for Kotlin.
id 'org.jetbrains.kotlin.jvm' version '1.3.70'
// Apply the application plugin to add support for building a CLI application.
id 'application'
}
repositories {
// Use jcenter for resolving dependencies.
// You can declare any Maven/Ivy/file repository here.
jcenter()
mavenCentral()
}
dependencies {
// Align versions of all Kotlin components
implementation platform('org.jetbrains.kotlin:kotlin-bom')
// Use the Kotlin JDK 8 standard library.
compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
// Use the Kotlin test library.
testImplementation 'org.jetbrains.kotlin:kotlin-test'
// Use the Kotlin JUnit integration.
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit'
//embulk
compile("org.embulk:embulk-core:0.9.12")
compile("org.embulk:embulk-standards:0.9.12")
//plugin
compile fileTree(dir: 'plugin', include: ['*.jar'])
}
application {
// Define the main class for the application.
mainClassName = 'AppKt'
}
jar {
manifest {attributes 'Main-Class': 'qiita.embulk.AppKt'}
from {configurations.compile.collect {it.isDirectory() ? it : zipTree(it)}}
}
embulkの設定, preview, runを行うクラスを記述してみます。
package qiita.embulk
import com.google.inject.Binder
import com.google.inject.Module
import org.embulk.EmbulkEmbed
import org.embulk.EmbulkEmbed.Bootstrap
import org.embulk.config.ConfigSource
import org.embulk.output.MySQLOutputPlugin
import org.embulk.spi.OutputPlugin
import org.embulk.plugin.InjectedPluginSource
import java.io.*
class Embulk(){
lateinit var config : ConfigSource
lateinit var embulk : EmbulkEmbed
fun loadConfig(path : String){
println("Hello, Embulk!")
val bootstrap = Bootstrap()
bootstrap.addModules(DbOutputModule())
embulk = bootstrap.initializeCloseable()
try {
val loader = embulk.newConfigLoader()
config = loader.fromYamlFile(File(path))
} catch (e: Exception) {
println(e.toString())
}
}
fun preview(){
var result = embulk.preview(config)
println(result.schema.columns)
}
fun run(){
var result = embulk.run(config)
println(result.configDiff.toString())
}
internal class DbOutputModule : Module {
override fun configure(binder: Binder) {
InjectedPluginSource.registerPluginTo(
Binder,
OutputPlugin::class.java,
"mysql",
MySQLOutputPlugin::class.java
)
}
}
}
bootstrap.addMdules()
でプラグインを指定し、 bootstrap.initializeCloseable()
でembulkを起動しています。
プラグインの組み込みはInjectedPluginSource.registerPluginTo()
で行います。
config = loader.fromYamlFile(File(path))
でconfig.ymlの読み込みを行います。
実際にpreviewやrunを行うのはembulk.preview()
, embulk.run()
ですね。
メインクラスも記述します。
package qiita.embulk
fun main(args: Array<String>) {
val embulk = Embulk()
embulk.loadConfig("./config/config.yml")
embulk.preview()
embulk.run()
}
Dockerfileとdocker-compose.yml
MySQLのDockerfileは以下のようになります。
ルート配下にdockerディレクトリを作ります。
mkdir docker
docker/ にDockerfileとdocker-compose.ymlを配置します。
コピーするjarのファイル名の指定は適宜行なってください。
FROM openjdk:8-jre-alpine
RUN apk update && apk add bash
WORKDIR /work
COPY ./build/libs/qiita-embulk.jar /work
COPY ./src/main/resources/config.yml /work/config/
COPY ./src/main/resources/test.csv /work/config/
FROM mysql:5.7
COPY ./docker/db/sql/2_initDb.sh /docker-entrypoint-initdb.d
COPY ./docker/db/sql/1_initDb.sql /docker-entrypoint-initdb.d
version: '3'
services:
db:
image: mysql:5.7
container_name: demo_mysql
build:
context: ..
dockerfile: ./docker/Dockerfile_mysql
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: test-database
MYSQL_USER: embulkuser
MYSQL_PASSWORD: password
TZ: 'Asia/Tokyo'
volumes:
- ./db/my.cnf:/etc/mysql/conf.d/my.cnf
- ./db/sql:/docker-entrypoint-initdb.d
ports:
- 3306:3306
embulk:
image: embulk
container_name: demo_embulk
build:
context: ..
dockerfile: ./docker/Dockerfile_embulk
tty: true
docker/dbディレクトリを作り、my.cnfを配置します。
[mysqld]
character-set-server=utf8mb4
collation-server=utf8mb4_bin
sql_mode=''
[client]
default-character-set = utf8mb4
文字コードの設定と、MySQL5.7ではsql_mode=STRICT_ALL_TABLE
がデフォルトで有効になっており、これを外さないとNULL値を許可してくれなかったりして面倒な事になるかもしれないので一応外します。
次にdocker/db/sqlディレクトリを作り、初期設定用のスクリプトと、SQLを配置します。
create database if not exists demo;
create user if not exists embulkuser;
grant all on demo.* to 'embulkuser'@'%';
create table if not exists demo.user(
id INT(11) AUTO_INCREMENT not null primary key,
name varchar(30) not null,
age INT(3) not null,
registerd_at timestamp default current_timestamp,
updated_at timestamp default current_timestamp
);
mysql -u root -proot < "/docker-entrypoint-initdb.d/1_initDb.sql"
これらをMySQLコンテナの/docker-entrypoint-initdb.dにコピーしておくと、コンテナ起動時にファイル名の先頭の番号順に自動で実行してくれます。
config.yml とtestデータ
in:
type: file
path_prefix: /work/config/test.csv
parser:
charset: UTF-8
newline: LF
type: csv
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: name, type: string}
- {name: age, type: long}
out:
type: mysql
host: db
user: embulkuser
password: password
database: demo
table: user
mode: truncate_insert
上記のdocker-compose.ymlからMySQLコンテナへはホスト名"db"で接続できます。
test用のCSVは以下のkotlin scriptで40万レコード分自動生成してみました。
import java.io.*
val fileWriter = FileWriter("./test.csv")
val pw = PrintWriter(BufferedWriter(fileWriter))
pw.println("id,name,age")
for (i in 1..400000){
val fileContent="$i,$i" + "_san,$i"
pw.println(fileContent)
}
pw.close()
これらのconfig.ymlとtest.scvをsrc/main/resourcesに配置します。
コンテナ実行
kotlinをコンパイルします。
./gradle build
コンテナを起動します。
cd docker
docker-compose up -d --build
プログラムを実行してみます。
実行するjarのファイル名の指定は適宜行なってください。
docker-compose exec embulk java -jar /work/qiita-embulk.jar
これでデータの転送が開始されるはずです!
最後に
自分で何も考えずに40万レコードをひとつひとつDBへ転送するよりも数十〜数百倍のオーダで早くなるので、データ転送にはembulkを使うことを学びました!
ちなみにコンテナ実行で40万レコードの転送は20秒ほどでした(環境にもよりますが)。