Kafkaのパーティションキー徹底解説
この記事のねらい
- Apache Kafkaを使ったストリーミング開発に欠かせない"パーティションキー"の概念を、エンジニア目線で体系的に整理する。
1. パーティションキーとは
- 定義: Kafkaメッセージに付与するキー値。Kafkaはこのキーをハッシュし、そのメッセージをどのパーティションへ格納するか決定する。
- 例え: 荷物(メッセージ)に貼る"配送ラベル"。同じラベルは同じトラック(パーティション)に載せられるイメージ。
- データ型: バイト列(bytes)。アプリ側では文字列や整数をエンコードして渡すのが一般的。
2. パーティションキーの役割
-
順序保証 (Ordering)
- 同一キーのメッセージは常に同一パーティションへ→パーティション内の順序保証をそのまま享受できる。
-
並列処理と一貫性の両立
- キー単位で一意に順序を保ちながら、パーティション単位で並列コンシュームが可能。
-
ホットパーティション回避への設計指針
- キー分布が偏ると一部パーティションが過負荷になるため、"値のカーディナリティ"に注意し設計する。
3. Kafkaとは
-
分散ログ & ストリーミングプラットフォーム。高スループット・低レイテンシでリアルタイムデータを扱う。
-
コア概念: トピック / パーティション / レプリケーション / オフセット。
-
開発者に嬉しいポイント
- 言語クライアントが豊富 (Java, Python, Go, Rust ...)
- 公式 + OSSツール群 (Kafka Connect, ksqlDB, Schema Registry など)
コア概念の解説
-
1トピック = 複数パーティション
- トピックは 0 番から始まる連番付きログファイル集合で構成される(例
my_topic-0
,my_topic-1
, ...)。
- トピックは 0 番から始まる連番付きログファイル集合で構成される(例
-
順序保証単位
- パーティション内ではオフセットが単調増加し、書き込み順序どおりにのみ読み出せる。パーティションをまたいだ順序は保証されない。
-
スケールアウトと並列処理
- パーティション数を増やすほど Producer 書き込み/Consumer Group 読み取りを並列化でき、スループットが線形に伸びやすい。
-
レプリケーション
- 各パーティションはクラスタ内で leader + follower に複製され、障害時もフェイルオーバ可能。
-
デフォルト値と指定方法
- ブローカー設定
num.partitions
の既定値は 1。 - トピック作成時に
--partitions
(CLI)や Admin API で明示でき、増やすことは後から可能だが減らせない。 - 例:
kafka-topics.sh --create --topic my_topic --partitions 8 --replication-factor 3 --bootstrap-server localhost:9092
- ブローカー設定
-
設計時の注意
- キー分布が偏るとホットパーティションが発生し性能が頭打ちになる。
- パーティション数を増やしすぎるとメタデータ管理のオーバーヘッドも増えるためバランス設計が必要。
4. Kafkaの代表的ユースケース
- IoT センサーストリーム: デバイスIDをキーにし、センサーデータをリアルタイム集約。
- ECサイト注文イベント: ユーザーIDキー → ユーザー操作の順序保証。
- 銀行トランザクション監査: 口座IDキー → 不正検知や残高整合性チェック。
- チャット / SNSタイムライン: ルームIDやユーザーIDキー → 投稿順序維持と水平スケールを両立。
5. サンプルコード1: confluent-kafka
C/C++製libkafkaをPythonバインディングした高性能クライアント。商用でも定番。
5‑1 プロデューサー (IoT例)
from confluent_kafka import Producer
import json, time, random
def make_data(device):
return {
"device_id": device,
"temperature": round(random.uniform(20, 30), 2),
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S')
}
p = Producer({'bootstrap.servers': 'localhost:9092'})
TOPIC = 'iot_sensor_data'
DEVICES = ['sensor_001', 'sensor_002', 'sensor_003']
for _ in range(10):
d = random.choice(DEVICES)
record = json.dumps(make_data(d))
p.produce(TOPIC, key=d.encode(), value=record)
p.flush(0)
5‑2 コンシューマー
from confluent_kafka import Consumer, KafkaError
import json
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'iot_group',
'auto.offset.reset': 'earliest'
})
c.subscribe(['iot_sensor_data'])
while True:
msg = c.poll(1.0)
if msg is None or msg.error():
continue
data = json.loads(msg.value())
print(f"{msg.key().decode()}: {data}")
6. サンプルコード2: kafka-python
100% Python実装。シンプル用途やテストで便利。ただしスループットは
confluent-kafka
より劣る。
6‑1 プロデューサー
from kafka import KafkaProducer
import json, random, time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode(),
key_serializer=lambda k: k.encode()
)
for i in range(10):
dev = f'sensor_{i%3:03d}'
record = {
'device_id': dev,
'humidity': random.randint(30, 70),
'timestamp': time.strftime('%Y-%m-%dT%H:%M:%S')
}
producer.send('iot_sensor_data', key=dev, value=record)
producer.flush()
6‑2 コンシューマー
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'iot_sensor_data',
bootstrap_servers='localhost:9092',
group_id='iot_group_py',
value_deserializer=lambda v: json.loads(v.decode()),
key_deserializer=lambda k: k.decode() if k else None,
auto_offset_reset='earliest'
)
for msg in consumer:
print(f"{msg.key}: {msg.value}")
7. パーティションキー設計Tipsまとめ
-
業務一意性 × 高カーディナリティの属性を選ぶと負荷が均等化しやすい (例:
device_id
,user_id
)。 - スループット>順序ならキーを空にしてラウンドロビン分散する手もある。
- 後方互換性: キーフォーマットを将来変える場合はバージョン埋め込みを考慮。
終わりに
パーティションキーは"とりあえず user_id"ではなく、ユースケースの順序保証要件と負荷特性を天秤にかけて設計するのがポイント。この記事が、その判断軸を整理する一助になれば幸いです。