概要
Embulkとは?
データ処理タスクのためのオープンソース(Javaアプリケーション)のデータインポート・エクスポートツールのいわゆるETLになります。
データパイプラインを効率的かつ柔軟に構築するために設計され、大量のデータを処理する際のボトルネックであるデータの集約をEmbulkは非常に人気のあるツールであり、エコシステムが豊富で、多くのデータストアやデータベースとの当面の統合が可能です。
近頃のトレンドでは、1社で複数のオンプレアプリやSaaSを使用しており、Embulkを活用することでさまざまなプラグインがあり、多様なInput/Output処理が可能となり、便利なツールです。
embulkの特徴
embulkはプラグインアーキテクチャ?
これにより、異なるデータソースやデータストアに対して柔軟な接続が可能です。データの取り込み元や出力先に合わせて、適切なプラグインを選択することで、データパイプラインの柔軟性と拡張性を高めることができます。
大規模データ処理のサポートが可能?
Embulk は、大量のデータを高速かつマラソン処理することができます。
データ処理タスクを複数のスレッドやプロセスに分割し、メモリ処理を実行することで、パフォーマンスの向上を図りまた、データのバルクやインポートバルクエクスポートの機能も提供しています、データの移行やバッチ処理に最適なツールと言われます。
構成ファイルによる設定管理
Embulkでは、YAML形式の構成ファイルを使用して、データパイプラインの設定を管理します。 構成ファイルを編集することで、データソースの設定やフィルタリング、変換のルールを簡単にまた、複数のタスクを定義し、依存関係や実行順序をわかりやすく指定します。
Embulkの利用
Embulkは様々なデータ処理タスクに使用することができます。以下にEmbulk の代表的な利用例を紹介します。
embulkをインストールしたサーバにs3→bigqueryへデータ転送をする場合に活用方法です。
インストールと記載方法
・Embulkをインストール
curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
・S3 inputプラグインのインストール
https://github.com/embulk/embulk-input-s3
$ embulk gem install embulk-input-s3
・filter-json_keyのインストール
https://github.com/civitaspo/embulk-filter-json_key
$embulk gem install embulk-filter-json_key
・BigQuery outputプラグインのインストール
https://github.com/embulk/embulk-output-bigquery
$ embulk gem install embulk-output-bigquery
処理ymlの中身
in:
type: s3
bucket:★S3のバケット名
path_prefix: ★S3に配置したファイルへのパス
endpoint:★エンドポイント。s3-us-west-1.amazonaws.comみたいなの
auth_method: basic
access_key_id:★AWSのアクセスキーID
secret_access_key:★AWSのシークレットアクセスキー
parser:
type: csv
delimiter: "\t"
charset: UTF-8
newline: CRLF
null_string: 'NULL'
skip_header_lines: 0
comment_line_marker: '#'
allow_extra_columns: true
columns:
- {name: json_payload, type: string}
filters:
- type: json_key
column: json_payload
nested_key_delimiter: "."
drop_keys:
- {key: "a.b.userId"}
out:
type: bigquery
mode: append
auth_method: json_key
json_keyfile:★サービスアカウントやユーザのシークレットキー(json)
project: ★GCPのプロジェクト
dataset: ★BigQueryのデータセット
table: ★BigQueryのテーブル
auto_create_dataset: true
auto_create_table: true
schema_file: insert_schema.json ★BQのテーブルのスキーマ
column_options:
- {name: json_payload, type: string }
timeout_sec: 300
open_timeout_sec: 300
retries: 3
path_prefix: temp_data
file_ext: .gz
delete_from_local_when_job_end: true
source_format: CSV
max_bad_records: 0
formatter:
type: jsonl
実行方法
実際の処理はrun実行ですが、preview実行によりdryrunも可能です。
$ embulk preview ~.yml
#実行時
$ embulk run ~.yml
ワークフローエンジンとの合わせ技
Digdagワークフローエンジンも活用してshファイルのymlファイルを日次の定期実行することが可能です。
0 0 * * * ~.sh処理1
0 10 * * * 処理2 # 処理1は10分以内には終わるはず
0 20 * * * 処理3 # 処理2は10分以内には終わるはず
最後に
パフォーマンスの最適化 Embulk のパフォーマンスはすでに優れていますが、大規模データ処理のさらなる最適化が進められています。データの分散処理や並列処理の改善、ストリーム処理のサポートなど、より高速かつ効率的なデータ処理が実現されることが期待されています。
学習コストは高く環境構築も時間がかかりますがデータベースを扱うエンジニアとしては面白いツールです。
参照資料
Embulkを使ってS3からBigQueryへデータをフィルタリングしてロードする
zozo TECH BLOG