0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DuckDB② dbtでDuckDBのデータを変換する

Last updated at Posted at 2025-01-12

この記事でやること

ここではDuckDBをデータストアとするdbtプロジェクトを作成し、前回取り込んだCloudTrailの証跡テーブルを分析しやすい形に変換してみます。

参考にしたページ

dbtとは

dbtについてはこちらの説明を引用します。

dbt(正式名称はdata build tool)はデータアナリストやアナリティクスエンジニアが(ほぼ)SQLだけで、データを変換しデータウエアハウス、データマートを構築していくことができるツールです。
dbtはELTデータパイプラインのアプローチにおける「T」層の部分を担います。データの抽出やロードは出来ませんが、既にデータウェアハウスに読み込まれたデータを変換することに優れています。

また、dbtの一通りの使い方は以下のサイトでわかりやすく解説されています。

dbtはデータ分析プラットフォームにおける「Transform(変換)」を担うツールとして様々なところで活用されており、データエンジニアリングのトレンドを知る意味でも使ってみようと思います。

環境準備

①で利用した環境をベースにdbtをセットアップしていきます。

PythonとPoetryを導入

dbtはPythonのパッケージで提供されるのでPythonの開発環境をセットアップします。ここではPoetryで作成したプロジェクト用の仮想環境上にdbtをインストールします。

Poetryの導入や仮想環境の作り方はこちらを参照してください。

Poetryプロジェクトと仮想環境の作成、dbtのインストール

$ poetry --version
Poetry (version 1.8.5)
# dbtプロジェクトのディレクトリを作成
$ mkdir dbt
$ cd dbt
# Poetryプロジェクトを作成
$ poetry init
# pythonのバージョンを指定して仮想環境を作成
$ poetry env use 3.13.1
# dbt-postgresとdbt-duckdbパッケージをインストール
$ poetry add dbt-postgres dbt-duckdb
# 仮想環境のシェルを起動
$ poetry shell
# dbtのバージョンを確認
(dbt-py3.13) $ dbt --version
Core:
  - installed: 1.9.1
  - latest:    1.9.1 - Up to date!
Plugins:
  - postgres: 1.9.0 - Up to date!
  - duckdb:   1.9.1 - Up to date!

プロファイルの作成

データウェアハウスへの接続設定を~/.dbt/profiles.ymlに書きます。
typeduckdbpathtest.duckdb(永続化するファイル名)とします。

~/.dbt/profiles.yml
duckdb_test:
  outputs:
    dev:
      type: duckdb
      path: test.duckdb
      threads: 1
  target: dev

dbtによるモデル開発

dbtプロジェクトの作成

dbt initコマンドでプロジェクトの雛形を作成します。

$ dbt init duckdb_test

モデルファイルを作成

dbtプロジェクトのmodels/配下にモデルファイルを作成していきます。

まずはCloudTrailを読み込むテーブルのモデルを作成します。
このモデルはテーブルとして実体化したいので、冒頭で{{ config(materialized='table') }}と定義します。
なお、{{ env_var('AWS_LOG_DIR') }}の部分には後で設定する環境変数の値が入ります。

ct_raw.sql
{{ config(materialized='table') }}

WITH raw_data AS (
    SELECT * 
    FROM read_json(
        '{{ env_var('AWS_LOG_DIR') }}/CloudTrail/*/*/*/*/*.json.gz',
        maximum_depth=2,
        sample_size=-1
    )
)
SELECT unnest(Records) AS Event
FROM raw_data

次に上のct_rawテーブルからフィールドを抽出する新しいモデルを作成します。
このモデルはconfig(materialized=を定義せずにビューとして作成します。
なお、FROM {{ ref('ct_raw') }}ではテーブル名の代わりにref関数を使っています。dbtではref関数を活用してモデル間の依存関係をdbtに管理させることが推奨されているようです。

ct_detail.sql
SELECT
  json_extract_string(Event, '$.eventVersion') AS eventVersion,
  json_extract_string(Event, '$.userIdentity.type') AS userType,
  json_extract_string(Event, '$.userIdentity.principalId') AS principalId,
  json_extract_string(Event, '$.userIdentity.arn') AS userArn,
  json_extract_string(Event, '$.userIdentity.accountId') AS accountId,
  json_extract_string(Event, '$.userIdentity.invokedBy') AS invokedBy,
  json_extract_string(Event, '$.userIdentity.accessKeyId') AS accessKeyId,
  json_extract_string(Event, '$.userIdentity.userName') AS userName,
  json_extract_string(Event, '$.userIdentity.sessionContext') AS sessionContext,
  strptime(json_extract_string(Event, '$.eventTime'), '%xT%XZ') as eventTime,
  json_extract_string(Event, '$.eventSource') AS eventSource,
  json_extract_string(Event, '$.eventName') AS eventName,
  json_extract_string(Event, '$.awsRegion') AS awsRegion,
  json_extract_string(Event, '$.sourceIPAddress') AS sourceIPAddress,
  json_extract_string(Event, '$.userAgent') AS userAgent,
  json_extract_string(Event, '$.errorCode') AS errorCode,
  json_extract_string(Event, '$.errorMessage') AS errorMessage,
  json_extract_string(Event, '$.requestParameters') AS requestParameters,
  json_extract_string(Event, '$.responseElements') AS responseElements,
  json_extract_string(Event, '$.additionalEventData') AS additionalEventData,
  json_extract_string(Event, '$.requestID') AS requestID,
  json_extract_string(Event, '$.eventID') AS eventID,
  json_extract_string(Event, '$.resources') AS resources,
  json_extract_string(Event, '$.eventType') AS eventType,
  json_extract_string(Event, '$.apiVersion') AS apiVersion,
  json_extract_string(Event, '$.readOnly') AS readOnly,
  json_extract_string(Event, '$.recipientAccountId') AS recipientAccountId,
  json_extract_string(Event, '$.serviceEventDetails') AS serviceEventDetails,
  json_extract_string(Event, '$.sharedEventID') AS sharedEventID,
  json_extract_string(Event, '$.vpcEndpointId') AS vpcEndpointId,
  json_extract_string(Event, '$.tlsDetails') AS tlsDetails,
  json_extract_string(Event, '$.managementEvent') AS managementEvent,
  json_extract_string(Event, '$.eventCategory') AS eventCategory,
FROM {{ ref('ct_raw') }}

dbtモデルを処理

環境変数を設定

ct_rawモデルから参照する環境変数を定義しておきます。

$ export AWS_LOG_DIR=(ログをダウンロードしたディレクトリのパス)

定義したモデルのテーブル、ビューをDuckDBへ作成

dbt runコマンドを実行するとモデルの定義に沿ったテーブル、ビューがDuckDBに作成されます。

$ dbt run
(dbt-py3.13) $ dbt run
13:14:30  Running with dbt=1.9.1
13:14:31  Registered adapter: duckdb=1.9.1
13:14:31  Found 2 models, 424 macros
13:14:31
13:14:31  Concurrency: 1 threads (target='dev')
13:14:31
13:14:31  1 of 2 START sql table model main.ct_raw ....................................... [RUN]
13:14:37  1 of 2 OK created sql table model main.ct_raw .................................. [OK in 5.11s]
13:14:37  2 of 2 START sql view model main.ct_detail ..................................... [RUN]
13:14:37  2 of 2 OK created sql view model main.ct_detail ................................ [OK in 0.15s]
13:14:37
13:14:37  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 5.93 seconds (5.93s).
13:14:37
13:14:37  Completed successfully
13:14:37
13:14:37  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

作成されたテーブル、ビューを確認する

dbtプロジェクト内のtest.duckdbをDBeaverで開くと、モデルで定義したct_rawテーブルとct_detailビューが作成されていることが確認できます。
DBeaver_3.png

例えばct_detailビューに対して以下のSQLを実行すれば、CloudTrailの証跡からEC2インスタンスの作成/起動/停止イベントを抽出することができます。

SELECT
	CASE WHEN eventName = 'RunInstances'
		THEN UNNEST (json_extract_string(responseElements, '$.instancesSet.items[*].instanceId'))
		ELSE UNNEST (json_extract_string(requestParameters, '$.instancesSet.items[*].instanceId'))
	END AS instanceId,
	eventTime,
	CASE WHEN userName IS NULL
		THEN json_extract_string(sessionContext , '$.sessionIssuer.userName')
		ELSE userName
	END AS userName,
	eventName,
FROM main.ct_detail
WHERE eventName IN ('StartInstances', 'StopInstances', 'RunInstances')
AND errorCode IS NULL
ORDER BY instanceId, eventTime

まとめ

ここではdbtを使ってDuckDBに取り込んだCloudTrailの証跡テーブルを分析しやすい形に変換しました。次はCloudTrailのデータを他のAWS構成情報と紐づけるより高度な分析にチャレンジします。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?