概要
dbt with databricksではなく、dbt on databricks、つまり、Databricks上でdbt Coreを実行する方法を検証しましたので内容を共有します。本手順では、Databricksからdbtを経由してsnowflakeへ接続する手順を紹介します。
Databricksは、大量データの書き込みに最適なSpark コネクターを利用できることや静的パブリックIPアドレスを付与できることから、他データストアへのデータ連携のELTにおけるEL(データ書き込み)を実施するツールとして最適です。データ連携のELTにおけるT(データストア内でのデータ変換)のツールとして最適なdbtをdatabricksで実行できれば、最強のツールになるのではないかと考えて検証しました。
Databricks Repos上にdbtのコードを実行します。ただし、dbtのデバッグ時にのみ、下記仕様により、クラスターのストレージにコードをコピーしてから実行する必要があります。
- dbt debugを実行する場合、あるいは、--debugをつけて実行する場合には、dbtのコードと同一ディレクトリにある
logs/dbt.log
にログを書き込むというdbtの仕様があること - Files in ReposへPythonの関数から参照する場合には読み取りのみに制限されているため、デバッグ時にログファイル(logs/dbt.log)へ書き込むことができないこと
dbtから書き込むディレクトリの設定には下記のものがあり、log-path
のみdbfs上のディレクトリを指定するとエラーとなることに注意してください。log-path
のディレクトリを、クラスターのストレージ(例:/tmp/dbt_logs
)にて記載後、ログファイルを移動する必要があります。
target-path: '/dbfs/dbt_manabian/target'
packages-install-path: '/dbfs/dbt_manabian/dbt_packages'
log-path: '/tmp/dbt_logs'
dbtがどういったツールであるかを理解するためには、下記の記事を参考にしてください。
環境準備
Databricksクラスターにてdbfs上に配置してinitスクリプトを実行するように設定
sudo apt-get libpq-dev python-dev python3-pip
sudo apt-get remove python-cffi
sudo pip install --upgrade cffi
DatabricksはUbuntuで動作していることから、下記の記事を参考にしております。
引用元:Use pip to install dbt | dbt Docs (getdbt.com)
Databricksクラスターにて接続先のデータストアに応じたdbtのライブラリ(本手順ではdbt-snowflake==1.0.0)とcryptography(cryptography~=3.4)をインストール
dbt Coreがインストールされていることを確認
%sh
dbt --version
dbtのコードをDatabricks Repos上に配置
%sh
cd /Workspace/Repos/dbt_test/dbt_test
pwd;find . | sort | sed '1d;s/^\.//;s/\/\([^/]*\)$/|--\1/;s/\/[^/|]*/| /g'
dbtのコードをデバック
クラスターのストレージへdbtのコードを配置
# クラスターのストレージへdbt関連のコードを配置
in_dir_path = 'file:/Workspace/Repos/dbt_test/dbt_test/dbt'
out_dir_path = 'file:/dbt_manabian/dbt_test'
dbutils.fs.rm(out_dir_path, True)
dbutils.fs.cp(in_dir_path, out_dir_path, recurse=True)
クラスターのストレージ上のdbtのコードをデバッグ
%sh
export DBT_PROFILES_DIR=profiles/snowflake
cd /dbt_manabian/dbt_test
dbt debug
デバッグ時のログファイルを確認
import pprint
pprint.pprint(dbutils.fs.head('file:/dbt_manabian/dbt_test/logs/dbt.log'))
dbtのコードを実行
dbtのコードを実行
%sh
export DBT_PROFILES_DIR=profiles/snowflake
cd /Workspace/Repos/dbt_test/dbt_test/dbt
dbt deps
dbt docs generate
dbt run
ログファイルをdbfsにコピー
# ログファイルをdbfsにコピー
from datetime import datetime
# 現在の時分秒のUNIX時刻の文字列を取得
now_ts = str(int(datetime.now().timestamp()))
dbutils.fs.cp('file:/tmp/dbt_logs', f'dbfs:/dbt_manabian/dbt_log/{now_ts}', True)
dbt関連のファイルが生成されていることを確認
display(spark.read.json('/dbt_manabian/dbt_log/*/*'))
Databricks on dbtを利用する際の制約事象
Files in ReposへPythonの関数から参照する場合には読み取りのみに制限されているため、ログ等のファイルを書き込むことができない。
Daabricks Repos上にあるdbtのコードを実行するとlogs
配下にファイルを書き込む際にエラーとなります。
%sh
export DBT_PROFILES_DIR=profiles/snowflake
cd /Workspace/Repos/dbt_test/dbt_test
dbt debug
05:33:00 Encountered an error:
[Errno 38] Function not implemented: 'logs'
dbfs上にdbt.logを書き込むと後続のdbt処理がエラーとなる
下記のようにdebugを2回実行すると、2回目の処理がエラーとなります。
%sh
export DBT_PROFILES_DIR=profiles/snowflake
cd /dbfs/dbt_manabian/dbt_test
dbt debug
%sh
export DBT_PROFILES_DIR=profiles/snowflake
cd /dbfs/dbt_manabian/dbt_test
dbt debug
--- Logging error ---
Traceback (most recent call last):
File "/usr/lib/python3.8/logging/init.py", line 1089, in emit
self.flush()
File "/usr/lib/python3.8/logging/init.py", line 1069, in flush
self.stream.flush()
OSError: [Errno 95] Operation not supported