はじめに
本記事では、Dagsterとオニオンアーキテクチャを組み合わせたETLパイプラインの実装について解説します。
Wikipedia APIからデータを取得してCSVに保存する具体例を通じて、保守性と拡張性を兼ね備えたデータパイプラインの構築方法を紹介します。
完全なコード例は以下のリポジトリで公開しています:
https://github.com/nokoxxx1212/dagster-onion-example
オニオンアーキテクチャとは
概要
オニオンアーキテクチャは、ソフトウェアの関心事を層で分離し、内側の層が外側の層に依存しないよう設計するアーキテクチャパターンです。
主要な4つの層から構成されます
- Domain層: ビジネスロジック・データモデル・抽象インターフェース
- Infrastructure層: 外部システム(API、データベース、ファイルシステム)の具体実装
- Usecase層: アプリケーションの処理フロー・ワークフロー定義
- UI層: ユーザーインターフェース(CLI、Web UI等)
メリット
1. 技術の差し替えが容易
外部システムの変更(例:CSV → BigQuery)時、Infrastructure層のみの修正で対応可能
2. テスタビリティの向上
各層が独立してテスト可能。特にDomain層はモックなしでテスト実行可能
3. チーム開発の効率化
層ごとに責務が明確で、並行開発しやすい
4. 保守性の向上
依存関係が一方向で、変更の影響範囲を予測しやすい
ディレクトリ構成例
実際のプロジェクト構造は以下のようになります:
data-pipeline-etl/
├── domain/ # ドメイン層(抽象化とビジネスロジック)
│ ├── models.py # Pandera でのスキーマ定義・データ構造
│ ├── repositories.py # 抽象インターフェース定義
│ └── services.py # バリデーションなどビジネスロジック処理
├── infrastructure/ # インフラ層(外部システム実装)
│ ├── api_clients.py # 外部API実装(HTTP通信)
│ └── storage.py # ファイルシステム実装(CSV・JSON保存)
├── usecase/ # ユースケース層(Dagster資産の定義)
│ ├── assets.py # Dagsterの @asset 定義
│ └── jobs.py # Dagsterのジョブ定義
├── ui/ # プレゼンテーション層(CLIなど)
│ └── cli.py # CLI でジョブを実行するエントリポイント
├── tests/ # テストコード
├── definitions.py # Dagster Definitions をまとめる
└── pyproject.toml # Python 環境構成(uv 用)
各層の責務が明確に分離されており、依存関係はDomain層を中心とした同心円状になっています。
処理フローとコードサンプル
1. エントリポイント: ui/cli.py
# ui/cli.py
def execute_job(job_name: str, dry_run: bool = False, verbose: bool = False):
"""指定されたジョブを実行"""
from definitions import defs
# 全アセットを取得してmaterializeで実行
all_assets = [asset for asset in defs.get_all_asset_specs()]
result = materialize(all_assets)
if result.success:
context.log.info(f"Job '{job_name}' completed successfully")
else:
context.log.error(f"Job '{job_name}' failed")
2. アセット定義の取得: definitions.py
# definitions.py
from dagster import Definitions
from usecase.assets import fetch_raw_pages, validate_pages, clean_and_process_pages, store_pages_to_csv
defs = Definitions(
assets=[
fetch_raw_pages,
validate_pages,
clean_and_process_pages,
store_pages_to_csv,
],
jobs=[wikipedia_etl_job, filter_pages_job, full_pipeline_job]
)
3. データ取得アセット: usecase/assets.py
# usecase/assets.py
@asset(description="Wikipedia APIからページ一覧を取得")
def fetch_raw_pages(context: AssetExecutionContext) -> pd.DataFrame:
# 環境変数から設定取得
api_url = os.getenv("WIKI_API_URL", "https://en.wikipedia.org/w/api.php")
# API設定作成(Domain層のモデル使用)
api_config = WikipediaApiConfig(base_url=api_url, limit=50)
# Infrastructure層の実装を使用
api_client = WikipediaApiClient()
df = api_client.fetch_wikipedia_pages(api_config)
# UIで見やすくするためのメタデータ追加
context.add_output_metadata({
"row_count": len(df),
"preview": MetadataValue.md(df.head(3).to_markdown(index=False)),
"api_url": MetadataValue.url(api_url),
})
return df
4. データ検証: domain/services.py
# domain/services.py
class ValidationService:
@staticmethod
def validate_wikipedia_pages(df: pd.DataFrame) -> ProcessingResult:
try:
# Panderaスキーマでデータ検証
validated_df = PageSchema.validate(df)
return ProcessingResult(
success=True,
message="Data validation successful",
record_count=len(validated_df)
)
except pa.errors.SchemaError as e:
return ProcessingResult(
success=False,
message="Schema validation failed",
record_count=len(df),
errors=[str(e)]
)
5. 外部API実装: infrastructure/api_clients.py
# infrastructure/api_clients.py
class WikipediaApiClient(WikipediaRepository):
def fetch_wikipedia_pages(self, api_config: WikipediaApiConfig) -> pd.DataFrame:
try:
response = requests.get(
api_config.base_url,
params=api_config.to_params(),
timeout=self.timeout
)
response.raise_for_status()
data = response.json()
pages = data["query"]["allpages"]
if not pages:
return pd.DataFrame(columns=["pageid", "title"])
return pd.DataFrame(pages)[["pageid", "title"]]
except requests.RequestException as e:
raise requests.RequestException(f"Failed to fetch Wikipedia data: {str(e)}")
6. データ保存: infrastructure/storage.py
# infrastructure/storage.py
class CsvStorageAdapter(StorageAdapter):
def save_dataframe(self, df: pd.DataFrame, file_path: str, **kwargs) -> ProcessingResult:
try:
# ディレクトリが存在しない場合は作成
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# CSVファイルに保存
df.to_csv(file_path, index=False, **kwargs)
return ProcessingResult(
success=True,
message=f"Data successfully saved to {file_path}",
record_count=len(df)
)
except Exception as e:
return ProcessingResult(
success=False,
message=f"Failed to save data: {str(e)}",
errors=[str(e)]
)
処理の流れ
-
ui/cli.py
でCLI引数を解析 -
definitions.py
からアセット定義を取得 - Dagsterが依存関係順にアセットを実行
-
usecase/assets.py
の各アセットが順次実行 - 各アセット内で
domain/
とinfrastructure/
の実装を使用 - 最終的に
data/pages.csv
にデータが保存される
実装の役割分担とフロー
データサイエンティストとソフトウェアエンジニアの効率的な協働を実現するため、以下のような役割分担を設定しています。
データサイエンティスト担当
担当ファイル:
-
domain/models.py
- データスキーマ・バリデーションルール定義 -
domain/services.py
- データ処理・分析ロジック -
usecase/assets.py
- データパイプライン設計・アセット定義
主な作業内容:
# 新しいデータスキーマ追加例
class SalesDataSchema(pa.DataFrameModel):
date: pa.typing.Series[pd.Timestamp]
amount: pa.typing.Series[float] = pa.Field(ge=0)
category: pa.typing.Series[str]
# 新しい分析アセット追加例
@asset
def analyze_sales_trends(sales_data: pd.DataFrame):
"""売上トレンド分析を実行し、季節性を検出する"""
# トレンド分析ロジック
return analysis_result
ソフトウェアエンジニア担当
担当ファイル:
-
infrastructure/api_clients.py
- 外部API・HTTP通信実装 -
infrastructure/storage.py
- データストレージ・出力実装 -
ui/cli.py
- CLI・ユーザーインターフェース -
domain/repositories.py
-
主な作業内容:
# 新しいストレージ実装例
class BigQueryAdapter(StorageAdapter):
def save_dataframe(self, df: pd.DataFrame, table_name: str):
# BigQuery出力実装
# 新しいCLIオプション追加例
parser.add_argument('--output-format', choices=['csv', 'parquet', 'bigquery'])
協働フロー
-
要件整理 (データサイエンティスト主導)
- 必要なデータソース・処理・出力形式を定義
-
設計協議 (両チーム)
- データスキーマと抽象インターフェースを共同設計
-
並行開発
- データサイエンティスト:
domain/
+usecase/
の実装 - ソフトウェアエンジニア:
infrastructure/
+ui/
の実装
- データサイエンティスト:
-
統合テスト (両チーム)
- パイプライン全体のテスト・パフォーマンス検証
Tips
依存性注入の利点
従来のアプローチ:
# アセット内で直接実装クラスを使用(密結合)
@asset
def get_data():
client = WikipediaApiClient() # 具体実装に依存
return client.fetch_data()
オニオンアーキテクチャでの依存性注入:
# 抽象インターフェースを定義
class WikipediaRepository(ABC):
def fetch_pages(self, config) -> pd.DataFrame: ...
# アセットは抽象に依存
@asset
def fetch_raw_pages():
# 実装は外部から注入される
api_client = WikipediaApiClient() # 具体実装
return api_client.fetch_wikipedia_pages(config)
利点: テスト時にモック実装を注入可能、本番では実際の実装を使用
抽象インターフェース定義の価値
# domain/repositories.py
class DataRepository(ABC):
@abstractmethod
def fetch_data(self, source: DataSource) -> pd.DataFrame:
"""データを取得する抽象メソッド"""
pass
# infrastructure/api_clients.py
class WikipediaApiClient(DataRepository):
def fetch_data(self, source: DataSource) -> pd.DataFrame:
# Wikipedia API固有の実装
class TwitterApiClient(DataRepository):
def fetch_data(self, source: DataSource) -> pd.DataFrame:
# Twitter API固有の実装
利点: 新しいデータソース追加時、既存コードの変更不要
repositoriesとinfrastructureの違い
- repositories: 抽象インターフェースの定義のみ(実装なし)
- infrastructure: repositoriesの具体実装(実際のHTTP通信等)
# domain/repositories.py(抽象化)
class StorageRepository(ABC):
@abstractmethod
def save(self, data: pd.DataFrame, path: str) -> bool: ...
# infrastructure/storage.py(具体実装)
class CsvStorageAdapter(StorageRepository):
def save(self, data: pd.DataFrame, path: str) -> bool:
data.to_csv(path) # 実際のファイル操作
return True
servicesにバリデーションを配置する理由
# domain/services.py
class ValidationService:
@staticmethod
def validate_wikipedia_pages(df: pd.DataFrame) -> ProcessingResult:
"""ビジネスルールに基づくデータ検証"""
# Panderaスキーマ検証
# カスタムビジネスルール検証
# 結果の統一形式での返却
理由: バリデーションはビジネスロジックの一部。外部技術(Pandera)に依存するが、ビジネスルールを表現するためDomain層に配置
Panderaスキーマ検証の配置場所
# domain/models.py(スキーマ定義)
class PageSchema(pa.DataFrameModel):
pageid: int = pa.Field(ge=1)
title: str = pa.Field(min_length=1)
# domain/services.py(検証実行)
class ValidationService:
@staticmethod
def validate_wikipedia_pages(df: pd.DataFrame):
return PageSchema.validate(df) # スキーマ使用
配置理由: データ構造定義はmodels.py
、検証ロジックはservices.py
で責務分離
オニオンアーキテクチャならではのテスト
# tests/test_domain.py(Domain層のテスト)
def test_validation_service():
# モックなしでテスト可能
data = {"pageid": [1, 2], "title": ["Page1", "Page2"]}
df = pd.DataFrame(data)
result = ValidationService.validate_wikipedia_pages(df)
assert result.success is True
# tests/test_infrastructure.py(Infrastructure層のテスト)
@patch('infrastructure.api_clients.requests.get')
def test_wikipedia_api_client(mock_get):
# HTTPモックでテスト
mock_get.return_value.json.return_value = {"query": {"allpages": []}}
client = WikipediaApiClient()
result = client.fetch_wikipedia_pages(config)
assert len(result) == 0
利点: 各層が独立してテスト可能。Domain層は外部依存なしでテスト実行
Dagsterとは
概要
Dagsterは、データパイプラインを構築・運用するためのデータオーケストレーションツールです。
Apache Airflowの後継的位置づけで、よりモダンな設計とデータ中心のアプローチを採用しています。
Dagsterの利点
1. データ中心設計
処理(Task)ではなくデータ(Asset)を中心とした設計
2. 優れた開発体験
型安全性、豊富なメタデータ、直感的なWeb UI
3. テスタビリティ
各アセットが独立してテスト可能
4. 監視・運用機能
リッチなログ、データ系譜、品質チェック機能を標準搭載
主要な用語
Asset: データの生成・変換を行う単位
@asset
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
return raw_data.dropna()
Job: 複数のAssetをまとめた実行単位
wikipedia_etl_job = define_asset_job(
"wikipedia_etl_job",
selection=[fetch_raw_pages, validate_pages, store_pages_to_csv]
)
Definitions: プロジェクト全体のAsset・Jobをまとめる設定
defs = Definitions(
assets=[asset1, asset2, asset3],
jobs=[job1, job2]
)
Materialize: Assetを実際に実行してデータを生成すること
「UIだけ見れば8割わかる Dagster資産」実践
Dagster Web UIでの可視性を最大化するベストプラクティス:
1. 動詞ベースの命名規則
# 従来: raw_data, processed_data
# 改善: fetch_raw_data, clean_data, store_data
@asset(description="APIからデータを取得する")
def fetch_raw_pages(): ...
@asset(description="データをクリーニングする")
def clean_and_process_pages(): ...
2. 包括的なdocstring
@asset
def validate_pages(context: AssetExecutionContext, fetch_raw_pages: pd.DataFrame):
"""Panderaでスキーマ検証を実行し、型変換と必須フィールドチェックを行う。
Args:
context: Dagsterアセット実行コンテキスト
fetch_raw_pages: 生のWikipediaページデータ
Returns:
pd.DataFrame: 検証済みのpageidとtitleカラムを含むDataFrame
"""
3. 動的メタデータでUI情報充実
context.add_output_metadata({
"row_count": len(df),
"preview": MetadataValue.md(df.head(3).to_markdown(index=False)),
"output_path": MetadataValue.path(output_path),
"file_size_bytes": os.path.getsize(output_path)
})
4. 構造化ログ
context.log.info("fetch_raw_pages: 開始")
context.log.info(f"fetch_raw_pages: 完了 rows={len(df)} api_url={api_url}")
これらの実践により、Web UIを見るだけで:
- 各アセットの処理内容
- データの品質・統計情報
- 実行状況・エラー原因
- ファイル出力先・サイズ
が一目で理解できるようになります。
まとめ
Dagsterとオニオンアーキテクチャの組み合わせにより、以下を実現できます:
- 保守性: 各層の責務が明確で、変更の影響範囲が限定的
- 拡張性: 新しいデータソース・出力先の追加が容易
- テスタビリティ: 各層が独立してテスト可能
- チーム開発: 役割分担により並行開発が効率的
- 運用性: Dagster UIによる優れた監視・デバッグ体験
特に「UIだけ見れば8割わかる」アプローチにより、データパイプラインの理解・保守・運用が大幅に改善されます。
データエンジニアリングにおいて、技術的な実装だけでなく、アーキテクチャ設計とチーム協働の仕組み作りが成功の鍵となることを、この実践例を通じて示すことができました。