0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

3rep - Embulk やってみた(語彙力

Last updated at Posted at 2020-02-02

最近データ基盤整理をやり始め、便利なものがあったのでまとめたメモ
色々他の人と被っているかもですが雑多なめも

Embulkとは

  • ETLツール
  • リアルタイムで動作するストリーミング型ログ収集フレームワーク
  • fluentdのバッチ版
  • TreasureData製のOSS
  • 大量のデータをバッチ転送することに特化
  • データの入力・加工・出力する部分はプラグインとして提供
  • 設定ファイルはYAML

主な機能

  • ファイル/データベース 吸い出し
  • ストレージ/データベース ロード

メリット

  • 並列実行でパフォーマンス最適化
  • validation
  • エラーリカバリー
  • 冪等なリトライ
  • プラグインが豊富なので大抵のユースケースをカバー(ここ大事

Embulkの型

説明
boolean true / false
long 64bit符号付き整数型
timestamp 日付。ナノ秒を含む
double 64bit浮動小数点型
string 文字列
  • 途中型が一致しなかったデータに関してはその行だけ消えた

Embulk Plugin

基本的な使い方(Postgresql出力)

Embulkが設定されていることを前提で記載しています。
ただいまdigdag & EmbulkのDockerを作成中です

  • guess.yml - Postgresqlからselectし結果表示設定ファイル ※1
in:
  type: postgresql
  host: {{ host }}
  user: {{ user }}
  password: {{password }}
  database: {{ database }}
  query: |
    SELECT 
      * 
    FROM 
      {table} 
    order by id asc;
out:
  type: stdout
  • guessコマンドで最小限の設定を保管したファイル設定を生成します。
$ embulk guess ./guess.yml -o config.yml
2020-01-29 07:15:24.615 +0000: Embulk v0.9.22
2020-01-29 07:15:25.898 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:15:30.659 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:15:33.179 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:15:33.578 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-postgresql (0.10.1)
in:
  type: postgresql
  host: postgres-db
  user: root
  password: root
  database: database
  query: "SELECT \n  * \nFROM \n  table \norder by id asc;\n"
out: {type: stdout}

Created 'config.yml' file.
  • previewコマンドでdry-runができます
$ embulk preview config.yml
2020-01-29 07:22:01.738 +0000: Embulk v0.9.22
2020-01-29 07:22:02.691 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:22:06.590 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:22:07.543 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:22:07.738 +0000 [INFO] (0001:preview): Loaded plugin embulk-input-postgresql (0.10.1)
2020-01-29 07:22:07.798 +0000 [INFO] (0001:preview): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-input-postgresql-0.10.1/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2020-01-29 07:22:07.807 +0000 [INFO] (0001:preview): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:22:07.955 +0000 [INFO] (0001:preview): SQL: SET search_path TO "public"
2020-01-29 07:22:07.969 +0000 [INFO] (0001:preview): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2020-01-29 07:22:08.153 +0000 [INFO] (0001:preview): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:22:08.167 +0000 [INFO] (0001:preview): SQL: SET search_path TO "public"
2020-01-29 07:22:08.175 +0000 [INFO] (0001:preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT
  *
FROM
  {Table}
order by id asc;

2020-01-29 07:22:08.180 +0000 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:22:08.184 +0000 [INFO] (0001:preview): > 0.00 seconds
2020-01-29 07:22:08.188 +0000 [INFO] (0001:preview): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:22:08.189 +0000 [INFO] (0001:preview): > 0.00 seconds
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
| id:long | text:string | text_code:long | main_text:string | create_at:long | update_at:long | status:string |
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
テーブルが表示される
+---------+-------------+----------------+------------------+----------------+----------------+---------------+
  • runで実行 stdoutにoutput 指定
$ embulk run config.yml
2020-01-29 07:26:44.438 +0000: Embulk v0.9.22
2020-01-29 07:26:45.808 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-01-29 07:26:50.567 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2020-01-29 07:26:51.804 +0000 [INFO] (main): Started Embulk v0.9.22
2020-01-29 07:26:52.027 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-postgresql (0.10.1)
2020-01-29 07:26:52.106 +0000 [INFO] (0001:transaction): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-input-postgresql-0.10.1/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2020-01-29 07:26:52.119 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:26:52.226 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2020-01-29 07:26:52.250 +0000 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2020-01-29 07:26:52.335 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / tasks=1
2020-01-29 07:26:52.348 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2020-01-29 07:26:52.591 +0000 [INFO] (0015:task-0000): Connecting to jdbc:postgresql://postgres-db:5432/database options {ApplicationName=embulk-input-postgresql, user=root, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2020-01-29 07:26:52.600 +0000 [INFO] (0015:task-0000): SQL: SET search_path TO "public"
2020-01-29 07:26:52.608 +0000 [INFO] (0015:task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT
  *
FROM
  table
order by id asc;

2020-01-29 07:26:52.611 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:26:52.615 +0000 [INFO] (0015:task-0000): > 0.00 seconds
2020-01-29 07:26:52.620 +0000 [INFO] (0015:task-0000): SQL: FETCH FORWARD 10000 FROM cur
2020-01-29 07:26:52.621 +0000 [INFO] (0015:task-0000): > 0.00 seconds
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
postgresの出力結果
2020-01-29 07:26:52.628 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2020-01-29 07:26:52.633 +0000 [INFO] (main): Committed.
2020-01-29 07:26:52.633 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}

guess.ymlファイルに環境変数を設定しファイルを生成する

ファイル拡張子に.liquidとつける「postgres-guess.yml.liquid」

  • liquid〜Embulkに備わっているテンプレートエンジン
  • 同じ設定を記載し続けて肥大化を避けるため

_env.yml.liquid

  • env設定ファイル
_env.yml.liquid
{% assign env_postgresql = 'postgresql' %}
{% assign env_host = '{host_name}' %}
{% assign env_user = '{user_name}' %}
{% assign env_password = '"{user_password}"' %}
{% assign env_database = '{database_name}' %}

guess.yml.liquid

  • 汎用設定が書かれたテンプレートファイル
    • 今回はpostgresなのでpostgres-guess.yml.liquidとしてます
postgres-guess.yml.liquid
{% include 'env' %}                          # これ必須
exec:
  min_output_tasks: 1                       # OutPutタスク数制御(1にしないとOutPutファイルが複数分割される)
in:
  type: {{ env_postgresql }}
  host: {{ env_host }}
  user: {{ env_user }}
  password: {{ env_password }}
  database: {{ env_database }}
  query: |
    SELECT 
      * 
    FROM 
      Table Name
    order by id asc;
out:
  type: stdout

実行ファイル生成

$ embulk guess ./postgres-guess.yml.liquid -o postgres-config.yml

dry-run

$ embulk preview postgres-config.yml

実行

$ embulk run postgres-config.yml

感想

  • 便利すぎて鼻血が出る
  • 基本的な処理などはembulk側がやってくれるのでどこにどうデータを集めてデータ分析を行なっていく化などの考える時間に割り当てられます
  • プラグインによっては差分で取得してくれるものもあるのでupdateの項目があるデータに関しては積極的に使ってみましょう
  • 次はdigdagについて書いてみようかと思います。

※1 guess 読み込んだデータファイルのレイアウトを推測してyamlファイルを生成する機能

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?