0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kafka × Flink SQL × OpenSearch 全托管デモ(Confluent Cloud編)

Last updated at Posted at 2025-08-12

📝 概要

本記事では、Confluent Cloud 上で Kafka・Flink SQL・OpenSearch を組み合わせ、
「ソース → Kafka → Flink SQL → Kafka → Sink → OpenSearch」 というストリーミング処理パイプラインを最小構成で構築する手順を紹介します。

特徴:

  • フルマネージド(Kafka / Flink / Connector 全てクラウド実行)
  • ノーコードでデータ投入(Datagen Source Connector)
  • リアルタイム集計 & 可視化可能
  • ローカル環境構築不要。ブラウザと OpenSearch 環境があれば OK。

🎯 目的

  • Confluent Cloud の基本操作(Topic / Connector / Flink SQL)
  • Flink SQL を用いた Kafka トピック間のリアルタイム変換
  • OpenSearch へのリアルタイム出力と可視化

🗺️ アーキテクチャ概要

[Datagen Source] 
    → [Kafka Topic: orders_raw] 
    → [Flink SQL Processing] 
    → [Kafka Topic: orders_agg] 
    → [OpenSearch Sink] 
    → [OpenSearch Index]

📌 前提条件

  • Confluent Cloud アカウント

  • OpenSearch(または Elasticsearch Service)

    • エンドポイント & 認証情報が利用可能であること
  • ブラウザ操作可能な環境


1. Kafka トピック作成

Confluent Cloud Console で以下の 2 トピックを作成。

トピック名 用途
orders_raw 入力(生データ)
orders_agg 出力(集計結果)

パーティション数はデフォルト(1〜3)で OK。


2. Datagen Source Connector 設定(入力データ生成)

  1. Connectors タブ → Datagen Source を検索
  2. Topicorders_raw を指定
  3. Quickstartorders(EC 注文データモデル)
  4. 出力形式:JSON
  5. レコード生成レート:5〜20件/秒
  6. 保存 & 起動

これで orders_raw にランダム注文データが継続投入されます。


3. Flink SQL 設定

3.1 ソース定義(Kafka → Flink)

-- orders_raw を JSON パースして VIEW 化
CREATE OR REPLACE VIEW orders_parsed AS
SELECT
  CAST(val AS STRING)                                               AS payload,
  CAST(JSON_VALUE(CAST(val AS STRING), '$.orderid')     AS INT)     AS orderid,
  JSON_VALUE(CAST(val AS STRING), '$.itemid')                       AS itemid,
  CAST(JSON_VALUE(CAST(val AS STRING), '$.orderunits') AS DOUBLE)   AS orderunits,
  TO_TIMESTAMP_LTZ(
    CAST(JSON_VALUE(CAST(val AS STRING), '$.ordertime') AS BIGINT), 3
  )                                                                 AS event_time,
  CAST(JSON_VALUE(CAST(val AS STRING), '$.address.zipcode') AS INT) AS zipcode,
  JSON_VALUE(CAST(val AS STRING), '$.address.city')                 AS city,
  JSON_VALUE(CAST(val AS STRING), '$.address.state')                AS state
FROM `default`.`cluster_0`.`orders_raw`;

3.2 シンク定義(Flink → Kafka)

-- 単一カラム(val) の Kafka Sink テーブル作成
CREATE TABLE IF NOT EXISTS orders_agg (
  val STRING
) WITH (
  'connector' = 'confluent',
  'value.format' = 'raw',
  'value.raw.charset' = 'UTF-8',
  'changelog.mode' = 'append'
);

JSON 文字列に変換して書き込み:

INSERT INTO orders_agg
SELECT
  JSON_OBJECT(
    'orderid'    VALUE CAST(orderid AS STRING),
    'itemid'     VALUE itemid,
    'orderunits' VALUE CAST(orderunits AS STRING),
    'event_time' VALUE CAST(event_time AS STRING),
    'city'       VALUE city,
    'state'      VALUE state,
    'zipcode'    VALUE CAST(zipcode AS STRING)
  ) AS val
FROM orders_parsed;

3.3 動作確認

-- Kafka Sink テーブル内容を確認
SELECT * FROM orders_agg LIMIT 10;

-- val を文字列化して確認
SELECT CAST(val AS STRING) AS payload
FROM `default`.`cluster_0`.`orders_agg`
LIMIT 10;

4. OpenSearch Sink Connector 設定

  1. Connectors タブ → Elasticsearch Service Sink または OpenSearch Sink を選択

  2. 設定:

    • Topics: orders_agg
    • Connection URL: https://<your-opensearch-endpoint>
    • 認証: ユーザー名 / パスワード
    • Key ignore: true
    • Schema ignore: true
  3. 保存 & 起動


5. OpenSearch インデックス作成(任意)

PUT orders_agg_idx
{
  "mappings": {
    "properties": {
      "user_id":      { "type": "keyword" },
      "window_start": { "type": "date"    },
      "window_end":   { "type": "date"    },
      "total_amount": { "type": "double"  },
      "order_cnt":    { "type": "long"    }
    }
  }
}

6. 動作確認

  • Confluent Cloud の Topics → orders_agg → Messages で結果確認
  • OpenSearch Dashboards の Discoverorders_agg_idx を検索し、リアルタイム更新を確認

📚 まとめ

  • Source Connector:外部システムやデータ生成器 → Kafka 取り込み
  • Flink SQL:Kafka トピック間のリアルタイム変換・集計
  • Sink Connector:Kafka → 外部システム(今回は OpenSearch)へ書き出し
  • Confluent Cloud なら UI 操作だけで完結、ローカル構築不要

🔗 関連リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?