LoginSignup
3
2

More than 3 years have passed since last update.

KotlinでEmbulkを動かす(Dockerで起動する)

Last updated at Posted at 2020-04-23

はじめに

 embulkとはデータの転送を効率よく行う仕組みです。自分でバッチプログラムを組んでDBへデータを転送するよりもはるかに効率的に行うことができます。今回はkotlinからembulkを操作してDB(MySQL)上にデータを転送してみたいと思います。また、これらをDockerコンテナ上で行いたいと思います。

 embulkコンテナにはembulkを操作するjar(kotlinのコンパイルはローカルで行います)とembulkのconfigファイル, 転送データのCSVファイルを配置します。MySQLもDockerコンテナにします。

環境

kotlin : 1.3.70
gradle : 6.3
MySQL(Docker) : 5.7
Java(Docker) : Java8 Alpine Linux

embulkコンテナの構成ファイルは、以下の通りです。
/work/embulk.jar (転送を実行する本体)
/work/config/config.yml (embulk設定ファイル)
/work/config/test.csv (転送するデータ)

プラグインのダウンロードとビルド

embulkでDBへの転送を行うためにはプラグインが必要となります。
単純にembulkをインストールしてshell上で実行する時は、embulk gem install xxxでプラグインをインストールしますが、今回のようにkotlinから操作する場合には、プラグインのダウンロードとビルドを行う必要があります。

git clone git@github.com:embulk/embulk-output-jdbc.git

ビルドして必要なjarを取ってきます。

mkdir plugin
cd embulk-output-jdbc
./gradlew gem
cp embulk-output-mysql/default_jdbc_driver/mysql-connector-java-5.1.44.jar ../pliugin
cp embulk-output-mysql/build/libs/embulk-output-mysql-0.8.7.jar ../plugin
cp embulk-output-jdbc/build/libs/embulk-output-jdbc-0.8.7.jar ../plugin

embulkを操作するkotlin

ルートに戻ってgradleプロジェクトを作成します。

gradle init

build.gradleは以下のようになっております。
mainClassの指定は適宜行なってください。

build.gradle
plugins {
    // Apply the Kotlin JVM plugin to add support for Kotlin.
    id 'org.jetbrains.kotlin.jvm' version '1.3.70'
    // Apply the application plugin to add support for building a CLI application.
    id 'application'
}

repositories {
    // Use jcenter for resolving dependencies.
    // You can declare any Maven/Ivy/file repository here.
    jcenter()
    mavenCentral()
}

dependencies {
    // Align versions of all Kotlin components
    implementation platform('org.jetbrains.kotlin:kotlin-bom')
    // Use the Kotlin JDK 8 standard library.
    compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
    // Use the Kotlin test library.
    testImplementation 'org.jetbrains.kotlin:kotlin-test'
    // Use the Kotlin JUnit integration.
    testImplementation 'org.jetbrains.kotlin:kotlin-test-junit'
    //embulk
    compile("org.embulk:embulk-core:0.9.12")
    compile("org.embulk:embulk-standards:0.9.12")
    //plugin
    compile fileTree(dir: 'plugin', include: ['*.jar'])
}

application {
    // Define the main class for the application.
    mainClassName = 'AppKt'
}

jar {
    manifest {attributes 'Main-Class': 'qiita.embulk.AppKt'}
    from {configurations.compile.collect {it.isDirectory() ? it : zipTree(it)}}
}

embulkの設定, preview, runを行うクラスを記述してみます。

Embulk.kt
package qiita.embulk

import com.google.inject.Binder
import com.google.inject.Module
import org.embulk.EmbulkEmbed
import org.embulk.EmbulkEmbed.Bootstrap
import org.embulk.config.ConfigSource
import org.embulk.output.MySQLOutputPlugin
import org.embulk.spi.OutputPlugin
import org.embulk.plugin.InjectedPluginSource
import java.io.*


class Embulk(){
    lateinit var config : ConfigSource
    lateinit var embulk : EmbulkEmbed
    fun loadConfig(path : String){
        println("Hello, Embulk!")
        val bootstrap = Bootstrap()
        bootstrap.addModules(DbOutputModule())
        embulk = bootstrap.initializeCloseable()
        try {
            val loader = embulk.newConfigLoader()
            config = loader.fromYamlFile(File(path))
        } catch (e: Exception) {
            println(e.toString())
        }
    }

    fun preview(){
        var result = embulk.preview(config)
        println(result.schema.columns)
    }

    fun run(){
        var result = embulk.run(config)
        println(result.configDiff.toString())
    }
    internal class DbOutputModule : Module {
        override fun configure(binder: Binder) {
            InjectedPluginSource.registerPluginTo(
                    Binder, 
                    OutputPlugin::class.java, 
                    "mysql", 
                    MySQLOutputPlugin::class.java
            )
        }
    }
}

bootstrap.addMdules()でプラグインを指定し、bootstrap.initializeCloseable()でembulkを起動しています。
プラグインの組み込みはInjectedPluginSource.registerPluginTo()で行います。
config = loader.fromYamlFile(File(path))でconfig.ymlの読み込みを行います。
実際にpreviewやrunを行うのはembulk.preview(), embulk.run()ですね。

メインクラスも記述します。

App.kt
package qiita.embulk

fun main(args: Array<String>) {
    val embulk = Embulk()
    embulk.loadConfig("./config/config.yml")
    embulk.preview()
    embulk.run()
}

Dockerfileとdocker-compose.yml

MySQLのDockerfileは以下のようになります。
ルート配下にdockerディレクトリを作ります。

mkdir docker

docker/ にDockerfileとdocker-compose.ymlを配置します。
コピーするjarのファイル名の指定は適宜行なってください。

Dockerfile_embulk
FROM openjdk:8-jre-alpine
RUN apk update && apk add bash
WORKDIR /work
COPY ./build/libs/qiita-embulk.jar /work
COPY ./src/main/resources/config.yml /work/config/
COPY ./src/main/resources/test.csv /work/config/
Dockerfile_mysql
FROM mysql:5.7
COPY ./docker/db/sql/2_initDb.sh /docker-entrypoint-initdb.d
COPY ./docker/db/sql/1_initDb.sql /docker-entrypoint-initdb.d
docker-compose.yml
version: '3'
services:
  db:
    image: mysql:5.7
    container_name: demo_mysql
    build:
      context: ..
      dockerfile: ./docker/Dockerfile_mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: test-database
      MYSQL_USER: embulkuser
      MYSQL_PASSWORD: password
      TZ: 'Asia/Tokyo'
    volumes:
      - ./db/my.cnf:/etc/mysql/conf.d/my.cnf
      - ./db/sql:/docker-entrypoint-initdb.d
    ports:
      - 3306:3306

  embulk:
    image: embulk
    container_name: demo_embulk
    build:
      context: ..
      dockerfile: ./docker/Dockerfile_embulk
    tty: true

docker/dbディレクトリを作り、my.cnfを配置します。

my.cnf
[mysqld]
character-set-server=utf8mb4
collation-server=utf8mb4_bin
sql_mode=''
[client]
default-character-set = utf8mb4

文字コードの設定と、MySQL5.7ではsql_mode=STRICT_ALL_TABLEがデフォルトで有効になっており、これを外さないとNULL値を許可してくれなかったりして面倒な事になるかもしれないので一応外します。
次にdocker/db/sqlディレクトリを作り、初期設定用のスクリプトと、SQLを配置します。

1_initDB.sql
create database if not exists demo;
create user if not exists embulkuser;

grant all on demo.* to 'embulkuser'@'%';

create table if not exists demo.user(
       id INT(11) AUTO_INCREMENT not null primary key,
       name varchar(30) not null,
       age INT(3) not null,
       registerd_at timestamp default current_timestamp,
       updated_at timestamp default current_timestamp
);

2_initDB.sh
mysql -u root -proot < "/docker-entrypoint-initdb.d/1_initDb.sql"

これらをMySQLコンテナの/docker-entrypoint-initdb.dにコピーしておくと、コンテナ起動時にファイル名の先頭の番号順に自動で実行してくれます。

config.yml とtestデータ

config.yml
in:
  type: file
  path_prefix: /work/config/test.csv
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: name, type: string}
    - {name: age, type: long}
out:
  type: mysql
  host: db
  user: embulkuser
  password: password
  database: demo
  table: user
  mode: truncate_insert

上記のdocker-compose.ymlからMySQLコンテナへはホスト名"db"で接続できます。
test用のCSVは以下のkotlin scriptで40万レコード分自動生成してみました。

import java.io.*

val fileWriter = FileWriter("./test.csv")
val pw = PrintWriter(BufferedWriter(fileWriter))
pw.println("id,name,age")
for (i in 1..400000){
    val fileContent="$i,$i" + "_san,$i"
    pw.println(fileContent)
}
pw.close()

これらのconfig.ymlとtest.scvをsrc/main/resourcesに配置します。

コンテナ実行

kotlinをコンパイルします。

./gradle build

コンテナを起動します。

cd docker
docker-compose up -d --build

プログラムを実行してみます。
実行するjarのファイル名の指定は適宜行なってください。

docker-compose exec embulk java -jar /work/qiita-embulk.jar

これでデータの転送が開始されるはずです!

最後に

自分で何も考えずに40万レコードをひとつひとつDBへ転送するよりも数十〜数百倍のオーダで早くなるので、データ転送にはembulkを使うことを学びました!
ちなみにコンテナ実行で40万レコードの転送は20秒ほどでした(環境にもよりますが)。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2