はじめに
本記事では、AI開発アシスタント「IBM Bob」を活用して、Apache Kafkaの環境構築からConsumerアプリケーションの実装、そしてセキュリティ問題の自動検出までを実践します。IBM Bobの/reviewコマンドとBob Findingsパネルを使用することで、コードレビューの効率化とセキュリティ品質の向上を実現できます。
検証環境
- IBM Bob
- Docker
- Apache Kafka 4.1.1
- Python 3.13
1. Kafka環境構築
Apache Kafkaの公式クイックスタートガイドに従い、DockerでKafka環境を構築しました。
下図のようにIBM Bobに指示して、構築することができます。
1.1 Kafkaイメージの取得と起動
# Kafkaイメージの取得
docker pull apache/kafka:4.1.1
# Kafkaコンテナの起動
docker run -d -p 9092:9092 --name kafka-container apache/kafka:4.1.1
1.2 トピックの作成と動作確認
# トピック作成
docker exec kafka-container /opt/kafka/bin/kafka-topics.sh \
--create --topic quickstart-events \
--bootstrap-server localhost:9092
1.3 イベントの書き込みと読み取り
# イベントの書き込み
echo -e "This is my first event" | \
docker exec -i kafka-container /opt/kafka/bin/kafka-console-producer.sh \
--topic quickstart-events --bootstrap-server localhost:9092
これでKafka環境の構築が完了しました。
2. IBM BobでKafka Consumerを作成
次に、IBM Bobを使用してKafka Consumerアプリケーションを実装します。今回は、Bob Findingsの検証のため、意図的にセキュリティ上の問題を含むコードを生成しました。
2.1 Consumer実装
import json
from kafka import KafkaConsumer
import logging
# Security Issue 1: Hardcoded credentials
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_USERNAME = "admin"
KAFKA_PASSWORD = "admin123" # Hardcoded password
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class InsecureKafkaConsumer:
def __init__(self, topics):
# Security Issue 2: No SSL/TLS encryption
self.consumer = KafkaConsumer(
*topics,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
security_protocol='PLAINTEXT', # Unencrypted connection
sasl_mechanism='PLAIN',
sasl_plain_username=KAFKA_USERNAME,
sasl_plain_password=KAFKA_PASSWORD,
value_deserializer=lambda m: m.decode('utf-8'), # Decode as string first
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='consumer-group'
)
logger.info("Consumer initialized")
def process_message(self, message):
try:
data = message.value
# Display event details
print("\n" + "="*80)
print(f"📨 Event from topic: {message.topic}")
print(f"Partition: {message.partition} | Offset: {message.offset}")
print(f"Key: {message.key}")
print("-"*80)
# Try to parse as JSON, otherwise display as plain text
try:
json_data = json.loads(data)
print(f"Value (JSON): {json.dumps(json_data, indent=2, ensure_ascii=False)}")
# Security Issue 3: SQL Injection vulnerability
if 'user_id' in json_data:
query = f"SELECT * FROM users WHERE id = {json_data['user_id']}" # No parameterization
logger.info(f"Executing query: {query}")
except json.JSONDecodeError:
print(f"Value (Text): {data}")
print("="*80)
logger.info(f"Processed message from topic {message.topic}")
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
def consume_messages(self):
logger.info("Starting to consume messages...")
try:
for message in self.consumer:
logger.debug(f"Received message from topic {message.topic}")
self.process_message(message)
except KeyboardInterrupt:
logger.info("Consumer interrupted")
finally:
self.close()
def close(self):
self.consumer.close()
logger.info("Consumer closed")
def main():
topics = ['quickstart-events']
logger.info(f"Starting consumer for topics: {topics}")
consumer = InsecureKafkaConsumer(topics)
consumer.consume_messages()
if __name__ == "__main__":
main()
実行すると以下のようにeventを受信できます。
python -u kafka_consumer.py
... (略) ...
================================================================================
📨 Event from topic: quickstart-events
Partition: 0 | Offset: 18
Key: None
--------------------------------------------------------------------------------
Value (Text): This is my first event
================================================================================
ですが、このコードには以下のセキュリティ問題が含まれています:
- 認証情報の管理: 認証情報をハードコーディング
-
暗号化なし:
security_protocol='PLAINTEXT'で平文通信 - SQLインジェクション脆弱性: パラメータ化されていないSQL文字列構築
3. Bob Findingsによるセキュリティ問題の自動検出
IBM Bobの/reviewコマンドを実行することで、コード内のセキュリティ問題を自動的に検出できます。
3.1 レビューの実行
IBM Bobで以下のコマンドを実行:
/review
3.2 Bob Findingsパネルでの確認
Bob Findingsパネルに、検出されたセキュリティ問題が表示されます:
認証情報の管理
KAFKA_USERNAME = "admin"
KAFKA_PASSWORD = "admin123" # Hardcoded password
問題点:
- 認証情報がコード内にハードコーディングされている
- 平文でパスワードが保存されており、セキュリティリスクが高い
暗号化されていない通信
security_protocol='PLAINTEXT', # Unencrypted connection
問題点:
- Kafka接続が平文で行われている
- ネットワーク上で傍受される可能性がある
SQLインジェクション脆弱性
query = f"SELECT * FROM users WHERE id = {data['user_id']}"
問題点:
- ユーザー入力を直接SQL文字列に埋め込んでいる
- パラメータ化されていないため、SQLインジェクション攻撃のリスクがある
これらの検出された問題は Direct fix を実行することで即時に修正することができます。
3.3 Bob Findingsの利点
- 自動検出: コードレビュー時に見落としがちなセキュリティ問題を自動的に検出
- 具体的な修正提案: 各問題に対して実装可能な修正案を提示
- 開発効率の向上: セキュリティレビューの時間を大幅に短縮
まとめ
本記事では、IBM Bobを活用したKafka環境の構築から、セキュリティ問題の自動検出までを実践しました。
この記事にて検証したIBM Bobの主な利点:
- 環境構築の効率化: Dockerコマンドの実行からトピック作成まで、対話的にサポート
- コード生成の迅速化: 要件を伝えるだけで、実装可能なコードを生成
-
セキュリティレビューの自動化:
/reviewコマンドで即座にセキュリティ問題を検出 - Bob Findingsによる可視化: 問題の重要度と修正案を分かりやすく提示
IBM Bobは、開発の各フェーズで開発者をサポートし、コードの品質とセキュリティを向上させる強力なツールです。特にBob Findingsは、セキュリティレビューの負担を軽減し、より安全なアプリケーション開発を実現します。
参考資料
本記事のコードは検証目的で作成されており、本番環境での使用は推奨されません。







