最近社内でDigdag+Embulkでデータ連携をする機会が増えてきたので、ローカルで検証できる環境を作りました。
開発用のサーバがあればそれを使っても良いですが、気軽に試せるローカル環境があると何かと便利です。
この記事ではDigdagとEmbulkについては詳しく説明しないので、基本的な使い方を知っている方向けです。
試すこと
実際こんなことをすることはほとんど無いと思いますが
PostgreSQL -> SQLServer
のデータ転送をローカルだけでやってみます。
本当はクラウドで管理されているDWHに転送したり、S3に保存するケースが多いと思いますが今回はローカルで完結させることに主眼を置きます。
Embulkを使うだけだったら必ずしもDigdagは必要ありませんが、Embulkの処理は概ねスケジューリングされたバッチ処理で行うケースが多いと思うので、それを念頭に置いています。
実現させたい環境
下記の用な動作環境を作ります。
サービス名 | 環境 |
---|---|
Digdag 0.9.33 | ローカル |
Embulk 0.9.5 | コンテナ |
Postgres 9.6 | コンテナ |
SQLServer 2017 | コンテナ |
Digdagの環境もDockerで構築できますが、Digdagのコンテナ上でEmbulkのコンテナを動作させることになり、Docker in Dockerな構成になるため避けています。
RDBのコンテナを立ち上げる
まずDBのコンテナを立ち上げます。
これは特に難しいことはないですが、下記の様なdocker-compose.ymlを作って立ち上げましょう。
version: "3"
services:
postgres:
image: postgres:9.6
container_name: "postgres_practice"
ports:
- 5432:5432
environment:
- POSTGRES_USER=your_user
- POSTGRES_PASSWORD=your_password
- POSTGRES_DB=practice
user: root
sqlserver:
image: microsoft/mssql-server-linux:2017-latest
container_name: "sqlserver_practice"
ports:
- 1433:1433
environment:
ACCEPT_EULA: 'Y'
SA_PASSWORD: 'YourP@ssword'
これでDBが立ち上がります。
ここには最低限の設定しか記載していないので、初期データロードやvolumeの永続化に関しては別途対応が必要になります。本記事では触れません。
Embulkが動作させるためのDockerfile
Embulkの動作環境を用意します。基本的なインストール方法は公式ドキュメント通りです。
Embulkにはいくつもプラグインが公開されおり、RDBのデータ入出力に関しては幅広く対応されています。
https://plugins.embulk.org/
FROM java:8-jre
RUN apt-get update && \
apt-get -y install openssh-client
RUN curl --create-dirs -o /bin/embulk -L "https://dl.bintray.com/embulk/maven/embulk-0.9.5.jar" && \
chmod +x /bin/embulk
RUN mkdir -p /your_embulk_bundle_dir
WORKDIR /your_embulk_bundle_dir
COPY Gemfile .
COPY Gemfile.lock .
RUN embulk bundle install --path vendor/bundle
今回はPostgreSQL -> SQLServerのデータ連携を実現することが目的なので
Gemfileは下記のように設定します。
source "http://rubygems.org"
gem 'embulk'
gem 'embulk-input-postgresql'
gem 'embulk-output-sqlserver'
inputとoutputでプラグインが分かれているので別で指定します。先頭のembulkも無いとエラーになるので入れます。
Gemfile.lockが無いとエラーになるので空ファイルを作ります。
touch Gemfile.lock
DigdagからDocker imageを使うときは既にimageがビルドされている必要があるため、適当な名前をつけてビルドします。
docker build -t practice_embulk .
Digdagでワークフローを定義
ワークフローを定義するファイルは.digという拡張子のファイルを用いる必要があり、yamlベースの独自フォーマットで記述します。
practice.digというファイルを作成しておきます。
+practice:
_export:
docker:
image: practice_embulk
sh>: embulk run -b /your_embulk_bundle_dir rdb_data_transfer.yml
これは指定したコンテナ上でrdb_data_transfer.ymlに書かれた設定でemubulkを動作させるための命令を書いています。
_exportでdockerを指定すると、そのスコープ内で実行される命令をdocker run経由で実行してくれます。今回はEmbulkをDockerコンテナから使うので指定する必要があります。
次にembulk runコマンドで引数に与えているyamlファイルについて説明します。
Embulkの設定ファイルを作成
Embulkの設定ファイルはyaml形式で記述します。実際の運用ではデフォルトで使えるliquidというテンプレートエンジンが便利なのですが、この記事では必要無いので使いません。
rdb_data_transfer.ymlは下記のような設定にしておきます。
in:
type: postgresql
host: host.docker.internal
user: your_user
password: your_password
database: practice
query: |
SELECT id,
content
FROM my_practice
out:
type: sqlserver
host: host.docker.internal
user: sa
password: YourP@ssword
database: practice
table: my_practice_from_postgres
mode: truncate_insert
in: はデータの送信元からどんなクエリでデータを取得するかを記述します。今回は簡易的な例として my_practiceというテーブルのデータをquery:で指定したSQLを用いて全件取得しています。
詳しくは https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql に書いてます。
out: はデータの挿入処理です。modeを指定することでデータの入れ方を変えることができます。今回はデータを一度truncateして、取得データをinsertする処理を試しました。このmodeではSQLServerにテーブルが存在しなければ作成してくれます。
詳しくは https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-sqlserver に書いてます。
host:に指定しているhost.docker.internalはコンテナの中からホストを参照するために割り当てられるDockerコンテナ特有のDNS名です。
ホストの1433番と5432番ポートに対してリクエストを出すと、ポートフォワーディングによってコンテナに向けられるため結果的にDBにたどり着きます。
(この辺りの話は別途記事を作って解説しようと思います)
docker run時にオプションを指定するか、docker-composeを使ってコンテナに名前を割り当てたら簡単に名前解決できますが、この仕組みではどちらも実現できないためこのような方法を取っています。
実行
PostgreSQLにテーブルとテストデータを流し込みます。
コンテナを立ち上げる時に一緒に流し込むのが本当にはいいですが、少ないので手動でやります。
私はInteliJで接続して試しました。
CREATE TABLE my_practice (
id int,
content text
);
INSERT INTO my_practice (id, content) VALUES (1, 'practice1'), (2, 'practice2'), (3, 'practice3');
SQLServerでデータベースを作成します。
CREATE DATABASE practice COLLATE Japanese_CI_AS;
digdagコマンドで実行します。
digdag run practice.dig
実行して成功すると、下記のような表示になります。
2019-03-17 15:23:37.958 +0000: Embulk v0.9.5
2019-03-17 15:23:39.373 +0000 [INFO] (main): Started Embulk v0.9.5
2019-03-17 15:23:43.102 +0000 [INFO] (0001:transaction): BUNDLE_GEMFILE is being set: "/your_embulk_bundle_dir/Gemfile"
2019-03-17 15:23:43.103 +0000 [INFO] (0001:transaction): Gem's home and path are being cleared.
2019-03-17 15:23:45.928 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-postgresql (0.9.3)
2019-03-17 15:23:45.981 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-sqlserver (0.8.2)
2019-03-17 15:23:46.061 +0000 [INFO] (0001:transaction): JDBC Driver = /your_embulk_bundle_dir/vendor/bundle/jruby/2.3.0/gems/embulk-input-postgresql-0.9.3/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2019-03-17 15:23:46.069 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://host.docker.internal:5432/practice options {ApplicationName=embulk-input-postgresql, user=your_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2019-03-17 15:23:46.177 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2019-03-17 15:23:46.197 +0000 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2019-03-17 15:23:46.298 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2019-03-17 15:23:46.350 +0000 [INFO] (0001:transaction): Using jTDS Driver
2019-03-17 15:23:46.355 +0000 [INFO] (0001:transaction): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:46.509 +0000 [INFO] (0001:transaction): Using JDBC Driver 1.3.1
2019-03-17 15:23:46.510 +0000 [INFO] (0001:transaction): Using truncate_insert mode
2019-03-17 15:23:46.592 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "my_practice_from_postgres_000001698c406e16_embulk000" ("id" BIGINT, "content" TEXT)
2019-03-17 15:23:46.601 +0000 [INFO] (0001:transaction): > 0.01 seconds
2019-03-17 15:23:46.610 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "my_practice_from_postgres_000001698c406e16_embulk001" ("id" BIGINT, "content" TEXT)
2019-03-17 15:23:46.620 +0000 [INFO] (0001:transaction): > 0.01 seconds
2019-03-17 15:23:46.856 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2019-03-17 15:23:46.920 +0000 [INFO] (0015:task-0000): Using jTDS Driver
2019-03-17 15:23:46.933 +0000 [INFO] (0015:task-0000): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:46.953 +0000 [INFO] (0015:task-0000): Prepared SQL: INSERT INTO "my_practice_from_postgres_000001698c406e16_embulk000" ("id", "content") VALUES (?, ?)
2019-03-17 15:23:46.963 +0000 [INFO] (0015:task-0000): Using jTDS Driver
2019-03-17 15:23:46.964 +0000 [INFO] (0015:task-0000): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:46.984 +0000 [INFO] (0015:task-0000): Prepared SQL: INSERT INTO "my_practice_from_postgres_000001698c406e16_embulk001" ("id", "content") VALUES (?, ?)
2019-03-17 15:23:47.064 +0000 [INFO] (0015:task-0000): Connecting to jdbc:postgresql://host.docker.internal:5432/practice options {ApplicationName=embulk-input-postgresql, user=your_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2019-03-17 15:23:47.099 +0000 [INFO] (0015:task-0000): SQL: SET search_path TO "public"
2019-03-17 15:23:47.107 +0000 [INFO] (0015:task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT id,
content
FROM my_practice
2019-03-17 15:23:47.114 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2019-03-17 15:23:47.121 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2019-03-17 15:23:47.130 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2019-03-17 15:23:47.132 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2019-03-17 15:23:47.135 +0000 [INFO] (0015:task-0000): Loading 3 rows
2019-03-17 15:23:47.159 +0000 [INFO] (0015:task-0000): > 0.02 seconds (loaded 3 rows in total)
2019-03-17 15:23:47.168 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2019-03-17 15:23:47.170 +0000 [INFO] (0001:transaction): Using jTDS Driver
2019-03-17 15:23:47.171 +0000 [INFO] (0001:transaction): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:47.213 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "my_practice_from_postgres" ("id" BIGINT, "content" TEXT)
2019-03-17 15:23:47.217 +0000 [INFO] (0001:transaction): > 0.00 seconds
2019-03-17 15:23:47.222 +0000 [INFO] (0001:transaction): SQL: DELETE FROM "my_practice_from_postgres"
2019-03-17 15:23:47.227 +0000 [INFO] (0001:transaction): > 0.00 seconds
2019-03-17 15:23:47.227 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "my_practice_from_postgres" ("id", "content") SELECT "id", "content" FROM "my_practice_from_postgres_000001698c406e16_embulk000" UNION ALL SELECT "id", "content" FROM "my_practice_from_postgres_000001698c406e16_embulk001"
2019-03-17 15:23:47.235 +0000 [INFO] (0001:transaction): > 0.01 seconds (3 rows)
2019-03-17 15:23:47.298 +0000 [INFO] (0001:transaction): Using jTDS Driver
2019-03-17 15:23:47.299 +0000 [INFO] (0001:transaction): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:47.331 +0000 [INFO] (0001:transaction): SQL: DROP TABLE "my_practice_from_postgres_000001698c406e16_embulk000"
2019-03-17 15:23:47.343 +0000 [INFO] (0001:transaction): > 0.01 seconds
2019-03-17 15:23:47.355 +0000 [INFO] (0001:transaction): SQL: DROP TABLE "my_practice_from_postgres_000001698c406e16_embulk001"
2019-03-17 15:23:47.371 +0000 [INFO] (0001:transaction): > 0.02 seconds
2019-03-17 15:23:47.387 +0000 [INFO] (main): Committed.
これでローカルだけで、PostgreSQLからSQLServerへのデータ転送処理が実現できました。
実際にSQLServer上にテーブルが作成されてデータが挿入されていることが確認できるはずです。
#最後に
簡易な設定だけで実現できるのでコンテナの技術とEmbulkの利便性を感じました。
Embulkのプラグインは機能が豊富でいろんなパターンに対応可能なので、色々試すと面白いです。
今回はとてもシンプルな例だったためEmbulkの設定ファイルは簡潔でしたが、実際の業務で必要なテーブルになるとデータの形式によってはfilterを入れる必要があったりするため、色々と考えることはあります。
あくまで導入の第一歩としての紹介でした。