2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

DeNAAdvent Calendar 2018

Day 8

データ取り込み時の中間テーブルにMySQLをドキュメントストアとして使う

Last updated at Posted at 2018-12-07

ビッグデータというような規模でなくても、別サービスからイベントデータを取り込んでレポート表示したいときなどに、一旦JSONのままドキュメントストアに格納しておくと、データ取り込みの再実行がやりやすくなり安心感増します。

アプリケーションで既にMySQL5.7以降を利用している場合、ドキュメントストアとしてMySQLのJSON型を使うと、トランザクション・ユニーク制約・柔軟なクエリ・他のテーブルとJOIN、など便利なことがあるかもしれません。

GCPのCloud PubSubでイベントデータを受け取りMySQLのJSON型として保持しておく、以下のようなデータの流れを想定します。MySQLのバージョンは5.7です。

メッセージブローカー(Cloud PubSub) -> コンシューマー(PubSubからpullするバッチアプリケーション) -> ドキュメントストア(MySQLのJSON型) -> データ処理 -> レポート表示

Cloud PubSubからREST APIで取得したJSONは以下のようなJSONになってます。 data プロパティがBASE64エンコードされたメッセージです。

{
  "ackId": "dummy",
  "message": {
    "data": "eyJyZXF1ZXN0SWQiOiIxMjMtMTIzIiwidXNlck5hbWUiOiJob2dlaG9nZSIsInByb2Nlc3NlZEF0IjoxNTQzMjg0MTMwfQ==",
    "messageId": "111111111111111",
    "publishTime": "2018-11-27T11:15:53.801Z"
  }
}

このpublishされたメッセージに対して何かしらの加工・集計を行うことで、レポート表示するとします。publishされたメッセージをいきなり加工してしまうのではなく、一旦そのままのJSONでMySQLに格納してしまいます。

以下のようなテーブルを用意します。

mysql> show create table messages\G
*************************** 1. row ***************************
       Table: messages
Create Table: CREATE TABLE `messages` (
  `message_id` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  `received_message` json NOT NULL,
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`message_id`),
  KEY `on_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC

メッセージのINSERTです。コンシューマーであるバッチ処理でCloud PubSubからメッセージをpullしてINSERTします。Cloud PubSubはat least onceであるためメッセージが重複する可能性があ流ので ON DUPLICATE KEY を使って対応します。

INSERT INTO messages(message_id, received_message)
VALUES ('111111111111111', '{ "ackId": "dummy", "message": ...<省略>... }')
    ON DUPLICATE KEY UPDATE created_at = created_at

INSERT INTO ... SELECT ... で加工・集計対象のテーブルに書き込むときは、以下のようなSQLになります。

INSERT INTO targets (request_id, user_name, processed_at)
SELECT JSON_UNQUOTE(JSON_EXTRACT(t.msg, '$.requestId')) AS request_id
      ,JSON_UNQUOTE(JSON_EXTRACT(t.msg, '$.userName')) AS user_name
      ,JSON_UNQUOTE(JSON_EXTRACT(t.msg, '$.processedAt')) AS processed_at
  FROM ( SELECT CONVERT(FROM_BASE64(JSON_UNQUOTE(JSON_EXTRACT(received_message, '$.message.data'))) USING utf8mb4) AS msg
           FROM messages
          WHERE created_at >= '2018-11-27 11:00:00'
            AND created_at < '2018-11-27 12:00:00'
       ) t
    ON DUPLICATE KEY UPDATE request_id = request_id
;

なかなか読み辛いです...

  • JSON_EXTRACT 関数を使うとJSON pathを使って値を取り出せる JSON_EXTRACT('{ message: { data: "hogehoge" } }', '$.message.data) => "hogehoge"
  • ただし、 JSON_EXTRACT で取り出した文字列はダブルクォーテーションで囲まれていて、そのまま他の関数に渡すと期待と違う挙動をしてしまう。そこで、 JSON_UNQUOTE 関数を使ってダブルクォーテーションを取り除く。 JSON_UNQUOTE('{"k1":"v1"}') => {"k1":"v1"}

読み辛いだけでなく、書くのも結構面倒ですね。そこで、MySQLにはJSON型用に演算子が用意されております。

これらの演算子を使って先ほどのSQLを書き換えると以下のようになります。

INSERT INTO targets (request_id, user_name, processed_at)
SELECT t.msg->>'$.request_id' AS request_id
      ,t.msg->>'$.user_name' AS user_name
      ,t.msg->>'$.processed_at' AS processed_at
  FROM ( SELECT CONVERT(FROM_BASE64(received_message->>'$.message.data' USING utf8mb4) AS msg
           FROM messages
          WHERE created_at >= '2018-11-27 11:00:00'
            AND created_at < '2018-11-27 12:00:00'
       ) t
    ON DUPLICATE KEY UPDATE request_id = request_id
;

これでtargetsテーブルとしてがっちりと構造化されたデータが出来上がったので、加工・集計を施してレポートの生成などを行います。

仮にレポートをリアルタイムで更新する必要があったとしても、リアルタイムな処理とは別ラインでバッチ処理できるようにしておくと、過去に遡ってやり直しがやりやすくて安心感増しますね。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?