1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

dbtAdvent Calendar 2024

Day 23

Docker環境でDuckDBとdbt coreをセットアップし、Incrementalモデルを活用した増分更新を行う

Last updated at Posted at 2024-11-30

はじめに

今回は、Docker Composeを使ってDuckDBとdbtをセットアップし、Incremental モデルを活用した増分更新の実装手順を解説します。dbtモデルの設定、依存関係の管理、データのロード順序を含む包括的な手順を試してみます。

1. Docker環境の構築

a. Dockerfileの作成

プロジェクトルートに Dockerfile を作成し、以下を記述します:

FROM python:3.9-slim

RUN apt-get update && apt-get install -y \
    curl \
    unzip \
    git \
    libpq-dev && \
    apt-get clean

RUN curl -L https://github.com/duckdb/duckdb/releases/download/v0.8.1/duckdb_cli-linux-amd64.zip -o duckdb.zip && \
    unzip duckdb.zip && \
    mv duckdb /usr/local/bin/duckdb && \
    chmod +x /usr/local/bin/duckdb && \
    rm duckdb.zip

# dbt-coreとdbt-duckdbをインストール
RUN pip install --upgrade pip && \
    pip install dbt-core==1.8.9 dbt-duckdb==1.8.4

WORKDIR /dbt_project

COPY . /dbt_project

CMD ["tail", "-f", "/dev/null"]

b. Docker Composeファイルの作成

プロジェクトルートに docker-compose.yml を作成し、以下を記述します:

services:
  dbt_duckdb:
    build: .
    container_name: dbt_duckdb
    volumes:
      - .:/dbt_project
      - ~/.dbt:/root/.dbt
    working_dir: /dbt_project
    tty: true

c. Docker環境のビルドと起動

  1. Dockerイメージのビルドとコンテナ起動
    以下のコマンドを実行してDockerイメージをビルドし、コンテナを起動します:

    docker compose up --build -d
    
  2. コンテナに入る
    起動後ルートコンテナに入ります。
    スクリーンショット 2024-11-30 20.08.11.png

2. dbtプロジェクトのセットアップ

a. プロジェクトの初期化

  1. dbtプロジェクトを作成
    コンテナ内で以下を実行してdbtプロジェクトを初期化します:

    dbt init dbt_project
    cd dbt_project
    
  2. dbt_project.yml の内容確認
    作成された dbt_project.yml ファイルの profiledbt_project であることを確認します。

b. profiles.ymlの作成と接続確認

  1. プロファイルの設定
    コンテナ内で.dbt/profiles.ymlを作成し、以下の内容を記述します

    dbt_project:
      outputs:
        dev:
          type: duckdb
          path: /dbt_project/duckdb_incremental.duckdb
      target: dev
    
  2. 接続確認
    以下のコマンドで接続設定を確認します:

    dbt debug
    

    All checks passed! が表示されれば成功です。

    スクリーンショット 2024-11-30 15.22.31.png

3. 初期データのロード

a. 初期データの作成

以下のCSVを dbt_project/seeds/ に作成します:

seeds/new_events.csv:

event_id,user_id,event_type,event_time,payload
1,1001,click,2024-11-27T08:00:00,"{""key"": ""value1""}"
2,1002,purchase,2024-11-28T09:00:00,"{""key"": ""value2""}"

b. dbt seed の実行

以下のコマンドを実行して初期データをロードします:

dbt seed

ロードが成功したことを確認してください。
スクリーンショット 2024-11-30 15.23.59.png

4. モデルの設定と確認

a. schema.yml の作成

models/schema.yml を作成し、以下の内容で記述します:

version: 2

models:
  - name: raw_event_stream
    description: "増分更新を行うイベントデータのテーブル"
    columns:
      - name: event_id
        description: "イベントID。テーブル内で一意です。"
      - name: user_id
        description: "イベントに関連付けられたユーザーID。"
      - name: event_type
        description: "イベントの種類(例: click, purchase)"
      - name: event_time
        description: "イベントが発生した日時。"
      - name: payload
        description: "イベントの追加情報をJSON形式で格納。"
  - name: view_raw_event_stream
    description: "raw_event_streamの内容を確認するためのビュー"

b. raw_event_stream.sql の作成

以下の内容で models/raw_event_stream.sql を作成します。この設定により2回目のロード以降、増分のみがロードされます。:

-- depends_on: {{ ref('new_events') }}

{{ config(
    materialized='incremental',
    unique_key='event_id'
) }}

SELECT
    event_id,
    user_id,
    event_type,
    event_time,
    payload
FROM {{ ref('new_events') }}  -- 初回実行時は全データを取得
{% if is_incremental() %}
WHERE event_id NOT IN (SELECT event_id FROM {{ this }})  -- 増分データを取得
{% endif %}

c. view_raw_event_stream.sql の作成

今回はテーブルの状態を分かりやすく確認するために以下の内容でmodels/view_raw_event_stream.sql を作成します:

{{ config(
    materialized='view'
) }}

SELECT * 
FROM {{ ref('raw_event_stream') }}
ORDER BY event_id

d. dbt_project.ymlの編集


name: 'dbt_project'
version: '1.0.0'

profile: 'dbt_project'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

# モデル設定
models:
  dbt_project:
    raw_event_stream:
      +materialized: incremental  # `raw_event_stream` モデルを増分処理として設定
    view_raw_event_stream:
      +materialized: view         # `view_raw_event_stream` モデルをビューとして設定

e. モデルの実行

以下のコマンドを実行してモデルを作成します:

dbt run

run成功後、dbt-power-userでview_raw_event_stream.sqlの実行し作成されたテーブルを確認してみます。
スクリーンショット 2024-11-30 19.44.17.png

5. 追加データのロードと確認

a. 追加データの準備

以下のデータを seeds/new_events.csv に追記します:
event_id(3,4)が追加されています。

event_id,user_id,event_type,event_time,payload
1,1001,click,2024-11-27T08:00:00,"{""key"": ""value1""}"
2,1002,purchase,2024-11-28T09:00:00,"{""key"": ""value2""}"
3,1003,view,2024-11-28T10:00:00,"{""key"": ""value3""}"
4,1004,click,2024-11-28T11:00:00,"{""key"": ""value4""}"

b. データの更新

  1. データのロード

    dbt seed
    
  2. モデルの更新

    dbt run
    

実際にテーブルを確認してみると、event_idを基準に正しくデータが追加されていることがわかります。
スクリーンショット 2024-11-30 20.04.03.png

おまけ:event_time を使用した増分更新

event_id ではなく、event_time を基準にして新しいデータをインクリメンタルに更新する方法もあります。この方法は、イベントのタイムスタンプが一意であり、順序が保証されている場合に適しています。

-- depends_on: {{ ref('new_events') }}

{{ config(
    materialized='incremental',
    unique_key='event_id'
) }}

SELECT *
FROM {{ this }}
{% if is_incremental() %}
UNION ALL
SELECT
    event_id,
    user_id,
    event_type,
    event_time,
    payload
FROM {{ ref('new_events') }}
WHERE event_time > (SELECT COALESCE(MAX(event_time), TIMESTAMP '1900-01-01 00:00:00') FROM {{ this }})
{% endif %}

まとめ

この記事では、初期データのロード(dbt seed)を先に行い、その後にモデルを構築する正しい実行順序で作業を進めました。また、schema.yml を使用してモデルのドキュメント化を行い、incremental モデルの依存関係を正しく管理しました。この手順に従うことで、柔軟なデータパイプラインを構築できます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?