ビッグデータというような規模でなくても、別サービスからイベントデータを取り込んでレポート表示したいときなどに、一旦JSONのままドキュメントストアに格納しておくと、データ取り込みの再実行がやりやすくなり安心感増します。
アプリケーションで既にMySQL5.7以降を利用している場合、ドキュメントストアとしてMySQLのJSON型を使うと、トランザクション・ユニーク制約・柔軟なクエリ・他のテーブルとJOIN、など便利なことがあるかもしれません。
GCPのCloud PubSubでイベントデータを受け取りMySQLのJSON型として保持しておく、以下のようなデータの流れを想定します。MySQLのバージョンは5.7です。
メッセージブローカー(Cloud PubSub) -> コンシューマー(PubSubからpullするバッチアプリケーション) -> ドキュメントストア(MySQLのJSON型) -> データ処理 -> レポート表示
Cloud PubSubからREST APIで取得したJSONは以下のようなJSONになってます。 data
プロパティがBASE64エンコードされたメッセージです。
{
"ackId": "dummy",
"message": {
"data": "eyJyZXF1ZXN0SWQiOiIxMjMtMTIzIiwidXNlck5hbWUiOiJob2dlaG9nZSIsInByb2Nlc3NlZEF0IjoxNTQzMjg0MTMwfQ==",
"messageId": "111111111111111",
"publishTime": "2018-11-27T11:15:53.801Z"
}
}
このpublishされたメッセージに対して何かしらの加工・集計を行うことで、レポート表示するとします。publishされたメッセージをいきなり加工してしまうのではなく、一旦そのままのJSONでMySQLに格納してしまいます。
以下のようなテーブルを用意します。
mysql> show create table messages\G
*************************** 1. row ***************************
Table: messages
Create Table: CREATE TABLE `messages` (
`message_id` varchar(255) COLLATE utf8mb4_bin NOT NULL,
`received_message` json NOT NULL,
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`message_id`),
KEY `on_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC
メッセージのINSERTです。コンシューマーであるバッチ処理でCloud PubSubからメッセージをpullしてINSERTします。Cloud PubSubはat least onceであるためメッセージが重複する可能性があ流ので ON DUPLICATE KEY
を使って対応します。
INSERT INTO messages(message_id, received_message)
VALUES ('111111111111111', '{ "ackId": "dummy", "message": ...<省略>... }')
ON DUPLICATE KEY UPDATE created_at = created_at
INSERT INTO ... SELECT ...
で加工・集計対象のテーブルに書き込むときは、以下のようなSQLになります。
INSERT INTO targets (request_id, user_name, processed_at)
SELECT JSON_UNQUOTE(JSON_EXTRACT(t.msg, '$.requestId')) AS request_id
,JSON_UNQUOTE(JSON_EXTRACT(t.msg, '$.userName')) AS user_name
,JSON_UNQUOTE(JSON_EXTRACT(t.msg, '$.processedAt')) AS processed_at
FROM ( SELECT CONVERT(FROM_BASE64(JSON_UNQUOTE(JSON_EXTRACT(received_message, '$.message.data'))) USING utf8mb4) AS msg
FROM messages
WHERE created_at >= '2018-11-27 11:00:00'
AND created_at < '2018-11-27 12:00:00'
) t
ON DUPLICATE KEY UPDATE request_id = request_id
;
なかなか読み辛いです...
-
JSON_EXTRACT 関数を使うとJSON pathを使って値を取り出せる
JSON_EXTRACT('{ message: { data: "hogehoge" } }', '$.message.data)
=>"hogehoge"
- ただし、
JSON_EXTRACT
で取り出した文字列はダブルクォーテーションで囲まれていて、そのまま他の関数に渡すと期待と違う挙動をしてしまう。そこで、 JSON_UNQUOTE 関数を使ってダブルクォーテーションを取り除く。JSON_UNQUOTE('{"k1":"v1"}')
=>{"k1":"v1"}
読み辛いだけでなく、書くのも結構面倒ですね。そこで、MySQLにはJSON型用に演算子が用意されております。
-
->
( 12.17.3 Functions That Search JSON Values )received_message->'$.message.data'
JSON_EXTRACT(received_message, '$.message.data)
-
->>
( 12.17.3 Functions That Search JSON Values )received_message->>'$.message.data'
JSON_UNQUOTE(JSON_EXTRACT(received_message, '$.message.data)
これらの演算子を使って先ほどのSQLを書き換えると以下のようになります。
INSERT INTO targets (request_id, user_name, processed_at)
SELECT t.msg->>'$.request_id' AS request_id
,t.msg->>'$.user_name' AS user_name
,t.msg->>'$.processed_at' AS processed_at
FROM ( SELECT CONVERT(FROM_BASE64(received_message->>'$.message.data' USING utf8mb4) AS msg
FROM messages
WHERE created_at >= '2018-11-27 11:00:00'
AND created_at < '2018-11-27 12:00:00'
) t
ON DUPLICATE KEY UPDATE request_id = request_id
;
これでtargetsテーブルとしてがっちりと構造化されたデータが出来上がったので、加工・集計を施してレポートの生成などを行います。
仮にレポートをリアルタイムで更新する必要があったとしても、リアルタイムな処理とは別ラインでバッチ処理できるようにしておくと、過去に遡ってやり直しがやりやすくて安心感増しますね。