Snowflakeアドベントカレンダー2024です。あらゆるアドカレ初参加です。
今年からSnowflakeをAribyte→Snowflake→dbtってフローでいろいろ触ってるのでコミュニティに知見を返したいので記事書いて行きます(真の目的は自分の備忘録
あんまりAirbyteを使ってるって話を聞かないので他の人にも使って(困りごとを解決したい)もらいたいという気持ちで記事を書いています。
Airbyteについて
割と勢いのあるELTツールです。
クラウドサービスで提供しており、Fivetranの対抗的なやつかと思います。日本だとTroccoさんとかもありますね。AirbyteはOSS展開しており、地味に複雑なライセンスしています。
基本はELv2ライセンスなのでマネージドサービスとして提供することをしないで自分でホストして使う分にはOSS的に使えます。またセルフホストエンタープライズ版なんていうのもあります。
割と探すと日本語圏でも使ってる人はいるようです。OSS的に自分で動かしてるパターンが多い気がします。自分で動かせるのでネットワークの中のデータベースにつないでプライベートリンクでデータ転送するユースケースに便利って気がしています。あとケチな人。
Airbyteのインストール
Airbyteは2024年9月にv1がリリースされて、インストールとかのコントロールがabctlが基本になって、docker-composeが廃止になりました。インストールはとってもかんたんでLinuxかMac環境だと
curl -LsfS https://get.airbyte.com | bash -
でとりあえずabctlが入り
abctl local install
でairbyteが起動します。(インストールって言ってるけど起動です)
こんな感じ。いっぱいイメージ落としてくるのでdocker hubの制限には注意が必要だったり。
$ abctl local install
INFO Using Kubernetes provider:
Provider: kind
Kubeconfig: /home/myu65/.airbyte/abctl/abctl.kubeconfig
Context: kind-airbyte-abctl
SUCCESS Found Docker installation: version 27.3.1
INFO No existing cluster found, cluster 'airbyte-abctl' will be created
SUCCESS Port 8000 appears to be available
▀ Creating cluster 'airbyte-abctl' (16s)
以下略
http://localhost:8000 にアクセスするとメールアドレスの設定画面とかがでてきます。で、
abctl local credentials
とコマンド打つとパスワードが生成されます。
コマンドで個別に設定もできます
abctl local credentials --email user@company.example
abctl local credentials --password new_password
なんか設定ファイル作って事前設定もできるようです。
https://docs.airbyte.com/deploying-airbyte/integrations/authentication
Dockerが必要だったりするのでDockerのインストールは事前に必要です。なお旧docker-compose版からの移行はabctl local install --migrate
でいけるのでとっても簡単でした。
abctlがなにやってるかというと、kubectlでk8sのクラスタをコントロールするdockerイメージを動かして、このイメージがairbyteの色々なイメージを引っ張ってきたりしています。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c84b63c512d5 kindest/node:v1.29.10 "/usr/local/bin/entr…" 22 minutes ago Up 22 minutes 0.0.0.0:8000->80/tcp, 127.0.0.1:36833->6443/tcp airbyte-abctl-control-plane
データ転送とかはソースや行き先それぞれのイメージがあってバージョンのイメージを引いてきてる感じです。
kube的な設定をしたいときはabctl local install --values ./values.yaml
でvalues.yamlにいろいろ書けばいいようです。
(自分はスキーマが大きすぎると転送できなくなるバグを踏んでて(よくある16MBくらいの転送制限)これを解決する設定方法がわからなくて困っています。誰か助けて https://github.com/airbytehq/airbyte/issues/4564 https://github.com/airbytehq/airbyte/issues/37463)
ちなみに停止する方法はabctl local uninstall
でこれはdockerとめてる感じのうごきになります。
(いまこの記事書くためにサラの環境でこれやったらコネクタ情報とか保存されてない…消してないはずなのに…なぜ…公開までにわかったら追記します。
Snowpark Container Servicesで動かす方法は…わかりません。k8sわからんですわ。EC2でt2.largeでスワップマシマシくらいで十分動きます。
Airbyteのソースへの接続
Airbyteには「ソース(source)」「行き先(destination)」「接続(connection)」の3種類の設定があります。(逆に他にはほとんど設定がない)
ソースのデータを取得して行き先に入れる。そのペアをコネクションで管理するって感じです。
とりあえずコネクションを作っていきましょう。何もない状態で最初のコネクションを作る通すと下のような画面になります。
かなりの種類のソースコネクタがあります。なくてもAPIで叩けるものはAPIビルダーで自分で作れます。kintoneやSansanは少なくともAPIビルダーで叩けます。叩けました。APIビルダーについては次の記事くらいしか日本語情報ないです。ややクセがありますが、なれたらサクサク作れます。
* つまり、なんでもソースとして登録できるということです。
たとえばPostgressだとこんな感じです。
右側に(英語ですが)説明が書いてあってそれを参考にしながらホストとかポートとかユーザーネームとかの接続情報を埋めていきます。
うまくいくとソースとして登録されます。
Salesforceのソースは接続にSalesforce側でアプリ登録したりして、そこの設定がむずかしかった…。けど、できます。
AirbyteのSnowflakeへの接続
destinationもソースとおなじのりで設定していきます。ほんとにSnowflakeの設定を埋めてくだけ。
ネットワークさえうまく行ってればprivatelinkもいけます。
keypair認証も対応してますね。
ソースと行き先両方うまく設定できたら
こんな感じの画面になって、Streamってところにテーブル一覧が出てきます。
やることは、テーブルと列名を選ぶだけ!!!!
モードはfull refreshとincremental(増分更新)が選べます。増分更新の場合は、primary keyとcursorを選ぶ必要があります。プリマリーキーはだいたいデータベース側で指定されててあんまり気にしなくていいですが、組み合わせでユニークになるキーですね。カーソルはわかりにくいんですが、更新日列(日付型)を選ぶやつです。これを見て増分があったと判定します。少なくとも使ってるバージョンでは日付型じゃないの選ぶとエラー吐きました。ここで細かい設定はあんまりできません。更新日がいい感じの列ない場合はあきらめてfull refreshにするしかありません。また、データベースによりますがcursorの列はindex指定しといたほうがいいです。CDCとか使えるデータベースだと設定があったりします。
で、コネクションの名前や、行き先(Snowflake)で作るスキーマの名前(もとのソースのデータベース+スキーマ名みたいなのがデフォルトだけどたまにセンスないので自分でつける)、スケジュールなんかを設定します。
スケジュールに関してはAirflowとかDagsterとかで叩くならマニュアルにしとけばいいはずです。AirbyteのAPIをAirflowが叩いてる形になります。
スキーマ変更を検知したときの動作や列名変更あったときに過去に遡って取得するか?とかの設定もこのへんにあります。うまくいけば列が増えてても検知したら過去のデータ拾い直してくれたりします(コストかかるからデフォルトでオフだったりします
バグっぽいはなし
現行最新のSnowflake destinationコネクタ(3.15.2)はバグがあるっぽくて、うまく動きません。3.11.11なら動きます。コントリビューターになってなおすしか…ないのか…?
Snowflakeのdestinationコネクタで自分が認知しているバグは、「日本語テーブル名がSQLエラーになる」「テーブル新規作成時にエラーになる」など。そのうち治るとは思うけど3.11.11なら動くのは確認済み。
settingのdestinationのところから使ってるコネクタの編集ボタンを押して、バージョンの数字を自分で手入力するだけで任意のコネクタバージョンにかえることができます。
AirbyteのSnowflakeへのデータ送信の動き
- コネクタのイメージをチェック。
上記のように任意のバージョンのコネクタに指定できます。コネクタのdocke imageがなければdocker hubから落としてきます。 - 前準備
Snowflakeの場合だと、airbyte内部用のテーブル、stageを作ったり、最終型になるテーブルを作ったりします。 - データの取得
ソースからデータをとってきます。なんか様子見ながら何行くらいとるか勝手に決めて勝手によしなにやってくれます。基本的にはselectクエリを自動発行してるのでもとのデータソースのDB性能にとても影響をうけます。 - ステージにcsvファイルをなげつける
ある程度データが溜まったらcsv.gzにしてステージに送りつけて、copy intoクエリで内部テーブルに放り込みます。このときのcsvはairbyteのメタデータと元のテーブルのdataはairbyte_data列にkey value型で持っている感じです。 - ステージのデータを内部テーブルにcopy intoする
4が終わったらすぐします。データの取得が終わるまで3,4,5をひたすら繰り返します。で、この頻度を調節できないのでSnowflakeユーザーとしてはつらいところがあって、数GBのデータを転送するのに200MBくらい溜まったら転送してcopy intoを発行したりします。つまり200MB貯まるのに1分くらいかかると、1分に1回クエリが動くのでWH最小課金起動時間が1分なSnowflakeの課金体系とあんまり相性が…よくない。。。現状不満点はほぼこのタイミングだけです。
で、copy intoできたらステージにあるcsvファイルは即削除されている(はず)です。
6. データ取り終わったらテーブルに整形
中間テーブルにあるデータを最終テーブルにinsert intoでいれます。airbyte_dataでkvなオブジェクト型でもってたのを元の型に戻していれています。あと、ステージも削除はこのへんでしてるっぽい。
おしまい!
途中でエラーが起きると、自動でリトライしてくれます。よくあるのは元のデータベースが応答しなくなったとか、ネットワーク悪くて応答ないとか。ちょっとくらいなら勝手になんとかしてくれます。タスク内部で何回かリトライしてだめなら一旦エラーでとめて、attemptという形で4回くらいリトライしてくれます。データがうまくとれてて途中までちゃんと取れてると続きからちゃんとやってくれます。場合によってはそのデータ取得の最初からやり直ししてくれる場合もあります。件数とか数えたり日付見たりそのへんのチェックは頑張ってる。
full refreshの場合だとソースの全データを、incrementalの場合だと、airbyteがSnowflakeのAirbyte用メタデータテーブルにある前回の更新日時を参考にしてcursorの列の日付と比較するクエリを自動発行して絞り込みをしています。dedup(重複削除)をつけてるとprimary keyが同じやつを上書きする動作になります。この上書きは、最後の6のときの更新クエリで実行してます(たぶん)
結局、内部テーブルっていってるところに過去とったデータが全部入ってる(はず)で、そっからいい感じのクエリを発行することで重複削除してデータの最新テーブルをつくるとかやってるわけです。
動作のログみてるとこの辺の実クエリがみえたりするので面白い。
(ソースも公開されてるし)
おわりに
もうちょっと色々書こうと思ったけどAirbyteがどうやってSnowfalkeにデータを入れているかという話になりました。他のELTツールってどうやってるんだろうね。
Snowflakeを直接という話ではないですが、Airbyteの中ではSnowfalkeのステージを使ってデータを入れているよって話でした。
本当はdbtで外部ネットワークでOpenAIのベクトル化かけて貯めるところまで書こうと思ってたんですがAirbyteのところだけで力尽きました。