はじめに
この記事は、AWS Certified Data Engineer - Associate(DEA-C01)の学習ノートです。
Domain1「Data Ingestion and Transformation」は、試験全体の**34%**を占める最重要ドメインです。データの取り込みから変換、パイプラインのオーケストレーションまで、データエンジニアリングの核となる内容が詰まっています。
自分の学習用にまとめたものですが、同じくDEAを目指す方の参考になれば幸いです。
試験における位置づけ
| 項目 | 内容 |
|---|---|
| 配点比率 | 34%(全4ドメイン中最大) |
| タスク数 | 4つ(1.1〜1.4) |
| 重要度 | 最重要ドメイン |
目次
- データエンジニアリングの基礎概念
- Task 1.1: データの取り込み(Ingestion)
- Task 1.2: データの変換と処理(Transform)
- Task 1.3: データパイプラインのオーケストレーション
- Task 1.4: プログラミングの概念を適用する
- サービス比較と使い分け
- 試験対策のポイント
1. データエンジニアリングの基礎概念
1.1 データエンジニアリングライフサイクル
データエンジニアリングには5つの段階があります。この流れを理解することが、Domain1の全体像を把握する鍵となります。
+------------+ +------------+ +------------+ +------------+ +------------+
| | | | | | | | | |
| 生成 | --> | ストレージ | --> | 取り込み | --> | 変換 | --> | 提供 |
| Generation | | Storage | | Ingestion | | Transform | | Serving |
| | | | | | | | | |
+------------+ +------------+ +------------+ +------------+ +------------+
| | | | |
v v v v v
ソースシステム 保存先の選択 データ収集 データ加工 分析・可視化
(IoT,DB,App) (S3,Redshift) (Kinesis,Glue) (EMR,Glue) (Athena,QuickSight)
各段階の役割:
| 段階 | What(何をするか) | Why(なぜ必要か) | 担当範囲 |
|---|---|---|---|
| 生成 | データの発生源を理解する | 取り込み設計の前提条件 | ソースシステム側 |
| ストレージ | データの保存先を選択する | 後続処理に最適な形式で保存 | データエンジニア |
| 取り込み | ソースからデータを収集する | 分析基盤にデータを集約 | Domain1の主題 |
| 変換 | データを使える形に加工する | rawデータを価値ある形に | Domain1の主題 |
| 提供 | データを利用者に届ける | ビジネス価値の創出 | データエンジニア |
1.2 データの5つのV
データを扱う際に考慮すべき5つの重要な特性があります。
| V | 日本語 | 説明 | 考慮すべき点 |
|---|---|---|---|
| Volume | ボリューム | データ量 | サービスの容量制限、スループット |
| Velocity | 速度 | 処理速度 | リアルタイム vs バッチ |
| Variety | 多様性 | データ形式 | 構造化/半構造化/非構造化 |
| Veracity | 信憑性 | データ品質 | 欠損、不整合、重複 |
| Value | 価値 | ビジネス価値 | 投資対効果(ROI) |
実務での活用例:
- Volume が大きい → S3 + EMR での分散処理を検討
- Velocity が高い → Kinesis でリアルタイム処理
- Variety が多様 → Glue Data Catalog でスキーマ管理
2. Task 1.1: データの取り込み(Ingestion)
2.1 取り込みの基本概念
ストリーミング vs バッチ
データの取り込み方法は大きく2種類に分けられます。
| 項目 | ストリーミング | バッチ |
|---|---|---|
| What | データを連続的にリアルタイムで処理 | データを一定期間蓄積して一括処理 |
| Why | 即座にデータを活用したい場合 | コスト効率を重視する場合 |
| How | Kinesis, MSK | Glue, EMR, DMS |
| データ特性 | 小さなイベントが連続 | 大きなファイルを定期実行 |
| レイテンシ | 低(秒〜分) | 高(分〜時間) |
| コスト | 常時稼働でやや高め | 実行時のみ課金で効率的 |
| ユースケース | クリックストリーム、IoT | 日次売上集計、週次レポート |
【ストリーミング処理のイメージ】
データソース 取り込み 処理 保存
| | | |
v v v v
+--------+ +-----------+ +-----------+ +--------+
| IoT |----->| Kinesis |----->| Lambda |----->| S3 |
| センサー| 常時 | Data | 即座 | (変換) | | |
+--------+ | Streams | +-----------+ +--------+
+-----------+
|
v
リアルタイムに流れ続ける
【バッチ処理のイメージ】
データソース 蓄積 取り込み・処理 保存
| | | |
v v v v
+--------+ +-----------+ +-----------+ +--------+
| DB |----->| S3 |---------> | Glue ETL |------->|Redshift|
| (売上) | 日次 | (raw) | スケジュール | (変換) | | |
+--------+ +-----------+ +-----------+ +--------+
|
v
1日分まとめて処理
プッシュ vs プル
| 方式 | 説明 | 例 |
|---|---|---|
| プッシュ | ソースがデータを送信する | IoTセンサー → Kinesis |
| プル | 取り込み側がデータを取得する | Glue → RDS から抽出 |
2.2 ストリーミングデータ取り込みサービス
Amazon Kinesis ファミリー
Kinesisは「ストリーミングデータを処理するためのサービス群」です。用途に応じて使い分けます。
| サービス | What(何をするか) | Why(なぜ使うか) | How(どう使うか) |
|---|---|---|---|
| Kinesis Data Streams | データストリームの受信・保持 | カスタム処理、複数コンシューマー対応 | シャード数でスループット制御 |
| Kinesis Data Firehose | ストリームを宛先に配信 | 運用不要で簡単にデータ配信 | バッファ設定で配信頻度調整 |
| Managed Service for Apache Flink | ストリームをリアルタイム分析 | SQLやFlinkでストリーム処理 | SQL/Flinkコードで分析ロジック定義 |
| Kinesis Video Streams | 動画・音声ストリームの処理 | 時系列メディアデータの処理 | 動画分析、機械学習連携 |
Kinesis Data Streams の仕組み:
Kinesis Data Streams
+--------+ +------------------------------------+ +----------+
|Producer| ------> | シャード1 | シャード2 | シャード3 | ------> |Consumer |
| (送信側)| | [|||] | [|||] | [|||] | | (受信側)|
+--------+ +------------------------------------+ +----------+
|
v
パーティションキーでシャード振り分け
各シャード: 1MB/秒 書き込み, 2MB/秒 読み取り
重要な概念:
| 用語 | 説明 | 試験ポイント |
|---|---|---|
| シャード | データの分散単位 | スループットを決定する |
| パーティションキー | シャード振り分けのキー | ホットシャード対策が重要 |
| IteratorAgeMilliseconds | 処理遅延の指標 | 高い=処理が追いついていない |
| 拡張ファンアウト | コンシューマーごとに2MB/秒 | 複数コンシューマー時に有効 |
ホットシャード問題と対策:
ホットシャードとは、特定のシャードにデータが集中してしまう問題です。
【問題のある状態】
パーティションキー: ユーザーID (人気ユーザーに集中)
シャード1: |||||||||||||| <-- 過負荷(ホットシャード)
シャード2: ||
シャード3: |
【対策後】
パーティションキー: ランダム or 複合キー
シャード1: |||||
シャード2: |||||
シャード3: |||| <-- 均等に分散
対策方法:
- ランダムなパーティションキーを使用
- 複合キー(ユーザーID + タイムスタンプ)を使用
- UpdateShardCount でシャード数を増加
Kinesis Data Streams vs Data Firehose
| 観点 | Data Streams | Data Firehose |
|---|---|---|
| 管理 | シャード管理が必要 | フルマネージド |
| カスタム処理 | 可能(Lambda, KCL) | 限定的(Lambda変換のみ) |
| 直接配信先 | なし(コンシューマー必須) | S3, Redshift, OpenSearch, Splunk |
| データ保持 | 24時間〜365日 | なし(即配信) |
| 複数コンシューマー | 対応 | 非対応 |
| 課金 | シャード時間 | データ量 |
選択の指針:
- カスタム処理が必要 → Data Streams
- 単純にS3やRedshiftに配信したい → Data Firehose
- 複数の異なる処理を並列で行いたい → Data Streams
Amazon MSK(Managed Streaming for Apache Kafka)
| 項目 | 説明 |
|---|---|
| What | Apache Kafkaのマネージドサービス |
| Why | 既存Kafkaワークロードの移行、オープンソース互換性 |
| How | 既存のKafkaクライアント・ツールがそのまま使える |
Kinesis vs MSK の選択基準:
| 観点 | Kinesis | MSK |
|---|---|---|
| 新規構築 | 推奨(サーバーレス) | - |
| 既存Kafka移行 | - | 推奨(互換性) |
| 課金 | データスループット | クラスター時間 |
| 運用負荷 | 低い | やや高い |
Amazon DynamoDB Streams
DynamoDBテーブルの変更をリアルタイムでキャプチャするストリーミング機能です。
| 項目 | 説明 |
|---|---|
| What | DynamoDBテーブルの変更(INSERT/UPDATE/DELETE)をストリームとして提供 |
| Why | テーブル変更をトリガーに後続処理を実行したい場合 |
| How | Lambda関数やKinesis Client Library(KCL)で消費 |
+------------+ DynamoDB Streams +----------+ +----------+
| DynamoDB | -----------------------> | Lambda | --> | S3 |
| テーブル | 変更イベント | (処理) | | (保存) |
+------------+ +----------+ +----------+
ユースケース:
- テーブル変更の監査ログ作成
- リアルタイムの集計・分析
- クロスリージョンレプリケーション
- 変更をトリガーとした通知
2.3 バッチデータ取り込みサービス
AWS Glue
| 項目 | 説明 |
|---|---|
| What | サーバーレスETLサービス |
| Why | インフラ管理不要でETL処理を実行 |
| How | Spark/Pythonでジョブを定義、クローラでカタログ化 |
AWS Glue の主要コンポーネント:
+------------------+ +------------------+ +------------------+
| Glue Crawler | | Glue Data | | Glue ETL Job |
| | | Catalog | | |
| データソースを | --> | メタデータを | --> | データを変換 |
| スキャン | | 管理 | | して出力 |
+------------------+ +------------------+ +------------------+
| | |
v v v
S3のファイル構造 テーブル定義、スキーマ Spark/Pythonコード
を自動検出 パーティション情報 で変換ロジック実行
重要機能:ジョブブックマーク
ジョブブックマークは「前回どこまで処理したか」を記録する機能です。
【ジョブブックマークなし】
1回目実行: ファイルA, B, C を処理
2回目実行: ファイルA, B, C, D, E を処理(重複あり)
【ジョブブックマークあり】
1回目実行: ファイルA, B, C を処理 → 記録
2回目実行: ファイルD, E のみ処理(新規分のみ)
試験ポイント:
- ジョブブックマークを有効化すると、増分処理が可能
- 重複処理を防ぎ、効率的なパイプラインを実現
AWS DMS(Database Migration Service)
| 項目 | 説明 |
|---|---|
| What | データベース移行・レプリケーションサービス |
| Why | 既存DBからAWSへの移行、継続的なデータ同期 |
| How | ソースDBのトランザクションログを読み取り、ターゲットに反映 |
DMSのユースケース:
【データベース移行】
オンプレミスOracle ----DMS----> Amazon Aurora
【継続的レプリケーション(CDC)】
本番DB (RDS) ----DMS (CDC)----> S3 データレイク
|
v
分析用に継続同期
CDC(Change Data Capture)とは:
データベースの変更(INSERT, UPDATE, DELETE)をリアルタイムで捕捉し、ターゲットに反映する仕組みです。
その他の取り込みサービス
| サービス | What | ユースケース |
|---|---|---|
| Amazon AppFlow | SaaSアプリとの連携 | Salesforce → S3/Redshift |
| AWS Transfer Family | FTP/SFTPでのファイル転送 | 既存FTPワークフローの移行 |
| AWS DataSync | オンプレミスストレージとの同期 | NAS → S3 のデータ移行 |
| AWS Snow Family | 大容量オフラインデータ転送 | ペタバイト級のデータ移行 |
2.4 データAPIの利用
外部システムやSaaSからデータを取得する際、APIを通じてデータを消費することがあります。
API経由でのデータ取り込みパターン
【API経由の取り込みフロー】
+------------+ API呼び出し +----------+ +----------+
| 外部API | <----------------- | Lambda | --> | S3 |
| (REST/ | レスポンス | or | | (保存) |
| GraphQL) | -----------------> | AppFlow | +----------+
+------------+ +----------+
AWSでのAPI利用パターン:
| パターン | 実装方法 | ユースケース |
|---|---|---|
| 定期的なAPI呼び出し | EventBridge + Lambda | 日次でAPIからデータ取得 |
| SaaS連携 | Amazon AppFlow | Salesforce、ServiceNowなど |
| カスタムAPI | Lambda + API Gateway | 独自APIの構築 |
API呼び出し時の考慮点:
- レート制限(後述)への対応
- 認証情報の安全な管理(Secrets Manager)
- エラー時のリトライ戦略
2.5 スロットリングとレート制限
大量のデータを扱う際、AWSサービスや外部APIのレート制限に対応する必要があります。
レート制限とは
サービスが過負荷になることを防ぐため、一定時間内のリクエスト数を制限する仕組みです。
主なサービスのレート制限:
| サービス | 制限の種類 | 対策 |
|---|---|---|
| DynamoDB | RCU/WCUのプロビジョニング、バーストキャパシティ | オンデマンドモード、指数バックオフ |
| Kinesis | シャードあたり1MB/秒(書き込み) | シャード追加、パーティションキー分散 |
| RDS | 接続数制限 | 接続プーリング、RDS Proxy |
| Lambda | 同時実行数(デフォルト1000) | 予約済み同時実行、プロビジョンド同時実行 |
| API Gateway | 10,000リクエスト/秒(デフォルト) | スロットリング設定、使用量プラン |
指数バックオフとジッター
スロットリングエラー発生時の再試行戦略です。
【指数バックオフの仕組み】
1回目の再試行: 1秒後
2回目の再試行: 2秒後
3回目の再試行: 4秒後
4回目の再試行: 8秒後
↓
指数関数的に待機時間を増加
【ジッター(ランダム化)を追加】
待機時間にランダムな値を加えることで、
複数クライアントの再試行タイミングを分散
→ 同時リトライによる再スロットリングを防止
実装例(概念):
import random
import time
def exponential_backoff_with_jitter(attempt, base=1, max_delay=32):
delay = min(base * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay)
return delay + jitter
2.6 再実行容易性とべき等性
再実行容易性(Replayability)
パイプラインの再実行容易性とは、「障害発生時にデータを再処理できる能力」です。
実現のためのベストプラクティス:
| 方法 | 説明 | AWSサービス |
|---|---|---|
| イベント駆動設計 | データ到着時にトリガー | S3イベント、EventBridge |
| チェックポイント | 処理の進捗を記録 | Glueジョブブックマーク |
| rawデータ保存 | 元データを保持 | S3(耐久性のあるストレージ) |
| データバージョニング | 世代管理 | S3バージョニング |
べき等性(Idempotency)
べき等性とは、「同じ処理を何度実行しても同じ結果になる」性質です。
【べき等でない処理】
処理: カウンターに+1
1回目実行: 0 → 1
2回目実行: 1 → 2 ← 結果が変わる(問題)
【べき等な処理】
処理: 値を「処理済み」に更新
1回目実行: 未処理 → 処理済み
2回目実行: 処理済み → 処理済み ← 結果が同じ(OK)
2.7 ステートフル vs ステートレス
| 種類 | 説明 | サービス例 |
|---|---|---|
| ステートフル | 状態を保持する | ElastiCache, RDS |
| ステートレス | 状態を保持しない | Lambda, API Gateway, S3 |
実務での意味:
- ステートレスなサービスはスケールしやすい
- ステートフルなサービスはセッション管理などに必要
2.8 IPアドレス許可リスト(Allowlist)
データソースへの接続時、セキュリティのためIPアドレスベースのアクセス制御が必要な場合があります。
設定が必要なケース:
- オンプレミスDBへの接続
- 外部SaaSへのアクセス
- パートナー企業のAPIへの接続
AWSでの実現方法:
| 方法 | 説明 | ユースケース |
|---|---|---|
| NAT Gateway | 固定のElastic IPでアウトバウンド | 外部サービスへの接続 |
| VPCエンドポイント | プライベート接続 | AWSサービスへのアクセス |
| AWS Global Accelerator | 固定IPアドレス | グローバルなアクセス |
3. Task 1.2: データの変換と処理(Transform)
3.1 変換の基本概念
なぜ変換が必要か
rawデータはそのままでは分析に使えません。変換によって「使えるデータ」にします。
【変換前(rawデータ)】
"2024-01-15","John Doe","100.50","USD"
"2024/01/16","Jane Smith","200","JPY"
"Jan 17, 2024","Bob","invalid","EUR"
【変換後(クレンジング済み)】
date | name | amount | currency
------------|------------|--------|----------
2024-01-15 | John Doe | 100.50 | USD
2024-01-16 | Jane Smith | 200.00 | JPY
2024-01-17 | Bob | NULL | EUR
変換の種類
| 変換タイプ | 説明 | 例 |
|---|---|---|
| データ型変換 | 文字列→数値、日付 | "100" → 100 |
| フォーマット変換 | CSV → Parquet | 列指向形式への変換 |
| クレンジング | 不正データの除去・修正 | NULL埋め、重複削除 |
| 正規化 | スキーマの標準化 | 住所の分割、名前の統一 |
| 集約 | データの要約 | 日次売上の月次集計 |
| エンリッチ化 | 外部データの付加 | 顧客IDからマスタ情報を結合 |
3.2 ETL vs ELT
データ処理のアプローチには2種類あります。
| 観点 | ETL | ELT |
|---|---|---|
| 正式名称 | Extract → Transform → Load | Extract → Load → Transform |
| 変換場所 | 中間処理サーバー | データウェアハウス内 |
| 適した場面 | 複雑な変換、多様なソース | DWH主体の分析環境 |
| AWSサービス例 | Glue ETL, EMR | Redshift SQL |
【ETLパターン】
ソース --抽出--> Glue ETL --変換--> Redshift
(Spark) (ロード済み)
【ELTパターン】
ソース --抽出--> Redshift --変換--> Redshift
(staging) (SQL) (マート)
3.3 OLTP vs OLAP
| 観点 | OLTP | OLAP |
|---|---|---|
| 正式名称 | Online Transaction Processing | Online Analytical Processing |
| 目的 | トランザクション処理 | 分析処理 |
| データ | 最新の状態 | 履歴データ含む |
| クエリ | 単純、高頻度 | 複雑、低頻度 |
| スキーマ | 正規化 | スタースキーマ、スノーフレーク |
| サービス例 | RDS, DynamoDB | Redshift, Athena |
3.4 変換サービスの詳細
AWS Glue(ETL)
Glue ETL の特徴:
| 特徴 | 説明 |
|---|---|
| サーバーレス | インフラ管理不要 |
| Spark基盤 | PySpark/Scalaでコーディング |
| DynamicFrame | スキーマの柔軟な扱いが可能 |
| ビジュアル開発 | Glue Studioでノーコード開発も可能 |
Glue Studio vs Glue DataBrew:
| ツール | 用途 | ユーザー |
|---|---|---|
| Glue Studio | ETLジョブの視覚的開発 | データエンジニア |
| Glue DataBrew | データプロファイリング・前処理 | データアナリスト |
Amazon EMR
| 項目 | 説明 |
|---|---|
| What | Hadoopエコシステムのマネージドサービス |
| Why | 大規模データの分散処理、カスタマイズ可能 |
| How | Spark, Hive, Prestoなどのフレームワークを選択して実行 |
EMR の構成:
+------------------+
| Primary Node | ← クラスター管理
+------------------+
|
+----+----+
| |
+--------+ +--------+
|Core | |Core | ← データ保持 + 処理
|Node 1 | |Node 2 |
+--------+ +--------+
| |
+--------+ +--------+
|Task | |Task | ← 処理のみ(スポット向き)
|Node 1 | |Node 2 |
+--------+ +--------+
Glue vs EMR の使い分け:
| 観点 | Glue | EMR |
|---|---|---|
| 運用負荷 | 低い(サーバーレス) | やや高い(クラスター管理) |
| カスタマイズ | 制限あり | 高い自由度 |
| コスト | 処理時間課金 | クラスター時間課金 |
| 適した場面 | シンプルなETL | 複雑な処理、Hadoopエコシステム活用 |
選択の指針:
- シンプルなETL、運用負荷を減らしたい → Glue
- Hive, HBase, Prestoなど特定ツールが必要 → EMR
- 大規模データの複雑な処理 → EMR
コンテナでのデータ処理(EKS/ECS)
コンテナ技術を使ったデータ処理も選択肢の一つです。
| サービス | 説明 | ユースケース |
|---|---|---|
| Amazon ECS | AWSマネージドのコンテナオーケストレーション | シンプルなコンテナワークロード |
| Amazon EKS | マネージドKubernetes | Kubernetes互換が必要な場合 |
| AWS Batch | バッチ処理に特化 | 大規模バッチジョブ |
Glue/EMR vs コンテナの使い分け:
| 観点 | Glue/EMR | ECS/EKS |
|---|---|---|
| 学習コスト | 低い(マネージド) | やや高い |
| カスタマイズ | 制限あり | 高い自由度 |
| 既存資産 | Spark/Hadoopの知識活用 | 既存コンテナイメージ活用 |
| 適した場面 | データ処理特化 | 汎用的なワークロード |
EMR on EKS:
KubernetesクラスターでSparkジョブを実行する選択肢もあります。既存のKubernetes環境を活用しつつ、Sparkの分散処理能力を利用できます。
AWS Lambda(軽量変換)
| 項目 | 説明 |
|---|---|
| What | サーバーレス関数実行サービス |
| Why | 軽量な変換、イベント駆動処理 |
| How | S3イベントやKinesisトリガーでコード実行 |
Lambda の制約:
| 制約 | 値 |
|---|---|
| 最大実行時間 | 15分 |
| メモリ | 128MB〜10GB |
| 一時ストレージ | 最大10GB |
適した用途:
- ファイルの検証(形式チェック)
- 軽量な変換(JSON→CSV)
- イベント通知のトリガー
3.5 データ形式と最適化
ファイル形式の比較
| 形式 | タイプ | 圧縮 | クエリ効率 | 用途 |
|---|---|---|---|---|
| CSV | 行指向 | 低 | 低 | データ交換、互換性重視 |
| JSON | 行指向 | 低 | 低 | API、ログ |
| Parquet | 列指向 | 高 | 高 | 分析クエリ |
| ORC | 列指向 | 高 | 高 | Hive最適化 |
| Avro | 行指向 | 中 | 中 | スキーマ進化が必要な場合 |
列指向形式(Parquet/ORC)のメリット:
【行指向(CSV)の読み取り】
SELECT name FROM users;
→ 全列を読み取ってからnameを抽出
id,name,age,email
1,John,30,john@... ← 全部読む
2,Jane,25,jane@... ← 全部読む
【列指向(Parquet)の読み取り】
SELECT name FROM users;
→ name列のみ読み取り
id列: [1,2,3...] ← スキップ
name列: [John,Jane...] ← これだけ読む
age列: [30,25,...] ← スキップ
試験ポイント:
- 分析クエリにはParquetが最適
- Athena, Redshift Spectrumとの組み合わせで効果大
3.6 JDBC/ODBC接続
データソースへの接続方式を理解しておく必要があります。
| 方式 | 説明 | 使用場面 |
|---|---|---|
| JDBC | Java Database Connectivity | Glue, EMRからDB接続 |
| ODBC | Open Database Connectivity | 汎用的なDB接続 |
Glue での JDBC 接続:
+------------+ +----------------+ +------------+
| Glue | JDBC | Security | | RDS |
| Job | ------> | Group | ------> | MySQL |
+------------+ | (適切なルール) | +------------+
+----------------+
設定のポイント:
- JDBCドライバーの準備
- セキュリティグループのインバウンドルール設定
- 接続URL、認証情報の設定
3.7 中間ステージングロケーション
ETLパイプラインでは、処理の途中でデータを一時的に保存する「中間ステージング」が重要です。
【中間ステージングの役割】
ソース --> 中間ステージング --> 変換処理 --> 中間ステージング --> ターゲット
(raw zone) (processed zone)
・再処理が可能 ・品質チェック後のデータ
・障害時の復旧ポイント ・下流処理への入力
S3をステージングに使用する理由:
| メリット | 説明 |
|---|---|
| 耐久性 | 99.999999999%(11ナイン)の耐久性 |
| スケーラビリティ | 容量制限なし |
| コスト効率 | ストレージ階層で最適化可能 |
| 統合性 | Glue、EMR、Athenaと直接連携 |
データレイクのゾーン設計:
| ゾーン | 目的 | データ状態 |
|---|---|---|
| Raw Zone | 生データの保存 | 未加工、ソースのまま |
| Staging Zone | 処理中の一時保存 | 変換途中 |
| Processed Zone | 変換済みデータ | クレンジング、正規化済み |
| Curated Zone | 分析用データ | 集約、最適化済み |
3.8 変換のトラブルシューティング
変換処理で問題が発生した場合の調査・解決アプローチです。
よくある障害と対策
| 障害タイプ | 原因例 | 対策 |
|---|---|---|
| データ品質エラー | 想定外の形式、NULL値 | バリデーション追加、try-catch |
| メモリ不足 | データサイズ過大 | パーティション分割、インスタンスサイズ変更 |
| タイムアウト | 処理時間超過 | 並列化、増分処理 |
| 接続エラー | ネットワーク、認証 | リトライ、セキュリティグループ確認 |
| スキーマ不一致 | ソースのスキーマ変更 | スキーマ進化対応、DynamicFrame使用 |
デバッグの手順
1. ログの確認
└── CloudWatch Logs でエラーメッセージ確認
2. データ検証
└── 入力データのサンプルを確認
└── スキーマ、データ型をチェック
3. 小規模テスト
└── サンプルデータで変換ロジックを検証
└── Glue開発エンドポイント/インタラクティブセッション活用
4. パフォーマンス分析
└── Spark UI でジョブの実行状況確認
└── ボトルネック(シャッフル、スキュー)を特定
5. 段階的な修正
└── 変更は小さく、テストしながら適用
AWS Glue でのデバッグ機能:
- インタラクティブセッション: Jupyter Notebookでステップ実行
- 継続的ログ記録: CloudWatch Logsへのリアルタイム出力
- ジョブメトリクス: CloudWatchでDPU使用率などを監視
3.9 データAPIの作成
変換したデータを他のシステムから利用できるようにAPIを作成する方法です。
【データAPIアーキテクチャ】
+----------+ +-------------+ +----------+ +----------+
|クライアント| --> | API Gateway | --> | Lambda | --> | データソース|
| | | (認証/制御) | | (処理) | | (S3/RDS) |
+----------+ +-------------+ +----------+ +----------+
|
v
- 認証(IAM, Cognito)
- レート制限
- キャッシュ
構成要素:
| コンポーネント | 役割 | AWSサービス |
|---|---|---|
| APIエンドポイント | リクエスト受付 | API Gateway |
| 処理ロジック | データ取得・加工 | Lambda |
| データストア | データ保存 | S3, RDS, DynamoDB |
| 認証 | アクセス制御 | IAM, Cognito |
| キャッシュ | 応答高速化 | API Gateway キャッシュ, ElastiCache |
4. Task 1.3: データパイプラインのオーケストレーション
4.1 オーケストレーションの基本
オーケストレーションとは
複数のデータ処理タスクを適切な順序で実行し、依存関係を管理することです。
【単純なパイプライン】
タスクA → タスクB → タスクC
【複雑なパイプライン】
+→ タスクB1 →+
タスクA → → タスクD → タスクE
+→ タスクB2 →+
↓
失敗時はタスクF(通知)
DAG(有向非巡回グラフ)
DAGは「ループのない一方向のグラフ」で、パイプラインの構造を表現します。
A
/ \
B C
\ /
D
A → B → D
A → C → D
(AからDへは2つのパス、ループなし)
DAGが必要なサービス:
- Amazon MWAA(Apache Airflow)→ DAGとして定義必須
- AWS Step Functions → DAGである必要はない
4.2 オーケストレーションサービス
AWS Step Functions
| 項目 | 説明 |
|---|---|
| What | サーバーレスのワークフローオーケストレーション |
| Why | ビジュアル設計、AWS統合が簡単 |
| How | ASL(Amazon States Language)でステートマシン定義 |
Step Functions の構造:
+------------------+
| 開始状態 |
+--------+---------+
|
v
+------------------+
| Lambda実行 | ← タスク状態
+--------+---------+
|
v
+------------------+
| 条件分岐 | ← 選択状態
+----+-------+-----+
| |
v v
+--------+ +--------+
| 成功 | | 失敗 |
+--------+ +--------+
再試行とエラーハンドリング:
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 10,
"BackoffRate": 1.5,
"MaxAttempts": 3
}
]
| 設定 | 説明 |
|---|---|
| IntervalSeconds | 再試行間隔(秒) |
| BackoffRate | 間隔の増加率(1.5なら10→15→22.5秒) |
| MaxAttempts | 最大再試行回数 |
Amazon MWAA(Managed Workflows for Apache Airflow)
| 項目 | 説明 |
|---|---|
| What | Apache Airflowのマネージドサービス |
| Why | 既存Airflowコードの移行、オープンソース互換 |
| How | PythonでDAGを定義、S3にアップロード |
Airflow DAG の例(概念):
# DAG定義
with DAG('etl_pipeline', schedule='@daily') as dag:
extract = GlueJobOperator(task_id='extract', ...)
transform = GlueJobOperator(task_id='transform', ...)
load = RedshiftOperator(task_id='load', ...)
extract >> transform >> load
AWS Glue ワークフロー
| 項目 | 説明 |
|---|---|
| What | Glueコンポーネント専用のオーケストレーション |
| Why | Glue内で完結するパイプラインに最適 |
| How | トリガー、クローラー、ETLジョブを連携 |
Glue ワークフローの構成:
+----------+ +----------+ +----------+ +----------+
| Trigger | --> | Crawler | --> | ETL Job | --> | Crawler |
| (定時) | | (rawデータ| | (変換) | | (変換後) |
+----------+ | カタログ化)| +----------+ | カタログ化)|
+----------+ +----------+
オーケストレーションサービスの比較
| サービス | 特徴 | 適した場面 |
|---|---|---|
| Step Functions | AWS統合◎、ビジュアル開発 | 新規AWSネイティブ開発 |
| MWAA | Airflow互換、Python DAG | 既存Airflow移行 |
| Glue ワークフロー | Glue専用、シンプル | Glueのみで完結 |
| EventBridge | イベント駆動、スケジューリング | シンプルなトリガー |
選択の指針:
+------------------+
| オーケストレーション |
| が必要か? |
+--------+---------+
|
+------------+------------+
| |
既存Airflow 新規構築
コードあり?
| |
v v
+--------+ +------------------+
| MWAA | | Glueのみで |
+--------+ | 完結するか? |
+--------+---------+
|
+---------+---------+
| |
Yes No
| |
v v
+--------+ +-------------+
| Glue | | Step |
| ワーク | | Functions |
| フロー | +-------------+
+--------+
4.3 イベント駆動アーキテクチャ
Amazon EventBridge
| 項目 | 説明 |
|---|---|
| What | サーバーレスイベントバス |
| Why | AWSサービス間、SaaS連携のイベント処理 |
| How | ルールでイベントをフィルタし、ターゲットに送信 |
EventBridge の構造:
イベントソース EventBridge ターゲット
+----------+ +-------------+ +----------+
| S3 | ------> | | ------> | Lambda |
| イベント | | ルール | +----------+
+----------+ | マッチング | +----------+
+----------+ | | ------> | Step |
| Glue | ------> | | | Functions|
| ジョブ完了 | +-------------+ +----------+
+----------+
スケジュール式の例:
| 式 | 意味 |
|---|---|
rate(1 hour) |
1時間ごと |
rate(1 day) |
1日ごと |
cron(0 9 * * ? *) |
毎日9:00 |
cron(0 0 1 * ? *) |
毎月1日 0:00 |
S3 イベント通知
S3バケットへのオブジェクト操作をトリガーにできます。
+----------+ S3イベント +-------------+ 呼び出し +----------+
| S3 | --------------> | EventBridge | ------------> | Lambda |
| (upload) | PUT/DELETE | または | | (処理) |
+----------+ | Lambda直接 | +----------+
+-------------+
設定のポイント:
- プレフィックス/サフィックスでフィルタ可能
-
s3:ObjectCreated:*で作成イベントをキャッチ
4.4 通知サービス
Amazon SNS(Simple Notification Service)
| 項目 | 説明 |
|---|---|
| What | Pub/Sub型メッセージングサービス |
| Why | 複数の受信者に同時通知 |
| How | トピックを作成し、サブスクライバーを登録 |
+----------+
| SNS |
| Topic |
+----+-----+
|
+------------+------------+
| | |
v v v
+-------+ +-------+ +-------+
| Email | | SMS | | Lambda|
+-------+ +-------+ +-------+
Amazon SQS(Simple Queue Service)
| 項目 | 説明 |
|---|---|
| What | メッセージキューサービス |
| Why | 非同期処理、疎結合化 |
| How | キューにメッセージを送信、ポーリングで受信 |
重要な設定:
| 設定 | 説明 | デフォルト |
|---|---|---|
| 可視性タイムアウト | 処理中に他から見えない時間 | 30秒 |
| メッセージ保持期間 | キュー内保持時間 | 4日(最大14日) |
| デッドレターキュー | 処理失敗メッセージの退避先 | なし |
4.5 パイプラインの信頼性設計
データパイプラインを本番運用するには、パフォーマンス、可用性、スケーラビリティ、耐障害性を考慮した設計が必要です。
信頼性設計の原則
| 原則 | 説明 | 実現方法 |
|---|---|---|
| パフォーマンス | 要求されるスループットとレイテンシを満たす | 並列化、パーティション化、適切なインスタンスサイズ |
| 可用性 | サービス停止を最小化 | マルチAZ、マネージドサービス活用 |
| スケーラビリティ | 負荷に応じて拡張可能 | Auto Scaling、サーバーレス |
| 耐障害性 | 障害発生時も継続稼働 | リトライ、チェックポイント、冗長化 |
ベストプラクティス
1. エラーハンドリングと再試行
【再試行パターン】
処理 → 失敗 → 待機(指数バックオフ) → 再試行 → 成功
↓
最大試行回数超過
↓
デッドレターキュー or アラート
2. チェックポイントと再開
- Glue ジョブブックマーク
- Kinesis チェックポイント(KCL)
- Step Functions の実行履歴
3. モニタリングとアラート
| 監視対象 | メトリクス | アラート条件例 |
|---|---|---|
| Glue ジョブ | 実行時間、エラー数 | 失敗時、閾値超過時 |
| Kinesis | IteratorAgeMilliseconds | 遅延が閾値を超えた場合 |
| Lambda | エラー率、Duration | エラー率上昇時 |
| Step Functions | 失敗した実行 | 実行失敗時 |
4. データのバックアップと復旧
- S3バージョニングの有効化
- クロスリージョンレプリケーション
- AWS Backup による一元管理
5. Task 1.4: プログラミングの概念を適用する
5.1 SQL と Spark
いつSQLを使うか、Sparkを使うか
| 観点 | SQL | Spark |
|---|---|---|
| 環境 | Redshift, Athena中心 | EMR, Glue中心 |
| 複雑さ | 単純〜中程度の変換 | 複雑な変換、ML連携 |
| スキル | 広く普及 | 専門スキル必要 |
| パフォーマンス | DWH最適化 | 大規模分散処理 |
AWS での Spark 実行オプション:
| サービス | 特徴 |
|---|---|
| AWS Glue | サーバーレス、ETL向け |
| Amazon EMR | フルカスタマイズ、大規模処理 |
| EMR on EKS | Kubernetes上でSpark |
| EMR Serverless | サーバーレスEMR |
5.2 SQLクエリの最適化
データパイプラインでSQLを使用する際、パフォーマンスを意識したクエリ設計が重要です。
クエリ最適化の基本
| 最適化手法 | 説明 | 例 |
|---|---|---|
| カラム指定 | SELECT * を避け、必要な列のみ取得 | SELECT id, name FROM users |
| フィルタ早期適用 | WHERE句でデータを早期に絞り込む | パーティション列でのフィルタ |
| 適切なJOIN | 小さいテーブルを先に(ブロードキャスト) | 内部結合 vs 外部結合の使い分け |
| サブクエリ回避 | CTEやJOINで置き換え | WITH句の活用 |
Amazon Redshift のSQL最適化
ウィンドウ関数:
| 関数 | 用途 | 例 |
|---|---|---|
ROW_NUMBER() |
行番号の付与(一意) | 重複排除、ページング |
RANK() |
順位付け(同順位あり) | ランキング作成 |
DENSE_RANK() |
順位付け(ギャップなし) | 連続した順位 |
LAG()/LEAD() |
前後の行の値を参照 | 時系列比較 |
-- 例:顧客ごとの最新注文を取得
SELECT *
FROM (
SELECT
customer_id,
order_date,
amount,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY order_date DESC
) as rn
FROM orders
) t
WHERE rn = 1;
ストアドプロシージャ:
Redshiftでは、複雑なデータ変換ロジックをストアドプロシージャとして定義できます。
-- ストアドプロシージャの作成
CREATE OR REPLACE PROCEDURE process_daily_sales()
AS $$
BEGIN
-- 一時テーブルにデータ集約
CREATE TEMP TABLE temp_sales AS
SELECT date, SUM(amount) as total
FROM sales
WHERE date = CURRENT_DATE - 1
GROUP BY date;
-- 結果をサマリーテーブルに挿入
INSERT INTO daily_summary
SELECT * FROM temp_sales;
END;
$$ LANGUAGE plpgsql;
-- 実行
CALL process_daily_sales();
試験ポイント:
-
CALL文でストアドプロシージャを実行 - Step FunctionsからRedshiftのストアドプロシージャを呼び出し可能
5.3 コード最適化
取り込み・変換の高速化手法
| 手法 | 説明 | 効果 |
|---|---|---|
| 並列処理 | 複数ノードで分散処理 | スループット向上 |
| パーティション化 | データを分割して格納 | スキャン量削減 |
| 列指向形式 | Parquet/ORC使用 | I/O削減 |
| 圧縮 | gzip, snappy等 | ストレージ・転送効率化 |
| フィルタ早期適用 | 必要なデータのみ処理 | 処理量削減 |
Lambda 最適化
| 最適化項目 | 方法 |
|---|---|
| コールドスタート対策 | プロビジョンド同時実行を設定 |
| メモリ設定 | 適切なメモリ量を設定(CPU比例) |
| VPCレイテンシ | 必要な場合のみVPC設定 |
| タイムアウト | 適切な値を設定(デフォルト3秒は短い) |
Lambda でのストレージボリュームマウント:
Lambda関数から Amazon EFS(Elastic File System)をマウントして、大きなファイルや共有データにアクセスできます。
【EFSマウントの構成】
+----------+ マウント +----------+
| Lambda | <-------------> | EFS |
| 関数 | /mnt/efs | ファイル |
+----------+ | システム |
| +----------+
| 同一VPC内で接続
v
+----------+
| VPC |
+----------+
ユースケース:
- 大きな機械学習モデルの読み込み
- 共有設定ファイルへのアクセス
- 一時ファイルの永続化(/tmpの512MB制限を超える場合)
5.4 Infrastructure as Code(IaC)
CloudFormation vs CDK
| ツール | 記述方式 | 特徴 |
|---|---|---|
| CloudFormation | YAML/JSON | AWS標準、宣言的 |
| CDK | TypeScript/Python等 | プログラム的、抽象化 |
| SAM | YAML | サーバーレス特化 |
IaC のメリット:
- バージョン管理可能
- 再現性のあるデプロイ
- レビュー・テスト可能
AWS SAM(Serverless Application Model)
サーバーレスアプリケーションのデプロイに特化したフレームワークです。
SAMの特徴:
- CloudFormationの拡張(簡潔な記法)
- ローカルでのテスト機能(sam local)
- パッケージング・デプロイの自動化
# SAMテンプレート例(template.yaml)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
ProcessDataFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.handler
Runtime: python3.9
Events:
S3Event:
Type: S3
Properties:
Bucket: !Ref DataBucket
Events: s3:ObjectCreated:*
DataBucket:
Type: AWS::S3::Bucket
SAMのデプロイフロー:
sam build → sam package → sam deploy
↓ ↓ ↓
ビルド S3へアップロード CloudFormation実行
5.5 CI/CD
データパイプラインの CI/CD
+----------+ +----------+ +----------+ +----------+
| Code | --> | Build | --> | Test | --> | Deploy |
| Commit | | (CodeBuild) | (単体/統合)| | (本番) |
+----------+ +----------+ +----------+ +----------+
| |
v v
CodeCommit CloudFormation
or GitHub or CDK deploy
AWSサービスの組み合わせ:
| 段階 | サービス |
|---|---|
| ソース管理 | CodeCommit, GitHub |
| ビルド | CodeBuild |
| テスト | CodeBuild(テスト実行) |
| デプロイ | CodeDeploy, CloudFormation |
| パイプライン | CodePipeline |
5.6 Git によるバージョン管理
データパイプラインのコード(ETLスクリプト、IaCテンプレート)をGitで管理することは、チーム開発とCI/CDの基盤となります。
基本的なGitコマンド
| コマンド | 説明 | 使用場面 |
|---|---|---|
git clone <url> |
リポジトリをローカルにコピー | 開発開始時 |
git branch <name> |
新しいブランチを作成 | 機能開発開始時 |
git checkout <branch> |
ブランチを切り替え | 作業ブランチへ移動 |
git add <file> |
変更をステージング | コミット前の準備 |
git commit -m "message" |
変更を記録 | 作業区切りごと |
git push |
リモートにアップロード | 変更の共有 |
git pull |
リモートから取得 | 最新状態への更新 |
git merge <branch> |
ブランチを統合 | 機能の統合 |
ブランチ戦略
main (本番)
│
├── develop (開発)
│ │
│ ├── feature/add-etl-job (機能開発)
│ │
│ └── feature/fix-transform (バグ修正)
│
└── release/v1.0 (リリース準備)
AWS CodeCommit:
AWSマネージドのGitリポジトリサービスです。IAMと統合し、AWSの他サービスとシームレスに連携できます。
5.7 データ構造とアルゴリズム
データエンジニアリングで重要な概念
| 概念 | 説明 | 活用場面 |
|---|---|---|
| ハッシュテーブル | キーで高速アクセス | DynamoDB、パーティションキー |
| グラフ | 関係性の表現 | DAG、データリネージ |
| ツリー | 階層構造 | ファイルシステム、XML/JSON |
| 分散処理 | 並列化による高速化 | MapReduce、Spark |
6. サービス比較と使い分け
6.1 取り込みサービスの選択
+------------------+
| データの特性は? |
+--------+---------+
|
+----------------+----------------+
| |
リアルタイム バッチ
/ストリーミング (定期実行)
| |
v v
+---------------+ +---------------+
| Kinesis系 | | Glue / EMR |
| or MSK | | or DMS |
+-------+-------+ +-------+-------+
| |
+-------+-------+ +-------+-------+
| | | |
新規構築 既存Kafka DB移行/同期 ETL処理
| | | |
v v v v
Kinesis MSK DMS Glue/EMR
6.2 変換サービスの選択
| 要件 | 推奨サービス |
|---|---|
| サーバーレスで簡単にETL | AWS Glue |
| 大規模データ、Hadoopエコシステム | Amazon EMR |
| 軽量な変換、イベント駆動 | Lambda |
| DWH内での変換(ELT) | Redshift SQL |
| ノーコードでデータ準備 | Glue DataBrew |
6.3 主要サービス比較表
| サービス | カテゴリ | マネージド度 | コスト特性 | 主なユースケース |
|---|---|---|---|---|
| Kinesis Data Streams | ストリーミング | 中 | シャード課金 | カスタムストリーム処理 |
| Kinesis Data Firehose | ストリーミング | 高 | データ量課金 | S3/Redshiftへの配信 |
| Amazon MSK | ストリーミング | 中 | クラスター課金 | Kafka移行 |
| AWS Glue | ETL | 高 | DPU時間課金 | サーバーレスETL |
| Amazon EMR | 処理 | 中 | クラスター課金 | 大規模分散処理 |
| Step Functions | オーケストレーション | 高 | 状態遷移課金 | AWSワークフロー |
| MWAA | オーケストレーション | 高 | 環境課金 | Airflow移行 |
おわりに
Domain1はDEA試験の34%を占める最重要ドメインです。データの取り込みから変換、オーケストレーションまで幅広い内容をカバーしています。
特に押さえておきたいポイントは以下の3つです。
- サービスの使い分け - Kinesis vs MSK、Glue vs EMR など、ユースケースに応じた選択ができるように
- 信頼性設計 - ジョブブックマーク、べき等性、リトライ戦略など、本番運用を意識した設計
- コスト意識 - VPCエンドポイント、サーバーレスの活用など、効率的なアーキテクチャ
この記事が、DEA試験を目指す方の参考になれば幸いです。
一緒に頑張りましょう!