この記事でやること
ここではDuckDBをデータストアとするdbtプロジェクトを作成し、前回取り込んだCloudTrailの証跡テーブルを分析しやすい形に変換してみます。
参考にしたページ
dbtとは
dbtについてはこちらの説明を引用します。
dbt(正式名称はdata build tool)はデータアナリストやアナリティクスエンジニアが(ほぼ)SQLだけで、データを変換しデータウエアハウス、データマートを構築していくことができるツールです。
dbtはELTデータパイプラインのアプローチにおける「T」層の部分を担います。データの抽出やロードは出来ませんが、既にデータウェアハウスに読み込まれたデータを変換することに優れています。
また、dbtの一通りの使い方は以下のサイトでわかりやすく解説されています。
dbtはデータ分析プラットフォームにおける「Transform(変換)」を担うツールとして様々なところで活用されており、データエンジニアリングのトレンドを知る意味でも使ってみようと思います。
環境準備
①で利用した環境をベースにdbtをセットアップしていきます。
PythonとPoetryを導入
dbtはPythonのパッケージで提供されるのでPythonの開発環境をセットアップします。ここではPoetryで作成したプロジェクト用の仮想環境上にdbtをインストールします。
Poetryの導入や仮想環境の作り方はこちらを参照してください。
Poetryプロジェクトと仮想環境の作成、dbtのインストール
$ poetry --version
Poetry (version 1.8.5)
# dbtプロジェクトのディレクトリを作成
$ mkdir dbt
$ cd dbt
# Poetryプロジェクトを作成
$ poetry init
# pythonのバージョンを指定して仮想環境を作成
$ poetry env use 3.13.1
# dbt-postgresとdbt-duckdbパッケージをインストール
$ poetry add dbt-postgres dbt-duckdb
# 仮想環境のシェルを起動
$ poetry shell
# dbtのバージョンを確認
(dbt-py3.13) $ dbt --version
Core:
- installed: 1.9.1
- latest: 1.9.1 - Up to date!
Plugins:
- postgres: 1.9.0 - Up to date!
- duckdb: 1.9.1 - Up to date!
プロファイルの作成
データウェアハウスへの接続設定を~/.dbt/profiles.yml
に書きます。
type
はduckdb
、path
はtest.duckdb
(永続化するファイル名)とします。
duckdb_test:
outputs:
dev:
type: duckdb
path: test.duckdb
threads: 1
target: dev
dbtによるモデル開発
dbtプロジェクトの作成
dbt init
コマンドでプロジェクトの雛形を作成します。
$ dbt init duckdb_test
モデルファイルを作成
dbtプロジェクトのmodels/配下にモデルファイルを作成していきます。
まずはCloudTrailを読み込むテーブルのモデルを作成します。
このモデルはテーブルとして実体化したいので、冒頭で{{ config(materialized='table') }}
と定義します。
なお、{{ env_var('AWS_LOG_DIR') }}
の部分には後で設定する環境変数の値が入ります。
{{ config(materialized='table') }}
WITH raw_data AS (
SELECT *
FROM read_json(
'{{ env_var('AWS_LOG_DIR') }}/CloudTrail/*/*/*/*/*.json.gz',
maximum_depth=2,
sample_size=-1
)
)
SELECT unnest(Records) AS Event
FROM raw_data
次に上のct_rawテーブルからフィールドを抽出する新しいモデルを作成します。
このモデルはconfig(materialized=
を定義せずにビューとして作成します。
なお、FROM {{ ref('ct_raw') }}
ではテーブル名の代わりにref関数を使っています。dbtではref関数を活用してモデル間の依存関係をdbtに管理させることが推奨されているようです。
SELECT
json_extract_string(Event, '$.eventVersion') AS eventVersion,
json_extract_string(Event, '$.userIdentity.type') AS userType,
json_extract_string(Event, '$.userIdentity.principalId') AS principalId,
json_extract_string(Event, '$.userIdentity.arn') AS userArn,
json_extract_string(Event, '$.userIdentity.accountId') AS accountId,
json_extract_string(Event, '$.userIdentity.invokedBy') AS invokedBy,
json_extract_string(Event, '$.userIdentity.accessKeyId') AS accessKeyId,
json_extract_string(Event, '$.userIdentity.userName') AS userName,
json_extract_string(Event, '$.userIdentity.sessionContext') AS sessionContext,
strptime(json_extract_string(Event, '$.eventTime'), '%xT%XZ') as eventTime,
json_extract_string(Event, '$.eventSource') AS eventSource,
json_extract_string(Event, '$.eventName') AS eventName,
json_extract_string(Event, '$.awsRegion') AS awsRegion,
json_extract_string(Event, '$.sourceIPAddress') AS sourceIPAddress,
json_extract_string(Event, '$.userAgent') AS userAgent,
json_extract_string(Event, '$.errorCode') AS errorCode,
json_extract_string(Event, '$.errorMessage') AS errorMessage,
json_extract_string(Event, '$.requestParameters') AS requestParameters,
json_extract_string(Event, '$.responseElements') AS responseElements,
json_extract_string(Event, '$.additionalEventData') AS additionalEventData,
json_extract_string(Event, '$.requestID') AS requestID,
json_extract_string(Event, '$.eventID') AS eventID,
json_extract_string(Event, '$.resources') AS resources,
json_extract_string(Event, '$.eventType') AS eventType,
json_extract_string(Event, '$.apiVersion') AS apiVersion,
json_extract_string(Event, '$.readOnly') AS readOnly,
json_extract_string(Event, '$.recipientAccountId') AS recipientAccountId,
json_extract_string(Event, '$.serviceEventDetails') AS serviceEventDetails,
json_extract_string(Event, '$.sharedEventID') AS sharedEventID,
json_extract_string(Event, '$.vpcEndpointId') AS vpcEndpointId,
json_extract_string(Event, '$.tlsDetails') AS tlsDetails,
json_extract_string(Event, '$.managementEvent') AS managementEvent,
json_extract_string(Event, '$.eventCategory') AS eventCategory,
FROM {{ ref('ct_raw') }}
dbtモデルを処理
環境変数を設定
ct_rawモデルから参照する環境変数を定義しておきます。
$ export AWS_LOG_DIR=(ログをダウンロードしたディレクトリのパス)
定義したモデルのテーブル、ビューをDuckDBへ作成
dbt run
コマンドを実行するとモデルの定義に沿ったテーブル、ビューがDuckDBに作成されます。
$ dbt run
(dbt-py3.13) $ dbt run
13:14:30 Running with dbt=1.9.1
13:14:31 Registered adapter: duckdb=1.9.1
13:14:31 Found 2 models, 424 macros
13:14:31
13:14:31 Concurrency: 1 threads (target='dev')
13:14:31
13:14:31 1 of 2 START sql table model main.ct_raw ....................................... [RUN]
13:14:37 1 of 2 OK created sql table model main.ct_raw .................................. [OK in 5.11s]
13:14:37 2 of 2 START sql view model main.ct_detail ..................................... [RUN]
13:14:37 2 of 2 OK created sql view model main.ct_detail ................................ [OK in 0.15s]
13:14:37
13:14:37 Finished running 1 table model, 1 view model in 0 hours 0 minutes and 5.93 seconds (5.93s).
13:14:37
13:14:37 Completed successfully
13:14:37
13:14:37 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
作成されたテーブル、ビューを確認する
dbtプロジェクト内のtest.duckdb
をDBeaverで開くと、モデルで定義したct_rawテーブルとct_detailビューが作成されていることが確認できます。
例えばct_detailビューに対して以下のSQLを実行すれば、CloudTrailの証跡からEC2インスタンスの作成/起動/停止イベントを抽出することができます。
SELECT
CASE WHEN eventName = 'RunInstances'
THEN UNNEST (json_extract_string(responseElements, '$.instancesSet.items[*].instanceId'))
ELSE UNNEST (json_extract_string(requestParameters, '$.instancesSet.items[*].instanceId'))
END AS instanceId,
eventTime,
CASE WHEN userName IS NULL
THEN json_extract_string(sessionContext , '$.sessionIssuer.userName')
ELSE userName
END AS userName,
eventName,
FROM main.ct_detail
WHERE eventName IN ('StartInstances', 'StopInstances', 'RunInstances')
AND errorCode IS NULL
ORDER BY instanceId, eventTime
まとめ
ここではdbtを使ってDuckDBに取り込んだCloudTrailの証跡テーブルを分析しやすい形に変換しました。次はCloudTrailのデータを他のAWS構成情報と紐づけるより高度な分析にチャレンジします。