AirbyteでBigQueryにデータ移送
データ分析チームを立ち上げ, データ分析から基盤整備まで取り組むハイボール大好きエンジニアです。
社内のデータ基盤刷新の際にAirbyteを導入したので、こちらでまとめようと思います。
Airbyteとは何か
公式サイト: https://airbyte.com/
OSSのELTツール。有料版もあるが、無料版でもGUIが使え様々なデータソースと接続できる十分な機能が備わっている。
※AirbyteはDBTと連携しておりETLツールとしても機能できるが今回は、独自のDBTとの接続は行わずデフォルトを使用する。
Airbyteを試してみる
実行環境: airbyte version 0.4.21
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
こちらにもある通り上のコードを実行することで、airbyteを始めることができる。
docker-composeを実行してからしばらくして上のログが出たら http://localhost:8000/でアクセスすることができる。
最新のversionでは、usernameとpasswordを入力しないとアクセスできなくなっているが、デフォルトではusername airbyte
, password password
となっている。変更は.env
ファイルで可能。
それに加えて、メールアドレスを登録する必要がある。
トップページは以下のようになっており、サイドバーにあるSourcesでデータソース, Destinationsでデータ移送先, Connectionsでデータソースから移送するデータを設定しデータ移送を実行することができる。
データを準備する
SourcesやDestinationが対応しているのは, Google Sheets, Google Analytics, GA4, Postgres, MySQL, ...etcと様々ある。
Sources用(MySQL)
MySQLのデータをDestinationに移送するために以下を用意した。
CREATE TABLE `demos` (
`id` int NOT NULL AUTO_INCREMENT,
`name` text,
`created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
INSERT INTO `demos`
(name, created, modified)
VALUES
('apple', now(), now()),
('orange', now(), now()),
('banana', now(), now());
Destination用(BigQuery)
移送用にプロジェクト, データセットを作成しておく必要がある。
また接続時にはサービスアカウントが必要で、Keyのjsonコードが必須であるため準備しておく。
詳しくはこちら
SourcesにMySQLを登録する
作成したデータの情報を元にSourcesの情報を記入し接続する。接続が成功したら次はDestinationを設定する。
DestinationにBigQueryを登録する
準備したプロジェクト, データセット, サービスアカウント鍵を登録する。
接続が成功したらConnectionsを設定する。
Connectionsを設定する。
Connectionsは、登録したSourcesとDestinationを接続する場所。
-
Connection name: Connectionの名前を設定する。従来では存在しなかったが著者が苦労したAirbyteの課題(別記事で執筆予定)を解決するための識別子として便利になったと思う。
-
Transfer: データ移送を行うスケジュールを設定する。時間指定よりかは何時間後でスケジュールを選択する。
-
Streams: データ移送の仕方を設定する場所。
- Mirror source structure:
Sourcesで設定したデータベースに基づいてDestinationに移送する。つまり, demosテーブルがデータベース名freeに作成されていたらDestinationのデータセットにfreeが作成され移送されることになる。 - Destination default:
Destinationで設定したdefault datasetにデータが移送される。 - Custom format: 独自に設定できる。
「Namespace custom format」はデータ移送先のデータセットを指す。デフォルトの${SOURCE_NAMESPACE}は「Mirror source structure」と同意義である。
※存在していないデータセットを記入すると新たにデータセットが作成される。
- Mirror source structure:
-
Activate the streams you want to sync: 移送するデータの選択や移送の方法を選択する場所。
-
Full Refresh - Overwrite
実行する度、既存のレコードは削除され全てのデータが新しく保存される。
-
Full Refresh - Append
実行する度、既存のレコードはそのままで全てのデータが新しく追加される。
-
Incremental Sync - Append
実行する度、
Cursor field
で指定したカラムに変化(新しいデータ)があれば新しくデータを追加する。 -
Incremental Sync - Deduped History
実行する度、
Primary key
と指定したCursor field
を監視し、新しいデータは追加され既存のデータで変更があったデータは更新される。
-
-
Normalization & Transformation: ここは、DBTによるデータ加工を取り入れるかどうかを選択する。加工したデータをDestinationに移送したい場合は、こちらを使用する。
データの確認
Connectionsを設定した後, 「Sync now」を押すことでデータ移送が実行される。
以下のようにBigQueryにテーブルが作成できていれば成功!
メタデータ管理のために、Prefixが_airbyteのテーブルも作成される。
移送方法は色々試してみてください!
まとめ
Airbyteではコストがかからずサクッとデータ移送できるので便利かと思います。
APIもあるのでこちらも是非覗いてみてください。
https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#auth