0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Confluent Cloud の Flink SQL で「Unsupported format: json」エラー時に使う `raw + JSON_VALUE()` の理由と対処法

Last updated at Posted at 2025-08-13

1. 背景・前提条件

Confluent Cloud 上で Flink SQL を使い、Kafka に流し込まれた 純粋な JSON メッセージを読み込もうとすると、次のように Unsupported format: json エラーが出ることがあります:

Error
Unsupported format: json

これは Flink SQL 側で value.format='json' を指定しても、Confluent Cloud の Flink ワークスペース上では このフォーマットがサポートされていないためです。


2. なぜ value.format='json' が使えないのか?

Confluent Cloud の Flink では、手動の CREATE TABLE に指定可能なフォーマットは以下に限られています:

  • avro-registry
  • json-registry
  • proto-registry

それ以外、つまりSchema Registry を使わずに生の JSON をそのまま読むための json フォーマットは使えません。なお、Schema Registry 経由で JSON Schema を利用する場合は json-registry が使えます。
(Confluent Documentation)

このため、Schema Registry を用いない Kafka トピックを参照する際、Flink SQL はその topic を自動的に**“推断テーブル(inferred table)”**として扱い、以下のようになります:

CREATE TABLE `...` (
  `key` VARBINARY(...),
  `val` VARBINARY(...)
)
WITH (
  'connector' = 'confluent',
  'value.format' = 'raw',
  ...
)

値(JSON)がそのままバイナリ形式(VARBINARY)で val カラムに格納され、JSON パースされることはありません。
(Confluent Documentation)


3. 解決策:value.format='raw' + JSON_VALUE() を使う

この状況を解決するための定石が以下です:

  1. value.format='raw' にして Kafka のメッセージを文字列としてそのまま取り込む
  2. JSON_VALUE() 関数で、必要なフィールドを JSON から抽出する

JSON_VALUE() は Flink SQL によって公式サポートされている JSON 関数で、特定のパスからスカラー値を取り出せます。
(Stack Overflow, Confluent Documentation)

Stack Overflow でも次のように紹介されています:

“If use the format=raw, you can use the JSON_VALUE function to extract the field of interest from payload.”
(Stack Overflow)


4. サンプルコード

以下は Confluent Cloud 上の SQL ワークスペースで動く、推奨ワークフローです:

-- ① 推断テーブルの確認(自動生成)
SHOW CREATE TABLE `default`.`cluster_0`.`orders_raw`;

-- ② JSON を解析する VIEW を作成
CREATE OR REPLACE VIEW orders_parsed AS
SELECT
  CAST(val AS STRING) AS payload,
  CAST(JSON_VALUE(CAST(val AS STRING), '$.orderid')      AS INT)    AS orderid,
  JSON_VALUE(CAST(val AS STRING), '$.itemid')                         AS itemid,
  CAST(JSON_VALUE(CAST(val AS STRING), '$.orderunits') AS DOUBLE)    AS orderunits
FROM `default`.`cluster_0`.`orders_raw`;

-- ③ 内容を確認するクエリ
SELECT orderid, itemid, orderunits
FROM orders_parsed
LIMIT 10;

5. まとめ

項目 内容
エラー内容 Unsupported format: jsonvalue.format='json' は使えない
原因 Confluent Cloud の Flink は json フォーマット未対応
手動生成可能な値は json-registry のみ
対処 value.format='raw' + JSON_VALUE() を使って JSON を解析
ドキュメント引用 Confluent Cloud Flink SQL Examples(推断テーブル)(Confluent Documentation, Stack Overflow, Apache Nightlies)
JSON 関数定義(Confluent Documentation)
StackOverflow 実例(Stack Overflow)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?