はじめに
今回は、Docker Composeを使ってDuckDBとdbtをセットアップし、Incremental
モデルを活用した増分更新の実装手順を解説します。dbt
モデルの設定、依存関係の管理、データのロード順序を含む包括的な手順を試してみます。
1. Docker環境の構築
a. Dockerfileの作成
プロジェクトルートに Dockerfile
を作成し、以下を記述します:
FROM python:3.9-slim
RUN apt-get update && apt-get install -y \
curl \
unzip \
git \
libpq-dev && \
apt-get clean
RUN curl -L https://github.com/duckdb/duckdb/releases/download/v0.8.1/duckdb_cli-linux-amd64.zip -o duckdb.zip && \
unzip duckdb.zip && \
mv duckdb /usr/local/bin/duckdb && \
chmod +x /usr/local/bin/duckdb && \
rm duckdb.zip
# dbt-coreとdbt-duckdbをインストール
RUN pip install --upgrade pip && \
pip install dbt-core==1.8.9 dbt-duckdb==1.8.4
WORKDIR /dbt_project
COPY . /dbt_project
CMD ["tail", "-f", "/dev/null"]
b. Docker Composeファイルの作成
プロジェクトルートに docker-compose.yml
を作成し、以下を記述します:
services:
dbt_duckdb:
build: .
container_name: dbt_duckdb
volumes:
- .:/dbt_project
- ~/.dbt:/root/.dbt
working_dir: /dbt_project
tty: true
c. Docker環境のビルドと起動
-
Dockerイメージのビルドとコンテナ起動
以下のコマンドを実行してDockerイメージをビルドし、コンテナを起動します:docker compose up --build -d
2. dbtプロジェクトのセットアップ
a. プロジェクトの初期化
-
dbtプロジェクトを作成
コンテナ内で以下を実行してdbtプロジェクトを初期化します:dbt init dbt_project cd dbt_project
-
dbt_project.yml
の内容確認
作成されたdbt_project.yml
ファイルのprofile
がdbt_project
であることを確認します。
b. profiles.ymlの作成と接続確認
-
プロファイルの設定
コンテナ内で.dbt/profiles.ymlを作成し、以下の内容を記述しますdbt_project: outputs: dev: type: duckdb path: /dbt_project/duckdb_incremental.duckdb target: dev
-
接続確認
以下のコマンドで接続設定を確認します:dbt debug
All checks passed!
が表示されれば成功です。
3. 初期データのロード
a. 初期データの作成
以下のCSVを dbt_project/seeds/
に作成します:
seeds/new_events.csv
:
event_id,user_id,event_type,event_time,payload
1,1001,click,2024-11-27T08:00:00,"{""key"": ""value1""}"
2,1002,purchase,2024-11-28T09:00:00,"{""key"": ""value2""}"
b. dbt seed
の実行
以下のコマンドを実行して初期データをロードします:
dbt seed
4. モデルの設定と確認
a. schema.yml の作成
models/schema.yml
を作成し、以下の内容で記述します:
version: 2
models:
- name: raw_event_stream
description: "増分更新を行うイベントデータのテーブル"
columns:
- name: event_id
description: "イベントID。テーブル内で一意です。"
- name: user_id
description: "イベントに関連付けられたユーザーID。"
- name: event_type
description: "イベントの種類(例: click, purchase)"
- name: event_time
description: "イベントが発生した日時。"
- name: payload
description: "イベントの追加情報をJSON形式で格納。"
- name: view_raw_event_stream
description: "raw_event_streamの内容を確認するためのビュー"
b. raw_event_stream.sql の作成
以下の内容で models/raw_event_stream.sql
を作成します。この設定により2回目のロード以降、増分のみがロードされます。:
-- depends_on: {{ ref('new_events') }}
{{ config(
materialized='incremental',
unique_key='event_id'
) }}
SELECT
event_id,
user_id,
event_type,
event_time,
payload
FROM {{ ref('new_events') }} -- 初回実行時は全データを取得
{% if is_incremental() %}
WHERE event_id NOT IN (SELECT event_id FROM {{ this }}) -- 増分データを取得
{% endif %}
c. view_raw_event_stream.sql の作成
今回はテーブルの状態を分かりやすく確認するために以下の内容でmodels/view_raw_event_stream.sql
を作成します:
{{ config(
materialized='view'
) }}
SELECT *
FROM {{ ref('raw_event_stream') }}
ORDER BY event_id
d. dbt_project.ymlの編集
name: 'dbt_project'
version: '1.0.0'
profile: 'dbt_project'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
clean-targets:
- "target"
- "dbt_packages"
# モデル設定
models:
dbt_project:
raw_event_stream:
+materialized: incremental # `raw_event_stream` モデルを増分処理として設定
view_raw_event_stream:
+materialized: view # `view_raw_event_stream` モデルをビューとして設定
e. モデルの実行
以下のコマンドを実行してモデルを作成します:
dbt run
run成功後、dbt-power-userでview_raw_event_stream.sqlの実行し作成されたテーブルを確認してみます。
5. 追加データのロードと確認
a. 追加データの準備
以下のデータを seeds/new_events.csv
に追記します:
event_id(3,4)が追加されています。
event_id,user_id,event_type,event_time,payload
1,1001,click,2024-11-27T08:00:00,"{""key"": ""value1""}"
2,1002,purchase,2024-11-28T09:00:00,"{""key"": ""value2""}"
3,1003,view,2024-11-28T10:00:00,"{""key"": ""value3""}"
4,1004,click,2024-11-28T11:00:00,"{""key"": ""value4""}"
b. データの更新
-
データのロード
dbt seed
-
モデルの更新
dbt run
実際にテーブルを確認してみると、event_idを基準に正しくデータが追加されていることがわかります。
おまけ:event_time を使用した増分更新
event_id ではなく、event_time を基準にして新しいデータをインクリメンタルに更新する方法もあります。この方法は、イベントのタイムスタンプが一意であり、順序が保証されている場合に適しています。
-- depends_on: {{ ref('new_events') }}
{{ config(
materialized='incremental',
unique_key='event_id'
) }}
SELECT *
FROM {{ this }}
{% if is_incremental() %}
UNION ALL
SELECT
event_id,
user_id,
event_type,
event_time,
payload
FROM {{ ref('new_events') }}
WHERE event_time > (SELECT COALESCE(MAX(event_time), TIMESTAMP '1900-01-01 00:00:00') FROM {{ this }})
{% endif %}
まとめ
この記事では、初期データのロード(dbt seed
)を先に行い、その後にモデルを構築する正しい実行順序で作業を進めました。また、schema.yml
を使用してモデルのドキュメント化を行い、incremental
モデルの依存関係を正しく管理しました。この手順に従うことで、柔軟なデータパイプラインを構築できます。