1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Apache Flink® 1.15.0の新しいJSON SQL関数を前にして

Posted at

Looking ahead to the new JSON SQL functions in Apache Flink® 1.15.0の翻訳です。

2022年3月3日

Apache Flink® 1.15.0の新しいJSON SQL関数に期待する

Apache Flink® 1.15.0では新しいJSON SQL関数が導入され、より強力なデータ処理が可能になります。何が待ち受けているのか、ご一読ください。

Apache Flink® 1.15.0のJSON SQL関数のスニークプレビュー

Apache Flink® SQL API は非常に人気があり、今日ではストリーミングデータパイプラインを構築するための主要なエントリポイントとなっています。Apache Flink® コミュニティは、リリースごとに新しいオプション、機能、コネクタを追加し、ますます貢献しています。

Flink SQLへの関心が高まっている一例として、テーブルSQLのJSONサポートが挙げられます。JSONはデータの世界で最も使用されているフォーマットの1つで、Apache Flinkの基本的なJSON関数は1.14で利用できるようになり、リリースごとに新しい機能が追加されています。このブログポストでは、まだ正式リリースされていないApache Flink 1.15.0-SNAPSHOTで何が利用可能になるかを見ていこうと思います。そのため、今すぐ触って遊びたい場合はApache Flinkのソースコードをクローンして自分でビルドするか、1.15.0の公式リリースを待つ必要があります。Apache Flink®のビルド方法の詳細は、Building Flink from Source を参照してください。

使用例

このブログポストでは、IoTセンサーの受信データセットを模倣する。これらのセンサーは、それらが配置されているエリア内で測定されたデータのサプライヤーである。一方の側からのメッセージは、ネストされたJSONフィールドの可能性があるJSONフォーマットであり、もう一方の側からは、デバイスに欠陥がある可能性があるため、無効なJSONが存在する可能性がある。

例えば、各タイムスタンプとメトリックについて、対応する最大測定値を持つエリアのグループを見つけ、それを新しいJSONメッセージとして表示するなどです。

後述するように、Apache Flink®はSQLステートメントのみを使用して、ネストされたJSONをパースして構築することができます。

データセットの探索

Apache Flink® 1.15.0を使用する準備ができたら、データセットに集中することができます。例えば、以前のブログ記事で説明したように、Apache Kafka®トピックにストリーミングデータセットを作成し、Apache Flink®を接続することができます。しかし、Apache Flink®のJSON関数の能力をフルに発揮するには、ネストされたJSONデータセットが必要です。

先に述べたように、世界中の様々な場所からセンサー読み取りメッセージを発するIoTデバイスを模倣したい。以下のメッセージを(1行に1つずつ)iot-readings.jsonという名前のファイルに格納することができる。

`{"id":1, "name":"温度センサー", "ペイロード":{"data":{"metric": "Temperature", "value": "23", "dimension":"℃" }, "location": "ベルリン", "timamp": "2018-12-10 13:45:00:ベルリン", "タイムスタンプ": "2018-12-10 13:45:00.000"}} {"id": 2
{"id":2, "name":「温度センサー", "ペイロード":ペイロード": {"data":ベルリン", "タイムスタンプ": "2018-12-10 13:55:00.000"}} {"id": 3
{"id": 3, "name":"comment2", "payload":"Out of Order" }.
{"id":4, "name":「光センサー", "ペイロード":ペイロード": {"data":metric": "光", "value": "故障中" } {"id": 4「Ev"}、"場所":「ベルリン"、"タイムスタンプ":"2018-12-10 13:45:00.000"}}
{"id":5, "name":「ノイズセンサー", "ペイロード":ペイロード": {"data":"data": {"metric":"noise", "value": 43, "dimension":""}、"タイムスタンプ":"2018-12-10 13:45:00.000"}}
{"id":6, "name":「温度センサー", "ペイロード":{:{"metric": "Temperature", "value": "23", "dimension":"℃" }, "location": "パリ"paris"}、"timestamp": "2018-12-10 13:45:00.000"}。
{"id": 7, "name":「光センサー", "ペイロード":{"metric":「Ev"}、"場所":"Paris", "timestamp": "2018-12-10 13:45:00.000"}}。
{"id":8, "name":「ノイズセンサー", "ペイロード":{}}
{"id":9, "name":「温度センサー", "ペイロード":{"data":{"metric": "Temperature", "value": "23", "dimension":"℃" }, "location": "パリ", "タイムスタンプ": "0":"Paris", "timestamp": "2018-12-10 13:45:00.000"}}。
{"id":10, "name":「温度センサー", "ペイロード":ペイロード": {"data":"Paris", "timestamp": "2018-12-10 13:55:00.000"}}。
{"id": 11, "name":「温度センサー", "ペイロード":ペイロード": {"data":"London", "timestamp": "2018-12-10 13:45:00.000"}}。
{"id": 12, "name":「温度センサー", "ペイロード":ペイロード": {"data":"Rome", "timestamp": "2018-12-10 13:45:00.000"}}.
{"id": 13, "name":「温度センサー", "ペイロード":ペイロード": {"data":"Rome", "timestamp": "2018-12-10 13:55:00.000"}}`クリップボードにコピーする

上記のメッセージのほとんどはJSON形式であり、payload属性はネストされたJSONを含んでいる。しかし、現場の壊れたIoTデバイスの中には、{"id":3, "name":"comment2", "payload":"Out of Order"}のような誤ったメッセージを生成することもある。私たちは信頼性の高いデータパイプラインを構築したいので、複数の欠陥のあるデバイスからこのような誤ったメッセージを受信しても、データフローが停止することはないと考えています。

このようなメッセージをApache Kafkaに提供する方法の1つは、Python fake data producer for Apache Kafkaを拡張することであり、kcatのようなツールを使うこともできる。

Apache Flink® テーブルの定義

Apache Kafkaトピック内のデータを処理できるようにするには、Apache Flinkテーブルを定義する必要があります。Flink SQL クライアントでは、以下の SQL 文でテーブルを定義できます:

テーブル
 センサ (
 id STRING、
 name STRING、
 ペイロード STRING
 Apache Flink® には JSON データ型がないので、 // ここではペイロードを STRING 型として宣言する。
 )WITH (
 'connector' = 'kafka'、
 ...
 // その他のプロパティは、Apache Kafkaへの接続設定に基づいて入力する必要があります。
 ...
);`クリップボードにコピーする

Apache Flink® の JSON 関数を使ってデータを分析する

テーブルが定義されたので、壊れた IoT デバイスから送られてくる形成不良のレコードをフィルタリングし始めることができる。これは、data属性を含むpayloadフィールドに有効なJSONを含むメッセージのみを保持することで実現できる。新しい JSON_EXISTS 関数は、まさにこの問題を解決してくれる:

JSON_EXISTS` 関数はまさにこの問題を解決します。
FROM センサー
WHERE JSON_EXISTS(payload, '$.data');`Copy to clipboard

上記のSQLでは、特別な文字$$$は、.data'のようにプロパティにアクセスできるJSONパスのルートノードを示しています。
配列を解析する必要がある場合、同様の構文を使用して、位置によって正確な要素にアクセスすることができます (.data'.Ifweneedtoparsearrays、asimilararsynxtaを使用して、位置によって正確な要素にアクセスすることができます(.a[0].b)、または配列内の各要素のプロパティを取得します($$.a[*][.b)。

上記のSQLは以下の出力を生成します:

 id名 ペイロード
 1 温度センサー {"data":{"metric": "Temperatur~
 2 温度センサー {"data":{"metric": "Temperatur~
 4 光センサー {"data":{"metric": "Light", "va~
 5 ノイズセンサー{"data":{"metric": "Noise", "va~
 9 温度センサー {"data":{"metric": "Temperatur~
 10 温度センサー{"data":{"metric": "温度
 11 温度センサー {"data":{"metric": "Temperatur~
 12 温度センサー {"data": {"metric": "Temperatur~
 13 温度センサー {"data":{"metric": "Temperatur~` クリップボードにコピーする

有効なメッセージのみをフィルタリングした後、データの分析を始めることができる。例えば、正しく形成されたメッセージの location 属性に格納されている都市を抽出する必要があるかもしれない。JSON_VALUE` 関数を使って、JSON フィールドからスカラーを取得し、必要な情報を取り出すことができる:

SELECT DISTINCT JSON_VALUE(payload, '$.location') AS `city`
FROM sensors
WHERE JSON_EXISTS(payload, '$.data');`Copy to clipboard

結果は以下のようになるはずだ。これはメッセージに location 属性が含まれていないためである。

 都市
 ベルリン
 <NULL
 パリ
 ロンドン
 ローマ`クリップボードにコピー

JSON_VALUE関数は、デフォルトではSTRING` としてデータを取り出す。しかし、出力されるデータ型を変更したい場合があります。特に、何らかの計算を行いたい数値が出力される場合です。

例えば、都市別の平均気温を計算したい場合、ペイロードフィールド .data.value を抽出し、RETURNING オプションで INTEGER にキャストすることで、センサーの測定値を得ることができる。最後に .location フィールドを集約することができる:

SELECT
 AVG(JSON_VALUE(payload, '$.data.value' RETURNING INTEGER))AS `avg_temperature`、
 JSON_VALUE(payload, '$.location') AS `city`.
FROM センサー
WHERE JSON_VALUE(payload, '$.data.metric') = 'Temperature'
GROUP BY JSON_VALUE(payload, '$.location');`Copy to clipboard

そして期待される出力は

 都市
 19 ベルリン
 17 パリ
 23 ロンドン
 25 Rome`クリップボードにコピー

これまでのところ、抽出されたフィールドはすべてスカラーであった。しかし、Apache Flink® は JSON_QUERY という、より複雑なデータポイントを抽出する関数も提供している。

例えば、以下のクエリは有効な JSON を持つ全てのメッセージから完全な data フィールドを抽出し、残りのフィールドについては NULL を表示する。

SELECT JSON_QUERY(payload, '$.data') AS `data`
FROM sensors;`クリップボードにコピー

クエリーの出力は

 データ
 {"metric": "Temperature", "valu~
 {"metric": "Temperature", "valu~
 <NULL
 {"metric":"Light","value":23,~
 {"metric":"Noise","value":43,~
 <NULL
 <NULL>
 {"メトリック": "温度", "値~
 {メトリック": "温度", "値
 <NULL> <NULL>{"メトリック": "温度", "値~
 {"メトリック": "温度", "値
 {"metric": "Temperature", "valu~` クリップボードにコピーする。

Apache Flink® SQL で JSON 構造体を構築する

前のセクションでは、SQL を使って JSON メッセージを解析し、必要なフィールドを抽出する方法を説明しましたが、これはほんの一部に過ぎません。Apache Flink® SQL では、ネストされた JSON データセットを構築することもできます。まず、max関数で特定の場所、メトリック、タイムスタンプのピーク計測値を抽出して、上のデータセットのフラット表現を作成してみましょう。

SELECT
 JSON_VALUE(payload, '$.location') as loc、
 JSON_VALUE(payload, '$.data.metric') as metric、
 TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp')) as timestamp_value、
 MAX(JSON_VALUE(payload, '$.data.value')) as max_value
FROM センサー
WHERE JSON_EXISTS(payload, '$.data')
 AND JSON_EXISTS(payload, '$.location')
GROUP BY
 JSON_VALUE(payload, '$.data.metric')、
 JSON_VALUE(payload, '$.location')、
 TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'));`Copy to clipboard

出力は、各メトリクスのピーク値、都市、タイムスタンプを含むフラットテーブルである:

 loc metric max_value timestamp_value
 ベルリン 気温 23 2018-12-10 13:45:00.000
 ベルリン気温 16 2018-12-10 13:55:00.000
 ベルリン ライト 23 2018-12-10 13:45:00.000
 パリ気温 23 2018-12-10 13:45:00.000
 パリ 気温 12 2018-12-10 13:55:00.000
 ロンドン気温 23 2018-12-10 13:45:00.000
 ローマ気温 23 2018-12-10 13:45:00.000
 ローマ気温 28 2018-12-10 13:55:00.000`クリップボードにコピー

ここで、IoTの読み取り値を持つすべての都市と読み取り値そのものをリストアップしたJSONフィールドを含む、timestamp_valuemetricごとにユニークなメッセージを作成したいとします。

すべての値を一意の行にまとめるには、キーと値の式を集約して JSON オブジェクト文字列を作成する JSON_OBJECTAG 関数を使用することができる。

Apache Flink® SQL はネストされた集約関数を許さないので、別のラッパーが必要です。

WITH sensors_with_max_metric AS (
 SELECT
 JSON_VALUE(payload, '$.location') AS loc、
 JSON_VALUE(payload, '$.data.metric') AS metric、
 TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))AS timestamp_value、
 MAX(JSON_VALUE(payload, '$.data.value'))AS max_value
 FROM センサー
 WHERE JSON_EXISTS(payload, '$.data')
 AND JSON_EXISTS(payload, '$.location')
 GROUP BY
 JSON_VALUE(payload, '$.data.metric')、
 JSON_VALUE(payload, '$.location')、
 TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))
)
SELECT
 timestamp_value、
 メトリック、
 JSON_OBJECTAGG(KEY loc VALUE max_value) AS json_object_value
FROM sensors_with_max_metric
GROUP BY timestamp_value, metric;`クリップボードにコピー

JSON_OBJECTAGによって、JSONオブジェクトの KEYVALUE` として使用するフィールドを定義できることに注意する。上記のSQLの出力は次のようになる:

 timestamp_value metric json_object_value
 2018-12-10 13:45:00.000 光 {"Berlin": "23"}.
 2018-12-10 13:45:00.000 気温{"ベルリン": "23", "ロンドン": "23",~
 2018-12-10 13:55:00.000 気温{"ベルリン": "16", "パリ": "12","~`クリップボードにコピーする

最後のステップとして、JSON_OBJECT関数を使用して、上記の3つのカラムをすべて含む、メッセージごとに一意のネストされたJSONドキュメントを作成することができます:

WITH sensors_with_max_metric AS (
 SELECT
 JSON_VALUE(payload, '$.location') AS loc、
 JSON_VALUE(payload, '$.data.metric') AS metric、
 TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))AS timestamp_value、
 MAX(JSON_VALUE(payload, '$.data.value'))AS max_value
 FROM センサー
 WHERE JSON_EXISTS(payload, '$.data')
 AND JSON_EXISTS(payload, '$.location')
 GROUP BY
 JSON_VALUE(payload, '$.data.metric')、
 JSON_VALUE(payload, '$.location')、
 TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))
),
sensors_with_max_metric_grouped_by_metric_and_timestamp AS (
 SELECT
 timestamp_value、
 metric、
 JSON_OBJECTAGG(KEY loc VALUE max_value) AS loc
 FROM sensors_with_max_metric
 GROUP BY
 timestamp_value、
 metric)
SELECT
 JSON_OBJECT(
 KEY 'timestamp' VALUE timestamp_value、
 KEY 'metric' VALUE s.metric、
 KEY 'values' VALUE s.loc
 )
FROM sensors_with_max_metric s;`Copy to clipboard

最終的な出力は以下のようになり、3つのJSONオブジェクトがネストされている。

 `{"timestamp":"2018-12-10 13:45:00.000","metric":"Light","values":"{\"Berlin\":\"23\"}"}
 {"timestamp":"2018-12-10 13:45:00.000","metric":"Temperature","values":"{\"Berlin\":\"23\",\"London\":\"23\",\"Paris\":\"23\",\"Rome\":\"23\"}"}
 {"timestamp":"2018-12-10 13:55:00.000","metric":"Temperature","values":"{\"Berlin\":\"16\",\"Paris\":\"12\",\"Rome\":\"28\"}"}`Copy to clipboard

SQL文だけで、ネストしたJSONデータ構造をパースして構築することができた。データ技術で最も使用されるデータ形式の1つを処理するためのカスタム関数はもう必要ありません!

次のステップ

Apache Flink® の SQL は、コミュニティの多大な努力によって、リリースのたびにますます強力になっています。より多くの機能、コネクター、関数が追加され、SQL API はストリーミング・データ・パイプラインを定義する主要なドライバーとなっています。ここで説明するJSON関数はバージョン1.15.0に搭載される予定ですが、現在利用可能な膨大なSQL関数のセットにはすでに慣れ始めていることでしょう。

その他の参考文献

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?