11
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

テンプレートメソッドパターンを用いてパイプライン処理を拡張する

Last updated at Posted at 2025-12-17

株式会社ブレインパッド プロダクトユニットの大畑です。

弊社は「データ活用の促進を通じて持続可能な未来をつくる」をミッションに、
データ分析支援やSaaSプロダクトの提供を通じて、企業の「データ活用の日常化」を推進しております。

私は、SaaSプロダクトを開発・提供する「プロダクト・サービス」部門に所属し、
企業のマーケティング活動をデータで支援する「Rtoaster GenAI」の開発を担当しております。

はじめに

デザインパターンの中でも、テンプレートメソッドパターン (Template Method Pattern) は、処理の骨格をスーパークラスで定義し、具体的な処理をサブクラスに委譲するパターンです。

本記事では、複数のソースからファイルを取り込む機能を例に、テンプレートメソッドパターンの実践的な適用方法を紹介します。

想定読者

  • パイプライン処理を保守性高く拡張したい方
  • テンプレートメソッドパターンの実装例を探している方

テンプレートメソッドパターンとは?

概要

テンプレートメソッドパターンは、アルゴリズムの骨組みをあらかじめ定義しておき、そのアルゴリズムの具体的な処理はサブクラスに任せるGoFデザインパターンの一つです。

抽象クラスの templateMethod() は、step1〜3 の呼び出し順だけを定義したメソッドであり、そこでは「処理の骨格」だけを実装し、各ステップの中身は抽象メソッドとしてサブクラスに委譲します。

メリット

  • コードの重複を排除 - 共通処理を基底クラスにまとめられる
  • 拡張性 - 新しいバリエーションを追加する際、サブクラスを追加するだけ
  • 一貫性 - 処理フローが統一される

実装例:ファイル取り込み機能

背景

以下2つの方法でCSVファイルを取り込む機能を考えます:

  1. クラウドストレージ経由 - ブラウザから直接アップロードしたファイルを処理
  2. SFTP経由 - 外部サーバーからファイルを取得して処理

どちらも「ファイルを取得 → データベースに登録 → 後処理」という処理フローは共通ですが、細かい処理は異なります。

始めに検討したアプローチ

最初は、処理フローを完全にステップに分解し、それぞれを抽象メソッドとする設計を検討しました。しかし、抽象メソッドが過剰に増えると「共通処理がサブクラスに分散する」「粒度が合わない」などの問題が起きやすく、今回のようなパイプライン処理には適しませんでした。

class BaseImporter(ABC):
    async def run(self) -> None:
        files = await self.step1_fetch_files()
        validated = await self.step2_validate(files)
        await self.step3_upload_to_storage(validated)
        await self.step4_import_to_database()
        await self.step5_cleanup()
    
    @abstractmethod
    async def step1_fetch_files(self): ...
    
    @abstractmethod
    async def step2_validate(self, files): ...
    
    @abstractmethod
    async def step3_upload_to_storage(self, files): ...
    
    @abstractmethod
    async def step4_import_to_database(self): ...
    
    @abstractmethod
    async def step5_cleanup(self): ...

問題1:共通処理が分断される

例えば、データベースへの登録処理はクラウドストレージ/SFTP どちらでも同じなのに、サブクラス毎に実装が必要になってしまいます。同じ処理を複数のサブクラスで記述するのは本末転倒。

問題2:ステップの粒度が合わない

今回、クラウドストレージとSFTPでこまごまとした処理の違いがあったため、そのままデザインパターンを適用しても不自然で保守性が低い実装になりそうでした。

問題3:エラーハンドリングが複雑になる

各ステップでのエラー時に「取り込み元(クラウドストレージ/SFTP)」や「処理の進捗状況」によって後処理が変わるため、ステップ単位の抽象化では対応しづらくなります。

改善後の設計

そこで、処理フローの骨格はテンプレートメソッド(BaseImporter)に残しつつ、処理として一般化できるイベント部分をサブクラスとして実装するアプローチに変更しました。

最初のアプローチ:「何を」するかをステップ毎に定義
       step1 → step2 → step3 → step4 → step5

改善後のアプローチ:「いつ」処理を挟むかをイベントで定義
       [共通処理] → on_skip / on_success / on_error

実装

定義クラス

from dataclasses import dataclass

@dataclass
class FileInfo:
    """ファイルパスやサイズ、メタ情報を持つドメインオブジェクト"""
    path: str
    size: int
    meta: str | None = None

抽象基底クラス

from abc import ABC, abstractmethod

class BaseImporter(ABC):
    """ファイル取り込み処理の基底クラス"""
    
    def __init__(self, source_id: str, job_id: str):
        self.source_id = source_id
        self.job_id = job_id

    async def run(self) -> None:
        """
        テンプレートメソッド: 処理の骨格を定義
        """
        # Step 1: ファイル一覧を取得(サブクラスで実装)
        files = await self.fetch_files()
        
        # ファイルの分類(共通処理)
        files_to_skip, files_to_process = self._classify_files(files)

        # Step 2: スキップ対象の処理(サブクラスで実装)
        for f in files_to_skip:
            await self.on_skip(f)

        # Step 3: データベースへの登録(共通処理)
        for i, file_info in enumerate(files_to_process):
            try:
                await self._import_to_database(file_info)
                # Step 4a: 成功時コールバック(サブクラスで実装)
                await self.on_success(file_info)
            except Exception as e:
                # Step 4b: エラー時コールバック(サブクラスで実装)
                await self.on_error(file_info, e)
                raise

    def _classify_files(
        self, files: list[FileInfo]
    ) -> tuple[list[FileInfo], list[FileInfo]]:
        """スキップ対象のファイルと取り込み対象のファイルを分ける(共通)"""
        # ...省略(何かしらの実装)...

    async def _import_to_database(self, file_info: FileInfo) -> None:
        """データベースへの登録処理(共通)"""
        # 実際の実装ではDB固有の処理を行う
        # ...省略(何かしらの実装)...

    # ========== 抽象メソッド(サブクラスで実装が必要) ==========
    
    @abstractmethod
    async def fetch_files(self) -> list[FileInfo]:
        """取り込み対象のファイル一覧を取得する"""
        ...

    @abstractmethod
    async def on_skip(self, file_info: FileInfo) -> None:
        """ファイルがスキップされた時の処理"""
        ...

    @abstractmethod
    async def on_success(self, file_info: FileInfo) -> None:
        """ファイル処理が成功した時の処理"""
        ...

    @abstractmethod
    async def on_error(
        self, 
        file_info: FileInfo,
        exception: Exception
    ) -> None:
        """ファイル処理が失敗した時の処理"""
        ...

具象クラス1:CloudStorageImporter

class CloudStorageImporter(BaseImporter):
    """クラウドストレージからの取り込み"""
    
    def __init__(self, source_id: str, job_id: str, storage_url: str):
        super().__init__(source_id, job_id)
        self.storage_url = storage_url

    async def fetch_files(self) -> list[FileInfo]:
        """クラウドストレージ向けのファイル取得時の処理"""
        # ...省略(何かしらの実装)...

    async def on_skip(self, file_info: FileInfo) -> None:
        """クラウドストレージ向けのスキップしたファイルに対する処理"""
        # クラウドストレージではスキップは想定しないため何もしない
        return

    async def on_success(self, file_info: FileInfo) -> None:
        """クラウドストレージ向けの処理済みファイルに対する処理"""
        # ...省略(何かしらの実装)...

    async def on_error(
        self,
        file_info: FileInfo,
        exception: Exception
    ) -> None:
        """クラウドストレージ向けのエラー時の処理"""
       # ...省略(何かしらの実装)...

具象クラス2:SftpImporter

class SftpImporter(BaseImporter):
    """SFTPサーバーからの取り込み"""
    
    def __init__(self, source_id: str, job_id: str, sftp_config: dict):
        super().__init__(source_id, job_id)
        self.sftp_config = sftp_config

    async def fetch_files(self) -> list[FileInfo]:
        """SFTP向けのファイル取得時の処理"""
         # ...省略(何かしらの実装)...

    async def on_skip(self, file_info: FileInfo) -> None:
        """SFTP向けのスキップしたファイルに対する処理"""
        # ...省略(何かしらの実装)...

    async def on_success(self, file_info: FileInfo) -> None:
        """SFTP向けの処理済みファイルに対する処理"""
        # ...省略(何かしらの実装)...

    async def on_error(
        self,
        file_info: FileInfo,
        exception: Exception
    ) -> None:
        """SFTP向けのエラー時の処理"""
        # ...省略(何かしらの実装)...

使用例

async def main():
    source_id = "source_001"
    job_id = "job_12345"
    
    # ソースタイプに応じて適切なImporterを選択
    if source_type == "cloud":
        importer = CloudStorageImporter(
            source_id, job_id,
            storage_url="gs://bucket/data.csv"
        )
    else:
        importer = SftpImporter(
            source_id, job_id,
            sftp_config={"host": "sftp.example.com", "user": "admin"}
        )
    
    # どちらも同じインターフェースで実行可能
    await importer.run()

ソース毎の処理の違い

処理 CloudStorageImporter SftpImporter
fetch_files() URLをパースするだけ DL → 中間ストレージへUL
on_skip() 発生しない(単一ファイル) skippedディレクトリへ移動
on_success() ログ出力のみ processedディレクトリへ移動
on_error() エラーログ記録 ファイル移動 + エラーログ記録

パターン適用のポイント

1. 適切な粒度でメソッドを分割する

テンプレートメソッド(ここでは run())が「どのような流れで何をするか」を定義し、その中で呼び出す fetch_files / on_skip / on_success / on_error などの抽象メソッドで「具体的にどうやるか」をサブクラスに任せます。

2. 共通処理はテンプレートメソッドにする

データベースへの登録やファイルの分類ロジックなど、どのサブクラスでも共通の処理はテンプレートメソッド内に記述します。

3. イベントを適切に設計する

今回の例では、処理の各フェーズ(準備、スキップ、成功、エラー)にイベントを設けています。これにより、サブクラスは必要な処理だけを実装できます。

4. 新しいソースの追加が容易

例えば、今後S3やAzure Blobからの取り込みが必要になった場合も、BaseImporterを継承して4つの抽象メソッドを実装するだけで対応できます。

まとめ

テンプレートメソッドパターンを使用することで:

  • 処理フローの一貫性を保ちながら、ソース毎の差異を吸収
  • 新しいソース追加時も既存コードへの影響が最小限
  • テストが書きやすい(各抽象メソッドを個別にテスト可能)

また、「全てをステップとして抽象化する」アプローチではなく、フックポイントとして注入できるようにすることで、より柔軟で実用的な設計になりました。

参考

11
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?