Looking ahead to the new JSON SQL functions in Apache Flink® 1.15.0の翻訳です。
2022年3月3日
Apache Flink® 1.15.0の新しいJSON SQL関数に期待する
Apache Flink® 1.15.0では新しいJSON SQL関数が導入され、より強力なデータ処理が可能になります。何が待ち受けているのか、ご一読ください。
Apache Flink® 1.15.0のJSON SQL関数のスニークプレビュー
Apache Flink® SQL API は非常に人気があり、今日ではストリーミングデータパイプラインを構築するための主要なエントリポイントとなっています。Apache Flink® コミュニティは、リリースごとに新しいオプション、機能、コネクタを追加し、ますます貢献しています。
Flink SQLへの関心が高まっている一例として、テーブルSQLのJSONサポートが挙げられます。JSONはデータの世界で最も使用されているフォーマットの1つで、Apache Flinkの基本的なJSON関数は1.14で利用できるようになり、リリースごとに新しい機能が追加されています。このブログポストでは、まだ正式リリースされていないApache Flink 1.15.0-SNAPSHOT
で何が利用可能になるかを見ていこうと思います。そのため、今すぐ触って遊びたい場合はApache Flinkのソースコードをクローンして自分でビルドするか、1.15.0の公式リリースを待つ必要があります。Apache Flink®のビルド方法の詳細は、Building Flink from Source を参照してください。
使用例
このブログポストでは、IoTセンサーの受信データセットを模倣する。これらのセンサーは、それらが配置されているエリア内で測定されたデータのサプライヤーである。一方の側からのメッセージは、ネストされたJSONフィールドの可能性があるJSONフォーマットであり、もう一方の側からは、デバイスに欠陥がある可能性があるため、無効なJSONが存在する可能性がある。
例えば、各タイムスタンプとメトリックについて、対応する最大測定値を持つエリアのグループを見つけ、それを新しいJSONメッセージとして表示するなどです。
後述するように、Apache Flink®はSQLステートメントのみを使用して、ネストされたJSONをパースして構築することができます。
データセットの探索
Apache Flink® 1.15.0を使用する準備ができたら、データセットに集中することができます。例えば、以前のブログ記事で説明したように、Apache Kafka®トピックにストリーミングデータセットを作成し、Apache Flink®を接続することができます。しかし、Apache Flink®のJSON関数の能力をフルに発揮するには、ネストされたJSONデータセットが必要です。
先に述べたように、世界中の様々な場所からセンサー読み取りメッセージを発するIoTデバイスを模倣したい。以下のメッセージを(1行に1つずつ)iot-readings.json
という名前のファイルに格納することができる。
`{"id":1, "name":"温度センサー", "ペイロード":{"data":{"metric": "Temperature", "value": "23", "dimension":"℃" }, "location": "ベルリン", "timamp": "2018-12-10 13:45:00:ベルリン", "タイムスタンプ": "2018-12-10 13:45:00.000"}} {"id": 2
{"id":2, "name":「温度センサー", "ペイロード":ペイロード": {"data":ベルリン", "タイムスタンプ": "2018-12-10 13:55:00.000"}} {"id": 3
{"id": 3, "name":"comment2", "payload":"Out of Order" }.
{"id":4, "name":「光センサー", "ペイロード":ペイロード": {"data":metric": "光", "value": "故障中" } {"id": 4「Ev"}、"場所":「ベルリン"、"タイムスタンプ":"2018-12-10 13:45:00.000"}}
{"id":5, "name":「ノイズセンサー", "ペイロード":ペイロード": {"data":"data": {"metric":"noise", "value": 43, "dimension":""}、"タイムスタンプ":"2018-12-10 13:45:00.000"}}
{"id":6, "name":「温度センサー", "ペイロード":{:{"metric": "Temperature", "value": "23", "dimension":"℃" }, "location": "パリ"paris"}、"timestamp": "2018-12-10 13:45:00.000"}。
{"id": 7, "name":「光センサー", "ペイロード":{"metric":「Ev"}、"場所":"Paris", "timestamp": "2018-12-10 13:45:00.000"}}。
{"id":8, "name":「ノイズセンサー", "ペイロード":{}}
{"id":9, "name":「温度センサー", "ペイロード":{"data":{"metric": "Temperature", "value": "23", "dimension":"℃" }, "location": "パリ", "タイムスタンプ": "0":"Paris", "timestamp": "2018-12-10 13:45:00.000"}}。
{"id":10, "name":「温度センサー", "ペイロード":ペイロード": {"data":"Paris", "timestamp": "2018-12-10 13:55:00.000"}}。
{"id": 11, "name":「温度センサー", "ペイロード":ペイロード": {"data":"London", "timestamp": "2018-12-10 13:45:00.000"}}。
{"id": 12, "name":「温度センサー", "ペイロード":ペイロード": {"data":"Rome", "timestamp": "2018-12-10 13:45:00.000"}}.
{"id": 13, "name":「温度センサー", "ペイロード":ペイロード": {"data":"Rome", "timestamp": "2018-12-10 13:55:00.000"}}`クリップボードにコピーする
上記のメッセージのほとんどはJSON形式であり、payload
属性はネストされたJSONを含んでいる。しかし、現場の壊れたIoTデバイスの中には、{"id":3, "name":"comment2", "payload":"Out of Order"}
のような誤ったメッセージを生成することもある。私たちは信頼性の高いデータパイプラインを構築したいので、複数の欠陥のあるデバイスからこのような誤ったメッセージを受信しても、データフローが停止することはないと考えています。
このようなメッセージをApache Kafkaに提供する方法の1つは、Python fake data producer for Apache Kafkaを拡張することであり、kcatのようなツールを使うこともできる。
Apache Flink® テーブルの定義
Apache Kafkaトピック内のデータを処理できるようにするには、Apache Flinkテーブルを定義する必要があります。Flink SQL クライアントでは、以下の SQL 文でテーブルを定義できます:
テーブル
センサ (
id STRING、
name STRING、
ペイロード STRING
Apache Flink® には JSON データ型がないので、 // ここではペイロードを STRING 型として宣言する。
)WITH (
'connector' = 'kafka'、
...
// その他のプロパティは、Apache Kafkaへの接続設定に基づいて入力する必要があります。
...
);`クリップボードにコピーする
Apache Flink® の JSON 関数を使ってデータを分析する
テーブルが定義されたので、壊れた IoT デバイスから送られてくる形成不良のレコードをフィルタリングし始めることができる。これは、data
属性を含むpayload
フィールドに有効なJSONを含むメッセージのみを保持することで実現できる。新しい JSON_EXISTS
関数は、まさにこの問題を解決してくれる:
JSON_EXISTS` 関数はまさにこの問題を解決します。
FROM センサー
WHERE JSON_EXISTS(payload, '$.data');`Copy to clipboard
上記のSQLでは、特別な文字$$$は、.data'のようにプロパティにアクセスできるJSONパスのルートノードを示しています。
配列を解析する必要がある場合、同様の構文を使用して、位置によって正確な要素にアクセスすることができます (.data'.Ifweneedtoparsearrays、asimilararsynxtaを使用して、位置によって正確な要素にアクセスすることができます(.a[0].b)、または配列内の各要素のプロパティを取得します($$.a[*][.b
)。
上記のSQLは以下の出力を生成します:
id名 ペイロード
1 温度センサー {"data":{"metric": "Temperatur~
2 温度センサー {"data":{"metric": "Temperatur~
4 光センサー {"data":{"metric": "Light", "va~
5 ノイズセンサー{"data":{"metric": "Noise", "va~
9 温度センサー {"data":{"metric": "Temperatur~
10 温度センサー{"data":{"metric": "温度
11 温度センサー {"data":{"metric": "Temperatur~
12 温度センサー {"data": {"metric": "Temperatur~
13 温度センサー {"data":{"metric": "Temperatur~` クリップボードにコピーする
有効なメッセージのみをフィルタリングした後、データの分析を始めることができる。例えば、正しく形成されたメッセージの location
属性に格納されている都市を抽出する必要があるかもしれない。JSON_VALUE` 関数を使って、JSON フィールドからスカラーを取得し、必要な情報を取り出すことができる:
SELECT DISTINCT JSON_VALUE(payload, '$.location') AS `city`
FROM sensors
WHERE JSON_EXISTS(payload, '$.data');`Copy to clipboard
結果は以下のようになるはずだ。これはメッセージに location
属性が含まれていないためである。
都市
ベルリン
<NULL
パリ
ロンドン
ローマ`クリップボードにコピー
JSON_VALUE関数は、デフォルトでは
STRING` としてデータを取り出す。しかし、出力されるデータ型を変更したい場合があります。特に、何らかの計算を行いたい数値が出力される場合です。
例えば、都市別の平均気温を計算したい場合、ペイロードフィールド .data.value
を抽出し、RETURNING
オプションで INTEGER
にキャストすることで、センサーの測定値を得ることができる。最後に .location
フィールドを集約することができる:
SELECT
AVG(JSON_VALUE(payload, '$.data.value' RETURNING INTEGER))AS `avg_temperature`、
JSON_VALUE(payload, '$.location') AS `city`.
FROM センサー
WHERE JSON_VALUE(payload, '$.data.metric') = 'Temperature'
GROUP BY JSON_VALUE(payload, '$.location');`Copy to clipboard
そして期待される出力は
都市
19 ベルリン
17 パリ
23 ロンドン
25 Rome`クリップボードにコピー
これまでのところ、抽出されたフィールドはすべてスカラーであった。しかし、Apache Flink® は JSON_QUERY
という、より複雑なデータポイントを抽出する関数も提供している。
例えば、以下のクエリは有効な JSON を持つ全てのメッセージから完全な data
フィールドを抽出し、残りのフィールドについては NULL
を表示する。
SELECT JSON_QUERY(payload, '$.data') AS `data`
FROM sensors;`クリップボードにコピー
クエリーの出力は
データ
{"metric": "Temperature", "valu~
{"metric": "Temperature", "valu~
<NULL
{"metric":"Light","value":23,~
{"metric":"Noise","value":43,~
<NULL
<NULL>
{"メトリック": "温度", "値~
{メトリック": "温度", "値
<NULL> <NULL>{"メトリック": "温度", "値~
{"メトリック": "温度", "値
{"metric": "Temperature", "valu~` クリップボードにコピーする。
Apache Flink® SQL で JSON 構造体を構築する
前のセクションでは、SQL を使って JSON メッセージを解析し、必要なフィールドを抽出する方法を説明しましたが、これはほんの一部に過ぎません。Apache Flink® SQL では、ネストされた JSON データセットを構築することもできます。まず、max
関数で特定の場所、メトリック、タイムスタンプのピーク計測値を抽出して、上のデータセットのフラット表現を作成してみましょう。
SELECT
JSON_VALUE(payload, '$.location') as loc、
JSON_VALUE(payload, '$.data.metric') as metric、
TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp')) as timestamp_value、
MAX(JSON_VALUE(payload, '$.data.value')) as max_value
FROM センサー
WHERE JSON_EXISTS(payload, '$.data')
AND JSON_EXISTS(payload, '$.location')
GROUP BY
JSON_VALUE(payload, '$.data.metric')、
JSON_VALUE(payload, '$.location')、
TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'));`Copy to clipboard
出力は、各メトリクスのピーク値、都市、タイムスタンプを含むフラットテーブルである:
loc metric max_value timestamp_value
ベルリン 気温 23 2018-12-10 13:45:00.000
ベルリン気温 16 2018-12-10 13:55:00.000
ベルリン ライト 23 2018-12-10 13:45:00.000
パリ気温 23 2018-12-10 13:45:00.000
パリ 気温 12 2018-12-10 13:55:00.000
ロンドン気温 23 2018-12-10 13:45:00.000
ローマ気温 23 2018-12-10 13:45:00.000
ローマ気温 28 2018-12-10 13:55:00.000`クリップボードにコピー
ここで、IoTの読み取り値を持つすべての都市と読み取り値そのものをリストアップしたJSONフィールドを含む、timestamp_value
とmetric
ごとにユニークなメッセージを作成したいとします。
すべての値を一意の行にまとめるには、キーと値の式を集約して JSON オブジェクト文字列を作成する JSON_OBJECTAG
関数を使用することができる。
Apache Flink® SQL はネストされた集約関数を許さないので、別のラッパーが必要です。
WITH sensors_with_max_metric AS (
SELECT
JSON_VALUE(payload, '$.location') AS loc、
JSON_VALUE(payload, '$.data.metric') AS metric、
TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))AS timestamp_value、
MAX(JSON_VALUE(payload, '$.data.value'))AS max_value
FROM センサー
WHERE JSON_EXISTS(payload, '$.data')
AND JSON_EXISTS(payload, '$.location')
GROUP BY
JSON_VALUE(payload, '$.data.metric')、
JSON_VALUE(payload, '$.location')、
TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))
)
SELECT
timestamp_value、
メトリック、
JSON_OBJECTAGG(KEY loc VALUE max_value) AS json_object_value
FROM sensors_with_max_metric
GROUP BY timestamp_value, metric;`クリップボードにコピー
JSON_OBJECTAGによって、JSONオブジェクトの
KEYと
VALUE` として使用するフィールドを定義できることに注意する。上記のSQLの出力は次のようになる:
timestamp_value metric json_object_value
2018-12-10 13:45:00.000 光 {"Berlin": "23"}.
2018-12-10 13:45:00.000 気温{"ベルリン": "23", "ロンドン": "23",~
2018-12-10 13:55:00.000 気温{"ベルリン": "16", "パリ": "12","~`クリップボードにコピーする
最後のステップとして、JSON_OBJECT
関数を使用して、上記の3つのカラムをすべて含む、メッセージごとに一意のネストされたJSONドキュメントを作成することができます:
WITH sensors_with_max_metric AS (
SELECT
JSON_VALUE(payload, '$.location') AS loc、
JSON_VALUE(payload, '$.data.metric') AS metric、
TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))AS timestamp_value、
MAX(JSON_VALUE(payload, '$.data.value'))AS max_value
FROM センサー
WHERE JSON_EXISTS(payload, '$.data')
AND JSON_EXISTS(payload, '$.location')
GROUP BY
JSON_VALUE(payload, '$.data.metric')、
JSON_VALUE(payload, '$.location')、
TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))
),
sensors_with_max_metric_grouped_by_metric_and_timestamp AS (
SELECT
timestamp_value、
metric、
JSON_OBJECTAGG(KEY loc VALUE max_value) AS loc
FROM sensors_with_max_metric
GROUP BY
timestamp_value、
metric)
SELECT
JSON_OBJECT(
KEY 'timestamp' VALUE timestamp_value、
KEY 'metric' VALUE s.metric、
KEY 'values' VALUE s.loc
)
FROM sensors_with_max_metric s;`Copy to clipboard
最終的な出力は以下のようになり、3つのJSONオブジェクトがネストされている。
`{"timestamp":"2018-12-10 13:45:00.000","metric":"Light","values":"{\"Berlin\":\"23\"}"}
{"timestamp":"2018-12-10 13:45:00.000","metric":"Temperature","values":"{\"Berlin\":\"23\",\"London\":\"23\",\"Paris\":\"23\",\"Rome\":\"23\"}"}
{"timestamp":"2018-12-10 13:55:00.000","metric":"Temperature","values":"{\"Berlin\":\"16\",\"Paris\":\"12\",\"Rome\":\"28\"}"}`Copy to clipboard
SQL文だけで、ネストしたJSONデータ構造をパースして構築することができた。データ技術で最も使用されるデータ形式の1つを処理するためのカスタム関数はもう必要ありません!
次のステップ
Apache Flink® の SQL は、コミュニティの多大な努力によって、リリースのたびにますます強力になっています。より多くの機能、コネクター、関数が追加され、SQL API はストリーミング・データ・パイプラインを定義する主要なドライバーとなっています。ここで説明するJSON関数はバージョン1.15.0に搭載される予定ですが、現在利用可能な膨大なSQL関数のセットにはすでに慣れ始めていることでしょう。
その他の参考文献
- FlinkにおけるJSON TABLE SQL関数についての公式ドキュメント。
- Apache Flink SQL Clientを使ってみましょう。
- [Aiven for Apache Flink®]のトライアルを開始する(https://aiven.io/flink)
- Aiven for Apache Flink®の詳細は、Aiven開発者専用ドキュメント をご覧ください。