1. 背景・前提条件
Confluent Cloud 上で Flink SQL を使い、Kafka に流し込まれた 純粋な JSON メッセージを読み込もうとすると、次のように Unsupported format: json
エラーが出ることがあります:
Error
Unsupported format: json
これは Flink SQL 側で value.format='json'
を指定しても、Confluent Cloud の Flink ワークスペース上では このフォーマットがサポートされていないためです。
2. なぜ value.format='json'
が使えないのか?
Confluent Cloud の Flink では、手動の CREATE TABLE
に指定可能なフォーマットは以下に限られています:
avro-registry
json-registry
proto-registry
それ以外、つまりSchema Registry を使わずに生の JSON をそのまま読むための json
フォーマットは使えません。なお、Schema Registry 経由で JSON Schema を利用する場合は json-registry
が使えます。
(Confluent Documentation)
このため、Schema Registry を用いない Kafka トピックを参照する際、Flink SQL はその topic を自動的に**“推断テーブル(inferred table)”**として扱い、以下のようになります:
CREATE TABLE `...` (
`key` VARBINARY(...),
`val` VARBINARY(...)
)
WITH (
'connector' = 'confluent',
'value.format' = 'raw',
...
)
値(JSON)がそのままバイナリ形式(VARBINARY)で val
カラムに格納され、JSON パースされることはありません。
(Confluent Documentation)
3. 解決策:value.format='raw'
+ JSON_VALUE()
を使う
この状況を解決するための定石が以下です:
-
value.format='raw'
にして Kafka のメッセージを文字列としてそのまま取り込む -
JSON_VALUE()
関数で、必要なフィールドを JSON から抽出する
JSON_VALUE()
は Flink SQL によって公式サポートされている JSON 関数で、特定のパスからスカラー値を取り出せます。
(Stack Overflow, Confluent Documentation)
Stack Overflow でも次のように紹介されています:
“If use the
format=raw
, you can use theJSON_VALUE
function to extract the field of interest from payload.”
(Stack Overflow)
4. サンプルコード
以下は Confluent Cloud 上の SQL ワークスペースで動く、推奨ワークフローです:
-- ① 推断テーブルの確認(自動生成)
SHOW CREATE TABLE `default`.`cluster_0`.`orders_raw`;
-- ② JSON を解析する VIEW を作成
CREATE OR REPLACE VIEW orders_parsed AS
SELECT
CAST(val AS STRING) AS payload,
CAST(JSON_VALUE(CAST(val AS STRING), '$.orderid') AS INT) AS orderid,
JSON_VALUE(CAST(val AS STRING), '$.itemid') AS itemid,
CAST(JSON_VALUE(CAST(val AS STRING), '$.orderunits') AS DOUBLE) AS orderunits
FROM `default`.`cluster_0`.`orders_raw`;
-- ③ 内容を確認するクエリ
SELECT orderid, itemid, orderunits
FROM orders_parsed
LIMIT 10;
5. まとめ
項目 | 内容 |
---|---|
エラー内容 |
Unsupported format: json — value.format='json' は使えない |
原因 | Confluent Cloud の Flink は json フォーマット未対応手動生成可能な値は json-registry のみ |
対処 |
value.format='raw' + JSON_VALUE() を使って JSON を解析 |
ドキュメント引用 | Confluent Cloud Flink SQL Examples(推断テーブル)(Confluent Documentation, Stack Overflow, Apache Nightlies) JSON 関数定義(Confluent Documentation) StackOverflow 実例(Stack Overflow) |