1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SnowflakeAdvent Calendar 2024

Day 3

Airbyte (OSS)を使ってSnowflakeにデータを入れる話(Airbyteの挙動詳細版)

Last updated at Posted at 2024-12-02

Snowflakeアドベントカレンダー2024です。あらゆるアドカレ初参加です。

今年からSnowflakeをAribyte→Snowflake→dbtってフローでいろいろ触ってるのでコミュニティに知見を返したいので記事書いて行きます(真の目的は自分の備忘録

あんまりAirbyteを使ってるって話を聞かないので他の人にも使って(困りごとを解決したい)もらいたいという気持ちで記事を書いています。

Airbyteについて

割と勢いのあるELTツールです。

クラウドサービスで提供しており、Fivetranの対抗的なやつかと思います。日本だとTroccoさんとかもありますね。AirbyteはOSS展開しており、地味に複雑なライセンスしています。

基本はELv2ライセンスなのでマネージドサービスとして提供することをしないで自分でホストして使う分にはOSS的に使えます。またセルフホストエンタープライズ版なんていうのもあります。

割と探すと日本語圏でも使ってる人はいるようです。OSS的に自分で動かしてるパターンが多い気がします。自分で動かせるのでネットワークの中のデータベースにつないでプライベートリンクでデータ転送するユースケースに便利って気がしています。あとケチな人。

Airbyteのインストール

Airbyteは2024年9月にv1がリリースされて、インストールとかのコントロールがabctlが基本になって、docker-composeが廃止になりました。インストールはとってもかんたんでLinuxかMac環境だと
curl -LsfS https://get.airbyte.com | bash -でとりあえずabctlが入り

abctl local installでairbyteが起動します。(インストールって言ってるけど起動です)

こんな感じ。いっぱいイメージ落としてくるのでdocker hubの制限には注意が必要だったり。

$ abctl local install
  INFO    Using Kubernetes provider:
            Provider: kind
            Kubeconfig: /home/myu65/.airbyte/abctl/abctl.kubeconfig
            Context: kind-airbyte-abctl
 SUCCESS  Found Docker installation: version 27.3.1     
 INFO    No existing cluster found, cluster 'airbyte-abctl' will be created             
 SUCCESS  Port 8000 appears to be available                      
 ▀ Creating cluster 'airbyte-abctl' (16s)
 以下略

http://localhost:8000 にアクセスするとメールアドレスの設定画面とかがでてきます。で、
abctl local credentialsとコマンド打つとパスワードが生成されます。
image.png

コマンドで個別に設定もできます
abctl local credentials --email user@company.example
abctl local credentials --password new_password
なんか設定ファイル作って事前設定もできるようです。
https://docs.airbyte.com/deploying-airbyte/integrations/authentication

Dockerが必要だったりするのでDockerのインストールは事前に必要です。なお旧docker-compose版からの移行はabctl local install --migrateでいけるのでとっても簡単でした。

abctlがなにやってるかというと、kubectlでk8sのクラスタをコントロールするdockerイメージを動かして、このイメージがairbyteの色々なイメージを引っ張ってきたりしています。

$ docker ps
CONTAINER ID   IMAGE                   COMMAND                   CREATED          STATUS          PORTS                                             NAMES
c84b63c512d5   kindest/node:v1.29.10   "/usr/local/bin/entr…"   22 minutes ago   Up 22 minutes   0.0.0.0:8000->80/tcp, 127.0.0.1:36833->6443/tcp   airbyte-abctl-control-plane

データ転送とかはソースや行き先それぞれのイメージがあってバージョンのイメージを引いてきてる感じです。

kube的な設定をしたいときはabctl local install --values ./values.yamlでvalues.yamlにいろいろ書けばいいようです。
(自分はスキーマが大きすぎると転送できなくなるバグを踏んでて(よくある16MBくらいの転送制限)これを解決する設定方法がわからなくて困っています。誰か助けて https://github.com/airbytehq/airbyte/issues/4564 https://github.com/airbytehq/airbyte/issues/37463)

ちなみに停止する方法はabctl local uninstallでこれはdockerとめてる感じのうごきになります。
(いまこの記事書くためにサラの環境でこれやったらコネクタ情報とか保存されてない…消してないはずなのに…なぜ…公開までにわかったら追記します。

Snowpark Container Servicesで動かす方法は…わかりません。k8sわからんですわ。EC2でt2.largeでスワップマシマシくらいで十分動きます。

Airbyteのソースへの接続

Airbyteには「ソース(source)」「行き先(destination)」「接続(connection)」の3種類の設定があります。(逆に他にはほとんど設定がない)

ソースのデータを取得して行き先に入れる。そのペアをコネクションで管理するって感じです。

とりあえずコネクションを作っていきましょう。何もない状態で最初のコネクションを作る通すと下のような画面になります。
image.png

かなりの種類のソースコネクタがあります。なくてもAPIで叩けるものはAPIビルダーで自分で作れます。kintoneやSansanは少なくともAPIビルダーで叩けます。叩けました。APIビルダーについては次の記事くらいしか日本語情報ないです。ややクセがありますが、なれたらサクサク作れます。

* つまり、なんでもソースとして登録できるということです。

たとえばPostgressだとこんな感じです。
image.png
右側に(英語ですが)説明が書いてあってそれを参考にしながらホストとかポートとかユーザーネームとかの接続情報を埋めていきます。

うまくいくとソースとして登録されます。

Salesforceのソースは接続にSalesforce側でアプリ登録したりして、そこの設定がむずかしかった…。けど、できます。

AirbyteのSnowflakeへの接続

image.png

destinationもソースとおなじのりで設定していきます。ほんとにSnowflakeの設定を埋めてくだけ。

ネットワークさえうまく行ってればprivatelinkもいけます。

keypair認証も対応してますね。

ソースと行き先両方うまく設定できたら

こんな感じの画面になって、Streamってところにテーブル一覧が出てきます。

image.png

やることは、テーブルと列名を選ぶだけ!!!!

モードはfull refreshとincremental(増分更新)が選べます。増分更新の場合は、primary keyとcursorを選ぶ必要があります。プリマリーキーはだいたいデータベース側で指定されててあんまり気にしなくていいですが、組み合わせでユニークになるキーですね。カーソルはわかりにくいんですが、更新日列(日付型)を選ぶやつです。これを見て増分があったと判定します。少なくとも使ってるバージョンでは日付型じゃないの選ぶとエラー吐きました。ここで細かい設定はあんまりできません。更新日がいい感じの列ない場合はあきらめてfull refreshにするしかありません。また、データベースによりますがcursorの列はindex指定しといたほうがいいです。CDCとか使えるデータベースだと設定があったりします。

で、コネクションの名前や、行き先(Snowflake)で作るスキーマの名前(もとのソースのデータベース+スキーマ名みたいなのがデフォルトだけどたまにセンスないので自分でつける)、スケジュールなんかを設定します。
image.png

スケジュールに関してはAirflowとかDagsterとかで叩くならマニュアルにしとけばいいはずです。AirbyteのAPIをAirflowが叩いてる形になります。

スキーマ変更を検知したときの動作や列名変更あったときに過去に遡って取得するか?とかの設定もこのへんにあります。うまくいけば列が増えてても検知したら過去のデータ拾い直してくれたりします(コストかかるからデフォルトでオフだったりします

バグっぽいはなし

現行最新のSnowflake destinationコネクタ(3.15.2)はバグがあるっぽくて、うまく動きません。3.11.11なら動きます。コントリビューターになってなおすしか…ないのか…?

Snowflakeのdestinationコネクタで自分が認知しているバグは、「日本語テーブル名がSQLエラーになる」「テーブル新規作成時にエラーになる」など。そのうち治るとは思うけど3.11.11なら動くのは確認済み。

image.png

settingのdestinationのところから使ってるコネクタの編集ボタンを押して、バージョンの数字を自分で手入力するだけで任意のコネクタバージョンにかえることができます。

AirbyteのSnowflakeへのデータ送信の動き

  1. コネクタのイメージをチェック。
    上記のように任意のバージョンのコネクタに指定できます。コネクタのdocke imageがなければdocker hubから落としてきます。
  2. 前準備
    Snowflakeの場合だと、airbyte内部用のテーブル、stageを作ったり、最終型になるテーブルを作ったりします。
  3. データの取得
    ソースからデータをとってきます。なんか様子見ながら何行くらいとるか勝手に決めて勝手によしなにやってくれます。基本的にはselectクエリを自動発行してるのでもとのデータソースのDB性能にとても影響をうけます。
  4. ステージにcsvファイルをなげつける
    ある程度データが溜まったらcsv.gzにしてステージに送りつけて、copy intoクエリで内部テーブルに放り込みます。このときのcsvはairbyteのメタデータと元のテーブルのdataはairbyte_data列にkey value型で持っている感じです。
  5. ステージのデータを内部テーブルにcopy intoする
    4が終わったらすぐします。データの取得が終わるまで3,4,5をひたすら繰り返します。で、この頻度を調節できないのでSnowflakeユーザーとしてはつらいところがあって、数GBのデータを転送するのに200MBくらい溜まったら転送してcopy intoを発行したりします。つまり200MB貯まるのに1分くらいかかると、1分に1回クエリが動くのでWH最小課金起動時間が1分なSnowflakeの課金体系とあんまり相性が…よくない。。。現状不満点はほぼこのタイミングだけです。

で、copy intoできたらステージにあるcsvファイルは即削除されている(はず)です。
6. データ取り終わったらテーブルに整形
中間テーブルにあるデータを最終テーブルにinsert intoでいれます。airbyte_dataでkvなオブジェクト型でもってたのを元の型に戻していれています。あと、ステージも削除はこのへんでしてるっぽい。

この辺の動作はログみてればわかります
image.png

おしまい!

途中でエラーが起きると、自動でリトライしてくれます。よくあるのは元のデータベースが応答しなくなったとか、ネットワーク悪くて応答ないとか。ちょっとくらいなら勝手になんとかしてくれます。タスク内部で何回かリトライしてだめなら一旦エラーでとめて、attemptという形で4回くらいリトライしてくれます。データがうまくとれてて途中までちゃんと取れてると続きからちゃんとやってくれます。場合によってはそのデータ取得の最初からやり直ししてくれる場合もあります。件数とか数えたり日付見たりそのへんのチェックは頑張ってる。

full refreshの場合だとソースの全データを、incrementalの場合だと、airbyteがSnowflakeのAirbyte用メタデータテーブルにある前回の更新日時を参考にしてcursorの列の日付と比較するクエリを自動発行して絞り込みをしています。dedup(重複削除)をつけてるとprimary keyが同じやつを上書きする動作になります。この上書きは、最後の6のときの更新クエリで実行してます(たぶん)

結局、内部テーブルっていってるところに過去とったデータが全部入ってる(はず)で、そっからいい感じのクエリを発行することで重複削除してデータの最新テーブルをつくるとかやってるわけです。

動作のログみてるとこの辺の実クエリがみえたりするので面白い。
(ソースも公開されてるし)

おわりに

もうちょっと色々書こうと思ったけどAirbyteがどうやってSnowfalkeにデータを入れているかという話になりました。他のELTツールってどうやってるんだろうね。

Snowflakeを直接という話ではないですが、Airbyteの中ではSnowfalkeのステージを使ってデータを入れているよって話でした。

本当はdbtで外部ネットワークでOpenAIのベクトル化かけて貯めるところまで書こうと思ってたんですがAirbyteのところだけで力尽きました。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?