最近データ基盤整理をやり始め、便利なものがあったのでまとめたメモ
色々他の人と被っているかもですが雑多なめも
Embulkとは
- ETLツール
- リアルタイムで動作するストリーミング型ログ収集フレームワーク
- fluentdのバッチ版
- TreasureData製のOSS
- 大量のデータをバッチ転送することに特化
- データの入力・加工・出力する部分はプラグインとして提供
- 設定ファイルはYAML
主な機能
- ファイル/データベース 吸い出し
- ストレージ/データベース ロード
メリット
- 並列実行でパフォーマンス最適化
- validation
- エラーリカバリー
- 冪等なリトライ
- プラグインが豊富なので大抵のユースケースをカバー(ここ大事
Embulkの型
型 | 説明 |
---|---|
boolean | true / false |
long | 64bit符号付き整数型 |
timestamp | 日付。ナノ秒を含む |
double | 64bit浮動小数点型 |
string | 文字列 |
- 途中型が一致しなかったデータに関してはその行だけ消えた
Embulk Plugin
-
List of Embulk Plugins by Category
- インストール例
- embulk gem install embulk-output-command
基本的な使い方(Postgresql出力)
Embulkが設定されていることを前提で記載しています。
ただいまdigdag & EmbulkのDockerを作成中です
- guess.yml - Postgresqlからselectし結果表示設定ファイル ※1
in:
type: postgresql
host: {{ host }}
user: {{ user }}
password: {{password }}
database: {{ database }}
query: |
SELECT
*
FROM
{table}
order by id asc;
out:
type: stdout
- guessコマンドで最小限の設定を保管したファイル設定を生成します。
$ embulk guess ./guess.yml -o config.yml
2020-01-29 07:15:24.615 +0000: Embulk v0.9.22
2020-01-29 07:15:25.898 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:15:30.659 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:15:33.179 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:15:33.578 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-postgresql (0.10.1)
in:
type: postgresql
host: postgres-db
user: root
password: root
database: database
query: "SELECT \n * \nFROM \n table \norder by id asc;\n"
out: {type: stdout}
Created 'config.yml' file.
- previewコマンドでdry-runができます
$ embulk preview config.yml
2020-01-29 07:22:01.738 +0000: Embulk v0.9.22
2020-01-29 07:22:02.691 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:22:06.590 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:22:07.543 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:22:07.738 +0000 [INFO] (0001:preview): Loaded plugin embulk-input-postgresql (0.10.1)
2020-01-29 07:22:07.798 +0000 [INFO] (0001:preview): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-input-postgresql-0.10.1/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2020-01-29 07:22:07.807 +0000 [INFO] (0001:preview): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:22:07.955 +0000 [INFO] (0001:preview): SQL: SET search_path TO "public"
2020-01-29 07:22:07.969 +0000 [INFO] (0001:preview): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2020-01-29 07:22:08.153 +0000 [INFO] (0001:preview): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:22:08.167 +0000 [INFO] (0001:preview): SQL: SET search_path TO "public"
2020-01-29 07:22:08.175 +0000 [INFO] (0001:preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT
*
FROM
{Table}
order by id asc;
2020-01-29 07:22:08.180 +0000 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:22:08.184 +0000 [INFO] (0001:preview): > 0.00 seconds
2020-01-29 07:22:08.188 +0000 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:22:08.189 +0000 [INFO] (0001:preview): > 0.00 seconds
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
| id:long | text:string | text_code:long | main_text:string | create_at:long | update_at:long | status:string |
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
テーブルが表示される
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
- runで実行 stdoutにoutput 指定
$ embulk run config.yml
2020-01-29 07:26:44.438 +0000: Embulk v0.9.22
2020-01-29 07:26:45.808 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:26:50.567 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:26:51.804 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:26:52.027 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-postgresql (0.10.1)
2020-01-29 07:26:52.106 +0000 [INFO] (0001:transaction): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-input-postgresql-0.10.1/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2020-01-29 07:26:52.119 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:26:52.226 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2020-01-29 07:26:52.250 +0000 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2020-01-29 07:26:52.335 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / tasks=1
2020-01-29 07:26:52.348 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2020-01-29 07:26:52.591 +0000 [INFO] (0015:task-0000): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:26:52.600 +0000 [INFO] (0015:task-0000): SQL: SET search_path TO "public"
2020-01-29 07:26:52.608 +0000 [INFO] (0015:task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT
*
FROM
table
order by id asc;
2020-01-29 07:26:52.611 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:26:52.615 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2020-01-29 07:26:52.620 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:26:52.621 +0000 [INFO] (0015:task-0000): > 0.00 seconds
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
2020-01-29 07:26:52.628 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2020-01-29 07:26:52.633 +0000 [INFO] (main): Committed.
2020-01-29 07:26:52.633 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}
guess.ymlファイルに環境変数を設定しファイルを生成する
ファイル拡張子に.liquidとつける「postgres-guess.yml.liquid」
- liquid〜Embulkに備わっているテンプレートエンジン
- 同じ設定を記載し続けて肥大化を避けるため
_env.yml.liquid
- env設定ファイル
_env.yml.liquid
{% assign env_postgresql = 'postgresql' %}
{% assign env_host = '{host_name}' %}
{% assign env_user = '{user_name}' %}
{% assign env_password = '"{user_password}"' %}
{% assign env_database = '{database_name}' %}
guess.yml.liquid
- 汎用設定が書かれたテンプレートファイル
- 今回はpostgresなのでpostgres-guess.yml.liquidとしてます
postgres-guess.yml.liquid
{% include 'env' %} # これ必須
exec:
min_output_tasks: 1 # OutPutタスク数制御(1にしないとOutPutファイルが複数分割される)
in:
type: {{ env_postgresql }}
host: {{ env_host }}
user: {{ env_user }}
password: {{ env_password }}
database: {{ env_database }}
query: |
SELECT
*
FROM
Table Name
order by id asc;
out:
type: stdout
実行ファイル生成
$ embulk guess ./postgres-guess.yml.liquid -o postgres-config.yml
dry-run
$ embulk preview postgres-config.yml
実行
$ embulk run postgres-config.yml
感想
- 便利すぎて鼻血が出る
- 基本的な処理などはembulk側がやってくれるのでどこにどうデータを集めてデータ分析を行なっていく化などの考える時間に割り当てられます
- プラグインによっては差分で取得してくれるものもあるのでupdateの項目があるデータに関しては積極的に使ってみましょう
- 次はdigdagについて書いてみようかと思います。
※1 guess 読み込んだデータファイルのレイアウトを推測してyamlファイルを生成する機能