15
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

データ分析基盤・DataLakeを作成するツールとしてのEmbulk

Last updated at Posted at 2021-12-02

この記事はエイチーム引越し侍 / エイチームコネクトの社員による、Ateam Hikkoshi samurai Inc.× Ateam Connect Inc. Advent Calendar 2021 3日目の記事です。

はじめに

今日は、最近のお仕事で触ることになる可能性が高いEmbulkについて書きます。
触ることになるとあるようにEmbulkを使い始めるソフトウェアエンジニアによる記事です。説明に至らない点があったり間違っていたりするかもしれません。その際は編集リクエストなりコメントなりでご指摘いただけたら幸いです。

何に使うかですが、データ分析基盤、DataLake作成におけるETL処理のために使います。ちなみにETLはExtract、Transform、Loadの略でデータを抽出、変換・加工、ロード(Output)する処理のことです。
ETL処理ツールとしては過去Pentaho、Talendを少し触った経験があります。
RubyistなのでFluentdからのEmbulkは知っていたのですが、実際に触るのは初めてです。Embulk、勝手に中身Rubyかと思ってました。(メインがJavaで一部Rubyなんですね)

Embulkとは

Embulk(エンバルク)はbulk data loader です。はい、bulk load なのでデータを大量に且つ一括で登録するツールです。並列・分散実行も可能となっています。
Embulkの優れた点はプラグイン機構を導入していることによる対応フォーマット、対応データソースの多さです。
MySQLやBigQueryはもちろん、CSVといった物理ファイル、salesforceといったPaaSにも対応しています。

プラグイン一覧はこちら↓

導入方法

導入方法については上記記載の公式URLにあるので割愛します。
実際に運用する際はFargateなりECSでDockerイメージを使うことになるかと思います。

使い方

公式にembulk exampleというサンプル用コマンドが用意されているのでそれを叩いてみます。

$ embulk example ./try1
2021-12-03 02:39:19.977 +0900: Embulk v0.9.23
Creating ./try1 directory...
  Creating ./try1/
  Creating ./try1/csv/
  Creating ./try1/csv/sample_01.csv.gz
  Creating ./try1/seed.yml

Run following subcommands to try embulk:

   1. embulk guess ./try1/seed.yml -o config.yml
   2. embulk preview config.yml
   3. embulk run config.yml

exampleコマンドを実行すると上記の通り~/try1というディレクトリが作成されその配下にサンプル用ファイルが格納されます。
ディレクトリ内の構成は以下でした。

$ tree try1/
try1/
├── csv
│   └── sample_01.csv.gz
└── seed.yml

そして次にguessコマンドを実行するのですが、これがEmbulkの特徴の1つです。まずは実行してみます。

$ embulk guess ./try1/seed.yml -o config.yml

2021-12-03 02:39:36.109 +0900: Embulk v0.9.23
2021-12-03 02:39:37.255 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2021-12-03 02:39:40.522 +0900 [INFO] (main): Gem's home and path are set by default: "/Users/h-1390/.embulk/lib/gems"
2021-12-03 02:39:41.612 +0900 [INFO] (main): Started Embulk v0.9.23
2021-12-03 02:39:41.932 +0900 [INFO] (0001:guess): Listing local files at directory '/Users/h-1390/./try1/csv' filtering filename by prefix 'sample_'
2021-12-03 02:39:41.934 +0900 [INFO] (0001:guess): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2021-12-03 02:39:41.935 +0900 [INFO] (0001:guess): Loading files [/Users/h-1390/./try1/csv/sample_01.csv.gz]
2021-12-03 02:39:41.946 +0900 [INFO] (0001:guess): Try to read 32,768 bytes from input source
2021-12-03 02:39:42.026 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
2021-12-03 02:39:42.063 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
2021-12-03 02:39:42.109 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
2021-12-03 02:39:42.133 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
in:
  type: file
  path_prefix: /Users/h-1390/./try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

Created 'config.yml' file.

上記にあるようにconfig.ymlファイルが出力されました。引数として指定してあるseed.ymlは以下です。

seed.yml
in:
  type: file
  path_prefix: '/Users/h-1390/./try1/csv/sample_'
out:
  type: stdout

in:配下に記載されているのがInputとして使用するデータ定義、out:配下に記載するのがOutputするデータ定義です。
上記の場合はInputは実ファイルで任意のディレクトリにプレフィックス指定された形で任意の数のファイルがある、とわかります。(pathでCSVと推測できますが)ファイル形式は指定されていません。
上記内容を引数として出力guessコマンドを実行して出力されたのが以下です。

config.yml
in:
  type: file
  path_prefix: /Users/h-1390/./try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

出力されたファイルには、引数として与えた元ファイル内に記載されたCSVの構造・ヘッダー情報が出力されています。
つまり、指定されたファイル(郡)から形式、カラム数カラム名、型を推測し、引数として渡したymlへ追記した形で定義ファイルを作成してくれるのです。
こういう、**「形式が決まってて頑張れば自動化できるけど手でやってもそんなにかからない(から結局自動化しないで毎回手でやっちゃうような)作業」**を自動化してくれるのは非常にありがたく、感謝しかありません。

どうやって形式や型を推測しているのだろう…と気になり、csvという名称が含まれるディレクトリパス(これは推測材料にしてないと思う)や拡張子(これは判断材料にしているかも)を別名にしてコマンドを叩き直したりしましたが、ちゃんとCSVと推測されてました。おそらくDecorder,Parserを動かして成功するかどうかとかで判断しているんだと思います。

この出力された定義ファイルを引数としてbulk loadを実行します。定義ファイルはYml形式です。Yml形式だと何が嬉しいのか。
はい。Git管理ができますね。ここソフトウェアエンジニアとしては嬉しいポイントです。定義ファイルがバイナリだったりすると管理がなかなか大変だったりするので。

さて、previewコマンドではどのような出力(Output)になるのかを確認できます。

$ embulk preview config.yml
2021-12-03 02:42:49.234 +0900: Embulk v0.9.23
2021-12-03 02:42:50.382 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2021-12-03 02:42:53.580 +0900 [INFO] (main): Gem's home and path are set by default: "/Users/h-1390/.embulk/lib/gems"
2021-12-03 02:42:54.539 +0900 [INFO] (main): Started Embulk v0.9.23
2021-12-03 02:42:54.694 +0900 [INFO] (0001:preview): Listing local files at directory '/Users/h-1390/./try1/csv' filtering filename by prefix 'sample_'
2021-12-03 02:42:54.695 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2021-12-03 02:42:54.697 +0900 [INFO] (0001:preview): Loading files [/Users/h-1390/./try1/csv/sample_01.csv.gz]
2021-12-03 02:42:54.704 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                            |
+---------+--------------+-------------------------+-------------------------+----------------------------+

そして実行はrunです。サンプルではout: {type: stdout}なので標準出力にそのまま出力されます。

$ embulk run config.yml
2021-12-03 02:43:48.603 +0900: Embulk v0.9.23
2021-12-03 02:43:49.935 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2021-12-03 02:43:53.109 +0900 [INFO] (main): Gem's home and path are set by default: "/Users/h-1390/.embulk/lib/gems"
2021-12-03 02:43:54.129 +0900 [INFO] (main): Started Embulk v0.9.23
2021-12-03 02:43:54.251 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/h-1390/./try1/csv' filtering filename by prefix 'sample_'
2021-12-03 02:43:54.253 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2021-12-03 02:43:54.254 +0900 [INFO] (0001:transaction): Loading files [/Users/h-1390/./try1/csv/sample_01.csv.gz]
2021-12-03 02:43:54.306 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2021-12-03 02:43:54.318 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,
2021-12-03 02:43:54.426 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2021-12-03 02:43:54.433 +0900 [INFO] (main): Committed.
2021-12-03 02:43:54.433 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/h-1390/./try1/csv/sample_01.csv.gz"},"out":{}}

サンプルではシンプルなデータ出力でしたが、プラグインを導入することでデータの加工(ETL)も可能です。また、MySQL等SQLが使えるデータソースの場合はSQLでin:を記述できるため、SQLでデータセットのJOIN・加工しそれをOutput先に登録も可能です。

実際の運用時には最初に一度今あるデータをすべて移行して、その後は差分移行という形になると思いますが、もちろんそういった差分更新にも対応しています。
SQLで記述できる場合はシンプルにupdate_atの条件指定をしてN日前にしたり前回実行時のタイムスタンプを参照するようにしたり、実現方法は複数あります。

Embulkまとめ

  • bulk data loader
  • プラグイン方式で様々なデータソースに対応している
  • 定義ファイル(Yml)を用いてI/Oを定義する
  • 定義ファイルはInputから推測して生成可能
  • 定義ファイルがYmlなのでバージョン管理(Git)が容易

おまけ: なぜEmbulkなのか

データ分析基盤・DataLakeの構築の手段は複数存在し、サービスもいくつかあります。その中でなぜEmbulkなのか、について。
データ分析基盤を構築する上での採用事例が複数存在する点、プラグインによって独自のデータセットがあったとしても柔軟に対応できる、等複数ありますが、社内に有識者が複数いる点というのも強いです。
すでに運用実績がある場合、外部に情報として出てこない深い話や情報があるため、導入の障壁が下がります。
身近に気軽に質問できる有識者がいる(今はリモートメインなのでビデオ越しですが)という点は大きいです。

話はそれますが、逆に社内に導入事例が無くても、誰かにその技術知見やモチベーションがある場合その人が旗振り役となって導入するという流れももちろんあります。
今、我々が実現したいことを最も簡単に実現できる方法は何か。それは保守性があるか・マネージドな状態にできるか等をそれなりに考えた結果の技術選定です。

次回

Ateam Hikkoshi samurai Inc.× Ateam Connect Inc. Advent Calendar 2021 3日目の記事は以上です。
読んでくださった方、ありがとうございます。参考になれば幸いです。
明日は @yhorikawa です。ご期待ください!!

15
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?