4
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Digdag+Embulkで異種RDB間のデータ転送をローカルで検証できるようにする

Posted at

最近社内でDigdag+Embulkでデータ連携をする機会が増えてきたので、ローカルで検証できる環境を作りました。
開発用のサーバがあればそれを使っても良いですが、気軽に試せるローカル環境があると何かと便利です。

この記事ではDigdagEmbulkについては詳しく説明しないので、基本的な使い方を知っている方向けです。

試すこと

実際こんなことをすることはほとんど無いと思いますが
PostgreSQL -> SQLServer
のデータ転送をローカルだけでやってみます。

本当はクラウドで管理されているDWHに転送したり、S3に保存するケースが多いと思いますが今回はローカルで完結させることに主眼を置きます。

Embulkを使うだけだったら必ずしもDigdagは必要ありませんが、Embulkの処理は概ねスケジューリングされたバッチ処理で行うケースが多いと思うので、それを念頭に置いています。

実現させたい環境

下記の用な動作環境を作ります。

サービス名 環境
Digdag 0.9.33 ローカル
Embulk 0.9.5 コンテナ
Postgres 9.6 コンテナ
SQLServer 2017 コンテナ

Digdagの環境もDockerで構築できますが、Digdagのコンテナ上でEmbulkのコンテナを動作させることになり、Docker in Dockerな構成になるため避けています。

RDBのコンテナを立ち上げる

まずDBのコンテナを立ち上げます。
これは特に難しいことはないですが、下記の様なdocker-compose.ymlを作って立ち上げましょう。


version: "3"
services:
  postgres:
    image: postgres:9.6
    container_name: "postgres_practice"
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=your_user
      - POSTGRES_PASSWORD=your_password
      - POSTGRES_DB=practice
    user: root

  sqlserver:
    image: microsoft/mssql-server-linux:2017-latest
    container_name: "sqlserver_practice"
    ports:
      - 1433:1433
    environment:
      ACCEPT_EULA: 'Y'
      SA_PASSWORD: 'YourP@ssword'

これでDBが立ち上がります。
ここには最低限の設定しか記載していないので、初期データロードやvolumeの永続化に関しては別途対応が必要になります。本記事では触れません。

Embulkが動作させるためのDockerfile

Embulkの動作環境を用意します。基本的なインストール方法は公式ドキュメント通りです。
Embulkにはいくつもプラグインが公開されおり、RDBのデータ入出力に関しては幅広く対応されています。
https://plugins.embulk.org/


FROM java:8-jre

RUN apt-get update && \
    apt-get -y install openssh-client

RUN curl --create-dirs -o /bin/embulk -L "https://dl.bintray.com/embulk/maven/embulk-0.9.5.jar" && \
    chmod +x /bin/embulk

RUN mkdir -p /your_embulk_bundle_dir
WORKDIR /your_embulk_bundle_dir

COPY Gemfile .
COPY Gemfile.lock .

RUN embulk bundle install --path vendor/bundle

今回はPostgreSQL -> SQLServerのデータ連携を実現することが目的なので
Gemfileは下記のように設定します。

source "http://rubygems.org"

gem 'embulk'
gem 'embulk-input-postgresql'
gem 'embulk-output-sqlserver'

inputとoutputでプラグインが分かれているので別で指定します。先頭のembulkも無いとエラーになるので入れます。

Gemfile.lockが無いとエラーになるので空ファイルを作ります。

touch Gemfile.lock

DigdagからDocker imageを使うときは既にimageがビルドされている必要があるため、適当な名前をつけてビルドします。

docker build -t practice_embulk . 

Digdagでワークフローを定義

ワークフローを定義するファイルは.digという拡張子のファイルを用いる必要があり、yamlベースの独自フォーマットで記述します。

practice.digというファイルを作成しておきます。

+practice:
  _export:
    docker:
      image: practice_embulk
  sh>: embulk run -b /your_embulk_bundle_dir rdb_data_transfer.yml

これは指定したコンテナ上でrdb_data_transfer.ymlに書かれた設定でemubulkを動作させるための命令を書いています。

_exportでdockerを指定すると、そのスコープ内で実行される命令をdocker run経由で実行してくれます。今回はEmbulkをDockerコンテナから使うので指定する必要があります。

次にembulk runコマンドで引数に与えているyamlファイルについて説明します。

Embulkの設定ファイルを作成

Embulkの設定ファイルはyaml形式で記述します。実際の運用ではデフォルトで使えるliquidというテンプレートエンジンが便利なのですが、この記事では必要無いので使いません。

rdb_data_transfer.ymlは下記のような設定にしておきます。

in:
  type: postgresql
  host: host.docker.internal
  user: your_user
  password: your_password
  database: practice
  query: |
    SELECT id,
           content
    FROM my_practice

out:
  type: sqlserver
  host: host.docker.internal
  user: sa
  password: YourP@ssword
  database: practice
  table: my_practice_from_postgres
  mode: truncate_insert

in: はデータの送信元からどんなクエリでデータを取得するかを記述します。今回は簡易的な例として my_practiceというテーブルのデータをquery:で指定したSQLを用いて全件取得しています。
詳しくは https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql に書いてます。

out: はデータの挿入処理です。modeを指定することでデータの入れ方を変えることができます。今回はデータを一度truncateして、取得データをinsertする処理を試しました。このmodeではSQLServerにテーブルが存在しなければ作成してくれます。
詳しくは https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-sqlserver に書いてます。

host:に指定しているhost.docker.internalはコンテナの中からホストを参照するために割り当てられるDockerコンテナ特有のDNS名です。

ホストの1433番と5432番ポートに対してリクエストを出すと、ポートフォワーディングによってコンテナに向けられるため結果的にDBにたどり着きます。
(この辺りの話は別途記事を作って解説しようと思います)

docker run時にオプションを指定するか、docker-composeを使ってコンテナに名前を割り当てたら簡単に名前解決できますが、この仕組みではどちらも実現できないためこのような方法を取っています。

実行

PostgreSQLにテーブルとテストデータを流し込みます。
コンテナを立ち上げる時に一緒に流し込むのが本当にはいいですが、少ないので手動でやります。
私はInteliJで接続して試しました。


CREATE TABLE my_practice (
    id int,
    content text
);

INSERT INTO my_practice (id, content) VALUES (1, 'practice1'), (2, 'practice2'), (3, 'practice3');

SQLServerでデータベースを作成します。


CREATE DATABASE practice COLLATE Japanese_CI_AS;

digdagコマンドで実行します。


digdag run practice.dig

実行して成功すると、下記のような表示になります。

2019-03-17 15:23:37.958 +0000: Embulk v0.9.5
2019-03-17 15:23:39.373 +0000 [INFO] (main): Started Embulk v0.9.5
2019-03-17 15:23:43.102 +0000 [INFO] (0001:transaction): BUNDLE_GEMFILE is being set: "/your_embulk_bundle_dir/Gemfile"
2019-03-17 15:23:43.103 +0000 [INFO] (0001:transaction): Gem's home and path are being cleared.
2019-03-17 15:23:45.928 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-postgresql (0.9.3)
2019-03-17 15:23:45.981 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-sqlserver (0.8.2)
2019-03-17 15:23:46.061 +0000 [INFO] (0001:transaction): JDBC Driver = /your_embulk_bundle_dir/vendor/bundle/jruby/2.3.0/gems/embulk-input-postgresql-0.9.3/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2019-03-17 15:23:46.069 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://host.docker.internal:5432/practice options {ApplicationName=embulk-input-postgresql, user=your_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2019-03-17 15:23:46.177 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2019-03-17 15:23:46.197 +0000 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2019-03-17 15:23:46.298 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2019-03-17 15:23:46.350 +0000 [INFO] (0001:transaction): Using jTDS Driver
2019-03-17 15:23:46.355 +0000 [INFO] (0001:transaction): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:46.509 +0000 [INFO] (0001:transaction): Using JDBC Driver 1.3.1
2019-03-17 15:23:46.510 +0000 [INFO] (0001:transaction): Using truncate_insert mode
2019-03-17 15:23:46.592 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "my_practice_from_postgres_000001698c406e16_embulk000" ("id" BIGINT, "content" TEXT)
2019-03-17 15:23:46.601 +0000 [INFO] (0001:transaction): > 0.01 seconds
2019-03-17 15:23:46.610 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "my_practice_from_postgres_000001698c406e16_embulk001" ("id" BIGINT, "content" TEXT)
2019-03-17 15:23:46.620 +0000 [INFO] (0001:transaction): > 0.01 seconds
2019-03-17 15:23:46.856 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2019-03-17 15:23:46.920 +0000 [INFO] (0015:task-0000): Using jTDS Driver
2019-03-17 15:23:46.933 +0000 [INFO] (0015:task-0000): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:46.953 +0000 [INFO] (0015:task-0000): Prepared SQL: INSERT INTO "my_practice_from_postgres_000001698c406e16_embulk000" ("id", "content") VALUES (?, ?)
2019-03-17 15:23:46.963 +0000 [INFO] (0015:task-0000): Using jTDS Driver
2019-03-17 15:23:46.964 +0000 [INFO] (0015:task-0000): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:46.984 +0000 [INFO] (0015:task-0000): Prepared SQL: INSERT INTO "my_practice_from_postgres_000001698c406e16_embulk001" ("id", "content") VALUES (?, ?)
2019-03-17 15:23:47.064 +0000 [INFO] (0015:task-0000): Connecting to jdbc:postgresql://host.docker.internal:5432/practice options {ApplicationName=embulk-input-postgresql, user=your_user, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2019-03-17 15:23:47.099 +0000 [INFO] (0015:task-0000): SQL: SET search_path TO "public"
2019-03-17 15:23:47.107 +0000 [INFO] (0015:task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT id,
       content
FROM my_practice

2019-03-17 15:23:47.114 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2019-03-17 15:23:47.121 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2019-03-17 15:23:47.130 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2019-03-17 15:23:47.132 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2019-03-17 15:23:47.135 +0000 [INFO] (0015:task-0000): Loading 3 rows
2019-03-17 15:23:47.159 +0000 [INFO] (0015:task-0000): > 0.02 seconds (loaded 3 rows in total)
2019-03-17 15:23:47.168 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2019-03-17 15:23:47.170 +0000 [INFO] (0001:transaction): Using jTDS Driver
2019-03-17 15:23:47.171 +0000 [INFO] (0001:transaction): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:47.213 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "my_practice_from_postgres" ("id" BIGINT, "content" TEXT)
2019-03-17 15:23:47.217 +0000 [INFO] (0001:transaction): > 0.00 seconds
2019-03-17 15:23:47.222 +0000 [INFO] (0001:transaction): SQL: DELETE FROM "my_practice_from_postgres"
2019-03-17 15:23:47.227 +0000 [INFO] (0001:transaction): > 0.00 seconds
2019-03-17 15:23:47.227 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "my_practice_from_postgres" ("id", "content") SELECT "id", "content" FROM "my_practice_from_postgres_000001698c406e16_embulk000" UNION ALL SELECT "id", "content" FROM "my_practice_from_postgres_000001698c406e16_embulk001"
2019-03-17 15:23:47.235 +0000 [INFO] (0001:transaction): > 0.01 seconds (3 rows)
2019-03-17 15:23:47.298 +0000 [INFO] (0001:transaction): Using jTDS Driver
2019-03-17 15:23:47.299 +0000 [INFO] (0001:transaction): Connecting to jdbc:jtds:sqlserver://host.docker.internal:1433/practice options {user=sa, password=***}
2019-03-17 15:23:47.331 +0000 [INFO] (0001:transaction): SQL: DROP TABLE "my_practice_from_postgres_000001698c406e16_embulk000"
2019-03-17 15:23:47.343 +0000 [INFO] (0001:transaction): > 0.01 seconds
2019-03-17 15:23:47.355 +0000 [INFO] (0001:transaction): SQL: DROP TABLE "my_practice_from_postgres_000001698c406e16_embulk001"
2019-03-17 15:23:47.371 +0000 [INFO] (0001:transaction): > 0.02 seconds
2019-03-17 15:23:47.387 +0000 [INFO] (main): Committed.

これでローカルだけで、PostgreSQLからSQLServerへのデータ転送処理が実現できました。
実際にSQLServer上にテーブルが作成されてデータが挿入されていることが確認できるはずです。

#最後に
簡易な設定だけで実現できるのでコンテナの技術とEmbulkの利便性を感じました。
Embulkのプラグインは機能が豊富でいろんなパターンに対応可能なので、色々試すと面白いです。

今回はとてもシンプルな例だったためEmbulkの設定ファイルは簡潔でしたが、実際の業務で必要なテーブルになるとデータの形式によってはfilterを入れる必要があったりするため、色々と考えることはあります。
あくまで導入の第一歩としての紹介でした。

4
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?