0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Reactor】実装要件から選択するデザインパターン:「並行処理」「イベント処理」

Last updated at Posted at 2025-08-02

実装要件から選択するデザインパターン

  • 本記事では、特定のデザインパターンについて、実装要件から逆引きで紹介します
    • 実装時に最適なパターンを選択するための指針となることを目的とします

実現したい実装要件

■ 多数の外部との並行的なやり取りを効率よく処理したい

現代のアプリケーションでは、外部との並行的な通信が頻繁に発生します。
たとえば以下のようなケースが挙げられます:

  • Webクライアントやモバイルアプリとの多数の同時通信(例:WebSocketチャット)
  • 外部APIとの複数並列呼び出し(例:マイクロサービス連携)
  • 複数ファイルやデバイスとの非同期I/O(例:センサーデータ、ログストリーム)
  • GUIにおけるユーザーイベント(例:キーボード入力、マウス操作)

■ メモリやスレッド数など、使用リソースを抑えたい

IoTデバイスやクラウドサービスのように、資源が制約される環境では、メモリやCPUの使用を抑える必要があります。
このような場面では、各処理ごとにスレッドやプロセスを起動する方式は、スケールしない選択となりえません。


■ ノンブロッキングなI/Oが可能なプラットフォームで稼働する

Linuxの epoll や FreeBSD/macOSの kqueue 、Windowsの IOCP など、非同期I/O APIが使用可能な環境では、イベント駆動型の処理が最適です。


■ スレッド間共有状態やロック制御を極力避けたい

マルチスレッド環境では、状態共有のためにロックや排他制御が必要になり、コードの複雑性とバグのリスクが高まります。

Reactorパターン

Reactorパターンは、非同期I/Oとイベント駆動によって、複数の入力ソースを効率的に処理するためのアーキテクチャパターンです。

このパターンでは、I/Oの状態変化(データの受信準備ができた、ソケットに接続が来た等)に応じて、あらかじめ登録された処理(EventHandler)が呼び出されます。

このパターンの重要な特徴は、イベントドリブンな設計により「待機処理を省略」できる点です。
待機を必要とするI/O操作はReactorが一括で監視し、実際にイベントが発生したときだけ対応する処理を起動するため、CPUやメモリの無駄がありません。

また、基本的にシングルスレッドで処理できる設計であるため、状態共有や排他制御の複雑性も最小限に抑えられます。そのため、保守性や拡張性にも優れています。

クラス図

以下はReactorパターンにおける主要な構成要素の関係を表したクラス図です。

Reactorパターンは、大まかに3つから構成されます

  • Demultiplexer
    • epoll(Linux)などを使い、目的のリソースの状態変化(通信準備完了・データ受信・タイマー起動など...)について監視を行う
  • EventHandler
    • Reactorパターンにより実行する実際の業務ロジック
  • Reactor
    • DemultiplexerとEventHandlerの仲立ちを行う制御処理
    • Reactorはイベントループにより、Demultiplexerからの通知を逐次確認する
    • 通知を受け取った後、該当するEventHandlerを呼び出す

サンプル

ReactorPatternSample

  • 非同期I/Oのエコーサーバ
    • 接続を受け取るListnerSocketと、通信を行うEchoSocketを、EventHandlerとして、Demultiplexer(selectorモジュール)に登録する
    • Reactorがイベント発生時にのみEventHandlerを呼び出すことで、EventHandlerは通信の完了を待機する必要がない

EventHandler

IEventHandlerクラス(i_event_handler.py
  • ConcreteEventHandlerのインターフェイス
    • Reactorクラスとの関係を粗結合にする
class IEventHandler(ABC):
    @property
    @abstractmethod    - **100個の接続を同時に起動**
    - **1秒間に10回 "ping" を送信**
    - **10秒間の間同時接続し続ける**
    def file_descriptor(self) -> Any:
        pass

    @abstractmethod
    def handle(self, *args) -> Any:
        pass
ListnerSocketクラス (listener_socket.py)
  • ソケットを生成・バインドし、非ブロッキングモードで listen() を開始
  • handle() で新規接続を accept() し、EchoSocket インスタンスを返却
  • Reactor は返却されたハンドラを登録し、後続の入出力処理を移譲
class ListnerSocket(IEventHandler):
    def __init__(self) -> None:
        self._init_socket()

    def _init_socket(self):
        self._socket = Socket()
        self._socket.bind((HOST, PORT))
        self._socket.setblocking(False)
        self._socket.listen()

    @property
    def file_descriptor(self):
        return self._socket

    def handle(self, *args):
        if len(args) > 0 and isinstance(args[0], Socket):
            client_sock, address = args[0].accept()
            return EchoSocket(client_sock, address)
    # ...

EchoSocketクラス (echo_socket.py)
  • クライアントから届いたデータを recv() で読み取り、そのまま send() で返送(エコー)
  • 切断を検知したら should_close フラグを立て、自身を返却して Reactor に解除を指示
class EchoSocket(IEventHandler):
    def __init__(self, socket: socket, address):
        self._socket = socket
        self._address = address
        self.should_close = False
        self._socket.setblocking(False)

    @property
    def file_descriptor(self) -> Any:
        return self._socket

    def handle(self, *_) -> Any:
        data = self._socket.recv(1000)
        if data:
            print('echoing', repr(data), 'to', self._socket)
            self._socket.send(data)
            return None
        else:
            print('closing', self._socket)
            self.should_close = True
            return self
    # ...

Reactorクラス (reactor.py

Reactor パターンにおける Reactor クラス
EventLoopにより、Demultiplexerに登録したクラスが処理可能になったかどうかを逐次確認しています

また、本サンプルにおいて、Demultiplexerは、selectorモジュールをそのまま使用します

  • 複数のイベントソース(IEventHandler が公開するファイルディスクリプタ)を selectors モジュールで監視
  • イベント発生時に該当ハンドラの handle() を呼び出して処理を実行
  • regist_new_event_handler()unregist_event_handler() によるハンドラの動的登録・解除をサポート
class Reactor:
    def __init__(self, event_handler: IEventHandler):
        self._init_selector(event_handler)

    def _init_selector(self, event_handler: IEventHandler):
        self.selector = selectors.DefaultSelector()
        self.selector.register(
            fileobj=event_handler.file_descriptor,
            events=selectors.EVENT_READ,
            data=event_handler,
        )

    def event_loop(self):
        while True:
            events = self.selector.select()
            for key, _ in events:
                yield key.data.handle(key.fileobj)
    # ...

main.py(エントリーポイント) (main.py)
  • 最初に ListnerSocketReactor に登録し、event_loop() を起動
  • ループ内で返却されるハンドラを判定し、EchoSocket の場合は should_close に応じて登録/解除
if __name__ == "__main__":
    reactor = Reactor(ListnerSocket())
    for result in reactor.event_loop():
        if isinstance(result, EchoSocket):
            if result.should_close:
                reactor.unregist_event_handler(result)
                del result
            else:
                reactor.regist_new_event_handler(result)

動作検証

ReactorPatternSampleが、複数クライアントからの同時通信に対応できるのかを、tcpkaliコマンドを使用し、検証しました。

tcpkali

tcpkaliコマンドは、多数のtcp同時接続をテストできるツールです。

tcpkali -c 100 -r 10 --connect-rate=100 -T 10 127.0.0.1:8080 --message "ping"

上記のコマンドは:

  • 100個の接続を同時に起動
  • 1秒間に10回 "ping" を送信
  • 10秒間の間、同時接続し続ける

※ ReactorPatternSampleの起動は、README.md参照

検証結果
tcpkali -c 100 -r 10 --connect-rate=100 -T 10 127.0.0.1:8080 --message "ping"

Destination: [127.0.0.1]:8080
Interface lo address [127.0.0.1]:0
Using interface lo to connect to [127.0.0.1]:8080
Ramped up to 100 connections.
Total data sent:     39.1 KiB (40000 bytes)
Total data received: 39.1 KiB (40000 bytes)
Bandwidth per channel: 0.001⇅ Mbps (0.1 kBps)
Aggregate bandwidth: 0.032↓, 0.032↑ Mbps
Packet rate estimate: 999.2↓, 999.2↑ (1↓, 1↑ TCP MSS/op)
Test duration: 10.0076 s.
  • 100接続 × 10回 = 1000回の I/O が正常に完了
  • 合計 40KB の送受信データを処理
  • 約2.5件/秒 の処理をシングルスレッドで実行
  • パケットロスや遅延なし

これは、非同期I/Oモデルが正しく動作している証拠 であり、Reactorパターンの有効性を実証するものです。

まとめ:実装要件に合致した最適解としてのReactorパターン

Reactorパターンは以下のような実装要件に対して最適な設計パターンと評価できます:

  • 多数の外部との平行処理を需要する
  • CPU/メモリ使用を最小限に抑えたい
  • 非同期I/Oの利用が可能な環境
  • 状態共有やロック制御を使わずに設計したい

マルチスレッド/プロセス型のサーバーに比べ、依存性や複雑性が低く、小さな環境でも高い効率を発揮できる Reactor パターンは、業務ロジックをイベント駆動型に切り替えることでコードを簡潔にし、スケーラビリティを高める企業システムに有効です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?