7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

IBM BobでKafka環境を構築 & Bob Findingsでセキュリティレビュー

7
Last updated at Posted at 2026-02-17

はじめに

本記事では、AI開発アシスタント「IBM Bob」を活用して、Apache Kafkaの環境構築からConsumerアプリケーションの実装、そしてセキュリティ問題の自動検出までを実践します。IBM Bobの/reviewコマンドとBob Findingsパネルを使用することで、コードレビューの効率化とセキュリティ品質の向上を実現できます。

検証環境

  • IBM Bob
  • Docker
  • Apache Kafka 4.1.1
  • Python 3.13

1. Kafka環境構築

Apache Kafkaの公式クイックスタートガイドに従い、DockerでKafka環境を構築しました。
下図のようにIBM Bobに指示して、構築することができます。

Screenshot 2026-02-17 at 22.08.39.png

1.1 Kafkaイメージの取得と起動

# Kafkaイメージの取得
docker pull apache/kafka:4.1.1

# Kafkaコンテナの起動
docker run -d -p 9092:9092 --name kafka-container apache/kafka:4.1.1

1.2 トピックの作成と動作確認

# トピック作成
docker exec kafka-container /opt/kafka/bin/kafka-topics.sh \
  --create --topic quickstart-events \
  --bootstrap-server localhost:9092

1.3 イベントの書き込みと読み取り

# イベントの書き込み
echo -e "This is my first event" | \
  docker exec -i kafka-container /opt/kafka/bin/kafka-console-producer.sh \
  --topic quickstart-events --bootstrap-server localhost:9092

これでKafka環境の構築が完了しました。

2. IBM BobでKafka Consumerを作成

次に、IBM Bobを使用してKafka Consumerアプリケーションを実装します。今回は、Bob Findingsの検証のため、意図的にセキュリティ上の問題を含むコードを生成しました。

Screenshot 2026-02-17 at 22.12.32.png

2.1 Consumer実装

kafka_consumer.py

import json
from kafka import KafkaConsumer
import logging

# Security Issue 1: Hardcoded credentials
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_USERNAME = "admin"
KAFKA_PASSWORD = "admin123"  # Hardcoded password

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class InsecureKafkaConsumer:
    def __init__(self, topics):
        # Security Issue 2: No SSL/TLS encryption
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            security_protocol='PLAINTEXT',  # Unencrypted connection
            sasl_mechanism='PLAIN',
            sasl_plain_username=KAFKA_USERNAME,
            sasl_plain_password=KAFKA_PASSWORD,
            value_deserializer=lambda m: m.decode('utf-8'),  # Decode as string first
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='consumer-group'
        )
        logger.info("Consumer initialized")
    
    def process_message(self, message):
        try:
            data = message.value
            
            # Display event details
            print("\n" + "="*80)
            print(f"📨 Event from topic: {message.topic}")
            print(f"Partition: {message.partition} | Offset: {message.offset}")
            print(f"Key: {message.key}")
            print("-"*80)
            
            # Try to parse as JSON, otherwise display as plain text
            try:
                json_data = json.loads(data)
                print(f"Value (JSON): {json.dumps(json_data, indent=2, ensure_ascii=False)}")
                
                # Security Issue 3: SQL Injection vulnerability
                if 'user_id' in json_data:
                    query = f"SELECT * FROM users WHERE id = {json_data['user_id']}"  # No parameterization
                    logger.info(f"Executing query: {query}")
            except json.JSONDecodeError:
                print(f"Value (Text): {data}")
            
            print("="*80)
            
            logger.info(f"Processed message from topic {message.topic}")
            
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
    
    def consume_messages(self):
        logger.info("Starting to consume messages...")
        
        try:
            for message in self.consumer:
                logger.debug(f"Received message from topic {message.topic}")
                self.process_message(message)
                
        except KeyboardInterrupt:
            logger.info("Consumer interrupted")
        finally:
            self.close()
    
    def close(self):
        self.consumer.close()
        logger.info("Consumer closed")

def main():
    topics = ['quickstart-events']
    
    logger.info(f"Starting consumer for topics: {topics}")
    consumer = InsecureKafkaConsumer(topics)
    consumer.consume_messages()

if __name__ == "__main__":
    main()

実行すると以下のようにeventを受信できます。

python -u kafka_consumer.py
... (略) ...
================================================================================
📨 Event from topic: quickstart-events
Partition: 0 | Offset: 18
Key: None
--------------------------------------------------------------------------------
Value (Text): This is my first event
================================================================================

ですが、このコードには以下のセキュリティ問題が含まれています:

  1. 認証情報の管理: 認証情報をハードコーディング
  2. 暗号化なし: security_protocol='PLAINTEXT'で平文通信
  3. SQLインジェクション脆弱性: パラメータ化されていないSQL文字列構築

3. Bob Findingsによるセキュリティ問題の自動検出

IBM Bobの/reviewコマンドを実行することで、コード内のセキュリティ問題を自動的に検出できます。

Screenshot 2026-02-17 at 22.13.34.png

3.1 レビューの実行

IBM Bobで以下のコマンドを実行:

/review

3.2 Bob Findingsパネルでの確認

Bob Findingsパネルに、検出されたセキュリティ問題が表示されます:

Screenshot 2026-02-17 at 21.27.02.png

認証情報の管理

KAFKA_USERNAME = "admin"
KAFKA_PASSWORD = "admin123"  # Hardcoded password

問題点:

  • 認証情報がコード内にハードコーディングされている
  • 平文でパスワードが保存されており、セキュリティリスクが高い

Screenshot 2026-02-17 at 22.31.43.png

暗号化されていない通信

security_protocol='PLAINTEXT',  # Unencrypted connection

問題点:

  • Kafka接続が平文で行われている
  • ネットワーク上で傍受される可能性がある

Screenshot 2026-02-17 at 22.31.57.png

SQLインジェクション脆弱性

query = f"SELECT * FROM users WHERE id = {data['user_id']}"

問題点:

  • ユーザー入力を直接SQL文字列に埋め込んでいる
  • パラメータ化されていないため、SQLインジェクション攻撃のリスクがある

Screenshot 2026-02-17 at 22.32.10.png


これらの検出された問題は Direct fix を実行することで即時に修正することができます。

Screenshot 2026-02-17 at 21.37.37.png

3.3 Bob Findingsの利点

  1. 自動検出: コードレビュー時に見落としがちなセキュリティ問題を自動的に検出
  2. 具体的な修正提案: 各問題に対して実装可能な修正案を提示
  3. 開発効率の向上: セキュリティレビューの時間を大幅に短縮

まとめ

本記事では、IBM Bobを活用したKafka環境の構築から、セキュリティ問題の自動検出までを実践しました。

この記事にて検証したIBM Bobの主な利点:

  • 環境構築の効率化: Dockerコマンドの実行からトピック作成まで、対話的にサポート
  • コード生成の迅速化: 要件を伝えるだけで、実装可能なコードを生成
  • セキュリティレビューの自動化: /reviewコマンドで即座にセキュリティ問題を検出
  • Bob Findingsによる可視化: 問題の重要度と修正案を分かりやすく提示

IBM Bobは、開発の各フェーズで開発者をサポートし、コードの品質とセキュリティを向上させる強力なツールです。特にBob Findingsは、セキュリティレビューの負担を軽減し、より安全なアプリケーション開発を実現します。

参考資料


本記事のコードは検証目的で作成されており、本番環境での使用は推奨されません。

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?