3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Docker+Kafka環境でのE2Eテスト奮闘記:`confluent-kafka`への移行とOOMエラーとの戦い

Last updated at Posted at 2025-10-24

Docker+Kafka環境でのE2Eテスト奮闘記:confluent-kafkaへの移行とOOMエラーとの戦い

はじめに

本記事は、kafka-python から confluent-kafka へのライブラリ移行プロジェクトにおける、E2Eテストの安定化までの道のりを一部記録したものです。

当初は単純なライブラリの置き換えで完了するはずでした。しかし、現実は甘くなく、ModuleNotFoundError の嵐、そして最大の難関であった コンテナのOOM(Out Of Memory)エラー との長い戦いを繰り広げることになりました...。

この記事が、同様の環境で開発・テストを行う皆さんにとって、問題解決の一助となれば幸いです。

プロジェクト概要

  • 目的: Kafkaを利用した非同期タスク処理システムの開発
  • 技術スタック:
    • Python 3.11
    • Docker, Docker Compose
    • Kafka (Confluent Platform)
    • 当初のKafkaクライアント: kafka-python
  • テストフレームワーク: Pytest

なぜ confluent-kafka への移行が必要だったのか?

開発当初、私は kafka-python を採用していました。しかし、特定の環境下でKafkaブローカーとのSSL接続が不安定になり、SSLHandshakeError が頻発するという問題に悩まされていました。様々な設定調整を試みましたが、根本的な解決には至りませんでした。

調査を進める中で、confluent-kafkalibrdkafka(Cで実装された高性能なKafkaクライアント)のラッパーであり、より安定したパフォーマンスと堅牢な接続管理を提供することがわかりました。このSSL接続問題を根本的に解決するため、私は confluent-kafka への移行を決断しました!

移行プロセスと困難

Step 1: ライブラリの置き換えとAPIの修正

まずは、requirements.txt を更新し、kafka-pythonconfluent-kafka に置き換えました。

requirements.txt
- kafka-python==2.0.2
- opentelemetry-instrumentation-kafka-python==0.45b0
+ confluent-kafka==2.4.0
+ opentelemetry-instrumentation-confluent-kafka==0.2.0b0

当然ながら、これだけでは動きませんでした...。confluent-kafkakafka-python とAPIの互換性がないため、コードの至る所で AttributeErrorTypeError が発生しました。

特に、ConsumerProducer の初期化、メッセージの送受信、そしてエラーハンドリングの部分で大幅な修正が必要でした。とてもたいへんでした。

修正例(communication/pubsub_client.py):

# 修正前 (kafka-python)
# self.consumer = KafkaConsumer(
#     self.topic,
#     bootstrap_servers=self.bootstrap_servers,
#     group_id=self.group_id,
#     ...
# )

# 修正後 (confluent-kafka)
consumer_conf = {
    "bootstrap.servers": self.bootstrap_servers,
    "group.id": self.group_id,
    "auto.offset.reset": "earliest",
    ...
}
self.consumer = Consumer(consumer_conf)
self.consumer.subscribe([self.topic])

confluent-kafka では、設定を辞書として Consumer のコンストラクタに渡す必要があります。この修正をプロジェクト全体に適用し、ようやく基本的な import エラーや TypeError を解消することができました。

Step 2: E2Eテストの実行と exit code 137 の絶望

全ての単体テストがパスすることを確認し、満を持してE2Eテストを実行しました。使用したコマンドは以下の通りです。

docker-compose up --build -d
docker-compose exec worker pytest -m e2e

すると、テストの実行途中で worker コンテナが突然停止し、exit code 137 を返して処理が異常終了してしまいました。

tests/test_e2e.py
Error: (none)
Exit Code: 137
Signal: (none)

exit code 137 は、Linuxシステムにおいて OOM (Out Of Memory) Killer によってプロセスが[[[強制終了]]]されたことを示す。ただし、必ずしもOOMだけが原因ではなく、手動で kill -9 した場合も 137 になります。つまり、worker コンテナが割り当てられたメモリをすべて使い果たしてしまったのです。

Step 3: OOMエラーとの戦い

confluent-kafkakafka-python よりも多くのメモリを消費することは予想していましたが、まさかコンテナが落ちるほどとは思いませんでした。予想よりもメモリの消費がプロジェクトを通しても多いのでメモリの増設も今検討してまふ。

仮説1: メモリリーク

まず疑ったのは、コード内にメモリリークが存在する可能性です。しかし、コードレビューやプロファイリングツールを使っても、特定の箇所でメモリが異常に増加している様子は見られませんでした。

仮説2: 単純なメモリ不足

次に、confluent-kafka のメモリ消費量と、E2Eテスト実行時の負荷が相まって、単純にコンテナに割り当てられたメモリが不足しているのではないか、という仮説を立てました。

この仮説を検証するため、docker-compose.yml を修正し、worker サービスのメモリ上限を段階的に引き上げてみることにしました。

試行1: 512M -> 失敗
試行2: 1G -> 失敗

そして、最終的に 2G まで引き上げたところ、ついにOOMエラーは発生しなくなりました。

docker-compose.yml
services:
  # ... (他のサービス)

  worker:
    build:
      context: .
      args:
        - INSTALL_PYTEST=true
    container_name: worker
    deploy:
      resources:
        limits:
          memory: 2G # メモリ上限を2Gに設定
    command: ["python", "worker.py"]
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      kafka:
        condition: service_healthy

この結果から、今回のOOMエラーは、メモリリークのような深刻なバグではなく、confluent-kafka の採用とE2Eテストの負荷による、純粋なメモリ不足が原因であったと結論付けました。

Step 4: 最後の仕上げ

OOMエラーを乗り越え、E2Eテストは最後まで実行されるようになりました。しかし、まだいくつかの問題が残っていました。

  1. DLQテストの失敗:
    DLQ(Dead Letter Queue)のテストで、confluent-kafkaConsumer の使い方を誤っていたために TypeError が発生していました。これは、設定を辞書で渡すように修正し解決しました。

    tests/test_e2e.py
    # 修正前 (kafka-python)
    # self.consumer = KafkaConsumer(
    #     self.topic,
    #     bootstrap_servers=self.bootstrap_servers,
    #     group_id=self.group_id,
    #     ...
    # )
    
    # 修正後 (confluent-kafka)
    consumer_conf = {
        "bootstrap.servers": self.bootstrap_servers,
        "group.id": self.group_id,
        "auto.offset.reset": "earliest",
    }
    self.consumer = Consumer(consumer_conf)
    self.consumer.subscribe([self.topic])
    
    
  2. デバッグログの削除:
    問題解決のために communication/pubsub_client.py に埋め込んでいた "debug": "all" という設定が残っていました。これは不要なログを出力し、パフォーマンスに影響を与える可能性があるため、忘れずに、しっかり、削除しました。

これらの修正を経て、ついに全てのE2Eテストがグリーンになりました。

$ docker-compose exec worker pytest -m e2e
============================= test session starts ==============================
...
================= 2 passed, 45 deselected, 3 warnings in 43.98s =================

ヤッター(^^)/

まとめと教訓

kafka-python から confluent-kafka への移行は、予想以上に困難な道のりでした。しかし、この経験を通して、多くの貴重な教訓を得ることができました。

  1. ライブラリの特性を理解する: ライブラリを移行する際は、APIの互換性だけでなく、メモリ使用量やパフォーマンス特性といった非機能要件も十分に調査・考慮する必要があります。

  2. エラーコードは重要なヒント: exit code 137 がOOMエラーを示すことを知っていたおかげで、問題解決の方向性を早期に定めることができました。馴染みのないエラーに遭遇した際は、その意味を正確に調べることが、解決への近道となります。

  3. インフラ層(Docker)の設定も疑う: アプリケーションコードだけに目を向けるのではなく、コンテナのメモリ割り当てのようなインフラ層の設定も問題の原因となり得ます。特に、リソース消費の大きいライブラリを使用する場合は注意が必要です。

  4. E2Eテストの価値を再認識: 今回のOOMエラーは、複数のコンポーネントが連携して動作するE2Eテストの環境でなければ顕在化しなかったでしょう。システムの安定性を担保する上で、E2Eテストの重要性を改めて認識させられました。

この長い戦いの記録が、皆さんのプロジェクトにおける Kafka との格闘において、少しでもお役に立てれば幸いです。わたしもこれを通してかなり学ばせてもらいました。記事の内容で間違っているかもしれないことがあればご連絡いただけると幸いです。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?