0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Observer】機能・実装要件から選択するデザインパターン:「順番非依存」「fire and forget」

Posted at

機能・実装要件から選択するデザインパターン

  • この記事では、機能・実装要件から、実装に適切なデザインパターンと実装サンプルを紹介する

実現したい機能・実装要件

  • 順番非依存
    • 処理A,Bについて、順番を逆転しても結果が同じになる
    • つまり、処理がお互いに独立していること
      • 並行可能・可換性ともいう
  • Fire and Forget
    • Fire and Forgetとは、「実行後に結果を確認しない処理」のこと
    • つまり、処理結果が本筋の処理に影響を及ぼさない処理である

処理の例

  • 下記のような処理が、上記性質に当てはまる
    • ログの保存
    • DBの永続化(永続化したものを後続で使わない前提)
    • 外部への通知
    • キャッシュ更新

Observerパターン

  • 上記のような処理は、Observerパターンとの相性が良い
  • Observerパターンは、オブジェクトの状態変化を他のオブジェクトに通知するための処理

クラス図

  • 下記は、Observerパターンのクラス図である

  • 登場人物

    • Subject

      • Observerに通知を送るためのクラス
    • Observer

      • 通知を受け取るインターフェイス
      • 下記役割を持つ
        • Subjectとの結合を疎にする
        • 依存の逆転によりConcreateObserverが誰からも依存されなくなる
          • 柔軟な拡張・追加・改修が可能になる
    • ConcreateObserver

      • Observerインターフェイスの具象クラス
      • ここで実装した処理同士は、実現したい機能・実装要件で紹介した通り、下記を満たす
        • 順番非依存・可換性・並行可能
        • Fire and Forget

サンプル

ObserverPatternSample

  • CSVに記載された商品データを取り込む処理
  • 下記2つのObserverを、Subjectに登録しており、1行取り込むごとに、Observer処理を実行する
    • 取り込みログの保存
    • 商品データの永続化
  • これら2つは、順番非依存とFire and Forgetを満たしており、Observerパターンとの相性が良い
  • Subjectは、マルチスレッド化しており、Obserberの処理が、本処理(CSVの取り込み・バリデーション)を阻害しないようになっている

Oberver

  • i_observer.py
    • 各Obserberのインターフェイス
    • Subjectから、取り込み結果おぶじぇくと(import_result)を受取り、commit()を実行する
class IObserver(ABC):
    @abstractmethod
    def commit(self, import_result: ImportResult):
        pass

ConcreateObserver

  • 上記IObserberを具象化したObserver処理である

  • import_log_repository.py

    • 取り込み結果オブジェクトから、取り込みログを作成する処理
    • 取り込み結果オブジェクトは、取り込み結果を持っており、それによって取り込みログの書き込み処理が分岐する
      • バリデーションエラー
      • 正常取り込み
class ImportLogRepository(IObserver):

    def commit(self, import_result: ImportResult):
        if import_result.event_code == ImportResult.SKU_UPSERT:
            self._write_sku_upsert_log(import_result.sku)
        elif import_result.event_code == ImportResult.VALIDATE_ERROR:
            self._write_validate_error_log(import_result.error_messages)

    def _write_sku_upsert_log(self, sku_input: SkuInput):
        # Write a log for upserting the SKU here
        print(f'the SKU saved (sku_code: {sku_input.sku_code})')
        
    def _write_validate_error_log(self, error_messages: List[str]):
        # Write a validate error log here
        print(f'validate error! (error_messages: {"\t".join(error_messages)})')
  • sku_repository.py
    • CSVから読み取った商品情報を、DBに永続化する処理
from fire_and_forget.import_result import ImportResult, SkuInput
from fire_and_forget.observers import IObserver


class SkuRepository(IObserver):
    def commit(self, import_result: ImportResult):
        if import_result.event_code == ImportResult.SKU_UPSERT:
            self._upsert_sku(import_result.sku)


    def _upsert_sku(self, sku: SkuInput):
        # Write a something that upsert and commit the SKU here
        print(f'upsert and commit a sku(sku_code: {sku.sku_code})')

Subject

  • subject.py
    • 上記Observerに、通知(取り込み結果オブジェクト)を送る処理である
    • 本処理(CSVの取り込み・バリデーション)は、Observer処理の結果に依存していないため、Observer処理は、マルチスレッドで処理する
      1. 【本処理】notify()を実行し、取り込み結果をキューに入れる
      2. 【workerスレッド】キューから取り込み結果を取り出し、Observerのcommit()を実行する
from fire_and_forget.import_result import ImportResult
from fire_and_forget.observers import IObserver
from typing import List, Optional
from queue import Queue
from threading import Thread


class Subject:
    def __init__(self, observers: Optional[List[IObserver]] = None):
        self.observers = observers if observers else []
        self.queue = Queue()
        self.thread = Thread(target=self.worker, daemon=True)
        self.thread.start()

    def attach_observer(self, observer: IObserver):
        self.observers.append(observer)

    def detach_observer(self, number: int):
        try:
            self.observers.pop(number)
        except IndexError:
            print(f"Can't detach {number}th observer")

    def notify(self, import_result: ImportResult):
        for observer in self.observers:
            self.queue.put((observer, import_result))

    def worker(self):
        while True:
            observer, import_result = self.queue.get()
            if observer is None and import_result is None:
                self.queue.task_done()
                break
            observer.commit(import_result)
            self.queue.task_done()

    def worker_stop(self):
        self.queue.put((None, None))
        self.thread.join()

まとめ

  • 順番非依存かつFire and Forgetな処理の性質を持つ処理(ログ記録、非同期通知、キャッシュ更新など)においては、Observerパターンが非常に有効

  • Observerパターンを適用することで、本処理に影響を与えずに副次的な処理を追加・拡張でき、保守性や拡張性を確保できる

  • 本記事で紹介したサンプルでは、Observerの処理をマルチスレッドで非同期実行することで、メイン処理のパフォーマンスを損なうことなく処理が可能

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?