📝 概要
本記事では、Confluent Cloud 上で Kafka・Flink SQL・OpenSearch を組み合わせ、
「ソース → Kafka → Flink SQL → Kafka → Sink → OpenSearch」 というストリーミング処理パイプラインを最小構成で構築する手順を紹介します。
特徴:
- フルマネージド(Kafka / Flink / Connector 全てクラウド実行)
- ノーコードでデータ投入(Datagen Source Connector)
- リアルタイム集計 & 可視化可能
- ローカル環境構築不要。ブラウザと OpenSearch 環境があれば OK。
🎯 目的
- Confluent Cloud の基本操作(Topic / Connector / Flink SQL)
- Flink SQL を用いた Kafka トピック間のリアルタイム変換
- OpenSearch へのリアルタイム出力と可視化
🗺️ アーキテクチャ概要
[Datagen Source]
→ [Kafka Topic: orders_raw]
→ [Flink SQL Processing]
→ [Kafka Topic: orders_agg]
→ [OpenSearch Sink]
→ [OpenSearch Index]
📌 前提条件
-
Confluent Cloud アカウント
-
OpenSearch(または Elasticsearch Service)
- エンドポイント & 認証情報が利用可能であること
-
ブラウザ操作可能な環境
1. Kafka トピック作成
Confluent Cloud Console で以下の 2 トピックを作成。
トピック名 | 用途 |
---|---|
orders_raw |
入力(生データ) |
orders_agg |
出力(集計結果) |
パーティション数はデフォルト(1〜3)で OK。
2. Datagen Source Connector 設定(入力データ生成)
-
Connectors タブ →
Datagen Source
を検索 -
Topic に
orders_raw
を指定 -
Quickstart:
orders
(EC 注文データモデル) - 出力形式:
JSON
- レコード生成レート:5〜20件/秒
- 保存 & 起動
これで orders_raw
にランダム注文データが継続投入されます。
3. Flink SQL 設定
3.1 ソース定義(Kafka → Flink)
-- orders_raw を JSON パースして VIEW 化
CREATE OR REPLACE VIEW orders_parsed AS
SELECT
CAST(val AS STRING) AS payload,
CAST(JSON_VALUE(CAST(val AS STRING), '$.orderid') AS INT) AS orderid,
JSON_VALUE(CAST(val AS STRING), '$.itemid') AS itemid,
CAST(JSON_VALUE(CAST(val AS STRING), '$.orderunits') AS DOUBLE) AS orderunits,
TO_TIMESTAMP_LTZ(
CAST(JSON_VALUE(CAST(val AS STRING), '$.ordertime') AS BIGINT), 3
) AS event_time,
CAST(JSON_VALUE(CAST(val AS STRING), '$.address.zipcode') AS INT) AS zipcode,
JSON_VALUE(CAST(val AS STRING), '$.address.city') AS city,
JSON_VALUE(CAST(val AS STRING), '$.address.state') AS state
FROM `default`.`cluster_0`.`orders_raw`;
3.2 シンク定義(Flink → Kafka)
-- 単一カラム(val) の Kafka Sink テーブル作成
CREATE TABLE IF NOT EXISTS orders_agg (
val STRING
) WITH (
'connector' = 'confluent',
'value.format' = 'raw',
'value.raw.charset' = 'UTF-8',
'changelog.mode' = 'append'
);
JSON 文字列に変換して書き込み:
INSERT INTO orders_agg
SELECT
JSON_OBJECT(
'orderid' VALUE CAST(orderid AS STRING),
'itemid' VALUE itemid,
'orderunits' VALUE CAST(orderunits AS STRING),
'event_time' VALUE CAST(event_time AS STRING),
'city' VALUE city,
'state' VALUE state,
'zipcode' VALUE CAST(zipcode AS STRING)
) AS val
FROM orders_parsed;
3.3 動作確認
-- Kafka Sink テーブル内容を確認
SELECT * FROM orders_agg LIMIT 10;
-- val を文字列化して確認
SELECT CAST(val AS STRING) AS payload
FROM `default`.`cluster_0`.`orders_agg`
LIMIT 10;
4. OpenSearch Sink Connector 設定
-
Connectors タブ →
Elasticsearch Service Sink
またはOpenSearch Sink
を選択 -
設定:
-
Topics:
orders_agg
-
Connection URL:
https://<your-opensearch-endpoint>
- 認証: ユーザー名 / パスワード
-
Key ignore:
true
-
Schema ignore:
true
-
Topics:
-
保存 & 起動
5. OpenSearch インデックス作成(任意)
PUT orders_agg_idx
{
"mappings": {
"properties": {
"user_id": { "type": "keyword" },
"window_start": { "type": "date" },
"window_end": { "type": "date" },
"total_amount": { "type": "double" },
"order_cnt": { "type": "long" }
}
}
}
6. 動作確認
- Confluent Cloud の Topics → orders_agg → Messages で結果確認
- OpenSearch Dashboards の Discover で
orders_agg_idx
を検索し、リアルタイム更新を確認
📚 まとめ
- Source Connector:外部システムやデータ生成器 → Kafka 取り込み
- Flink SQL:Kafka トピック間のリアルタイム変換・集計
- Sink Connector:Kafka → 外部システム(今回は OpenSearch)へ書き出し
- Confluent Cloud なら UI 操作だけで完結、ローカル構築不要