背景
サービスを運用していると, ログに対して解析を行いたいと思う事は多いと思います.
しかし, ログを出力するコードを実装している時には解析の要件が正確には固まっていなかったり, 解析する側と出力側の人間が別だったりすると最初からスキーマをきちんと決めて個別のログを出力することは難しかったりすると思います.
そんなとき, 出力側は適当にログを出しておき, 解析側は要望ベースで事後的に解析したりできると色々便利です.
要はスキーマレスにログを出力し, スキーマレスなログにたいしてクエリを走らせられるようなログシステムを作ります.
採用技術
ログの格納先としては S3 を使用します. 好きなだけログを放り込めますしまず消えません.
解析のためには Athena を使用します. S3 のファイルに直接 SQL でクエリを発行できます.
設計
きちんと個々のログの内容を設計せず必要そうなデータを適当に出力しておき, 後から解析する関係上, どんな種類のログがどのようなスキーマで出力されるかは事前に決定されません. とはいえ Athena でクエリを書く以上はスキーマが固定でなければいけません.
ではどうするか.
全ログで固定のフォーマットを一つだけ決め, すべてのオプション項目は JSON のテキストとして特定のフィールドに押し込める.
これで OK です.
どういう事かというと, テーブル的には 以下の 3 カラムしか作りません.
- tag
- 文字列. ログの種類. action.quest.execute など. この情報で body の中身のフォーマットが大体決定されるようにする.
- time
- 文字列. ログが出力された時刻を ISO 8601 フォーマットで. timezone などを考えると time 型よりも柔軟性があったので.
- body
- 文字列. JSON でログの本体データ. ユーザー ID や行動内容など.
つまり S3 上のファイルには
{"tag": "action.quest.execute", "time": "2017-12-01T11:28:11.377847+00:00", "body": "{\\"user_id\\": \\"xxxxxxxxxxxxxxx\\", \\"quest_id\\": 10}"}
{"tag": "action.quest.execute", "time": "2017-12-01T11:28:22.377847+00:00", "body": "{\\"user_id\\": \\"xxxxxxxxxxxxxxx\\", \\"quest_id\\": 20}"}
{"tag": "action.quest.execute", "time": "2017-12-01T11:28:27.377847+00:00", "body": "{\\"user_id\\": \\"xxxxxxxxxxxxxxx\\", \\"quest_id\\": 20}"}
{"tag": "action.quest.execute", "time": "2017-12-01T11:28:41.377847+00:00", "body": "{\\"user_id\\": \\"xxxxxxxxxxxxxxx\\", \\"quest_id\\": 10}"}
みたいに格納したファイルが大量に並びます.
JSONの中にさらに文字列化されたJSONが入っています.
気持ち悪いと思いますが, このおかげで athena のテーブルとしては スキーマを固定できるのが全てです.
こうしておく事で Athena の DDL は以下の1種類のみで全ログを扱えます.
CREATE EXTERNAL TABLE IF NOT EXISTS test.log.table (
tag string ,
time string ,
body string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://{bucketname}/path/to/log/'
クエリ
さて, このままでは body の中はただの文字列なので検索や集計などで扱いが面倒です. ここで生きてくるのが JSON_EXTRACT という JSON をクエリ実行時パースできる関数です.
SELECT
tag,
time,
json_extract_scalar(body, '$.user_id') AS user_id,
json_extract_scalar(body, '$.quest_id') AS quest_id
FROM test.log.table
こんな感じのクエリをなげると, 以下のようにきちんと個別のカラムとして抽出できます. 当然合計などの集計処理も可能です.
tag | time | user_id | quest_id |
---|---|---|---|
action.quest.execute | 2017-12-01T11:28:11.377847+00:00 | xxxxxxxxxxxxxxx | 10 |
action.quest.execute | 2017-12-01T11:28:22.377847+00:00 | xxxxxxxxxxxxxxx | 20 |
action.quest.execute | 2017-12-01T11:28:27.377847+00:00 | xxxxxxxxxxxxxxx | 20 |
action.quest.execute | 2017-12-01T11:28:41.377847+00:00 | xxxxxxxxxxxxxxx | 10 |
存在しないフィールドは null になるだけなので, 多少フィールドの過不足があっても特に問題なく処理可能です.
ファイルパス
あとはひたすら S3 にある程度, 個々のサイズが小さくなるようにファイルを配置していけば良いわけですが, 量が増えてくるとパーティショニングが必要になってきます.
だいたいのことは classmethod さんの記事に書いてあります.
https://dev.classmethod.jp/cloud/aws/athena-partition-reinvent/
結論からいうと, パーティション追加のたびにクエリとか面倒なので, Hive フォーマットで配置するのが便利です.
その場合でもパーティションが増えるタイミングで
MSCK REPAIR TABLE {table name}
は発行する必要があるので scheduled lambda 等で定期的に呼び出すようにすると楽ですね.
以下のようにカテゴリもパーティションで運用するとテーブルも一つで済むので便利です.
s3://{bucketname}/log/category={log.category}/dt={datetime}/xxxxx.gz
s3://mybucket/log/category=action.quest.execute/dt=2017-12-01/2017-12-01-11-01-fef94876-9037-415f-8042-95d42f7e3c3d.gz
S3へのログの格納
ログを格納する側ですが, 直接S3に出力しても良いですが, Kinesis と lambda を使って S3 に出力するといい感じにバッファリングもできてオススメです.