0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

S3 上のログファイルを Glue, Athena を使ってクエリ検索可能にする

Last updated at Posted at 2024-08-25

はじめに

アプリや Nginx などのログを集約している S3 Bucket はあるのだが、分析するための環境がなく、S3 からローカルに落としてゴニョゴニョするか、サーバーに接続して grep コマンドなどを駆使するという辛すぎる運用があり、どうにかして欲しいと依頼があったため作成したものの記録。
※ここで使用する AWS サービスそのものの説明は割愛する。

目標

  • 兎にも角にも Atnena でクエリ検索ができる
  • LTSV や JSON 型のログファイルをいい感じに整形してクエリ検索しやすくする
  • なるべく高速なクエリ検索
  • 既存のリソースには手を加えない
  • コストをあまりかけない
  • Terraform で構築する(まずは手動構築、Terraform 化は後々。。。)

先に構成図

S3 に出力されたログを Athena で検索する.drawio のコピー.drawio.png

構成図の説明をすると以下。

  1. 集約されたログファイルを Glue Job で整形し、別の S3 に出力
  2. Glue Table を作成してこの S3 を テーブル構造化し、Athena でクエリ検索可能に
  3. Glue Table に対し Partition Projection を実行し、フォルダ名で自動パーティショニング
  4. 都度 S3 に転送される各アプリのログファイルへの "1" の処理は Glue Job Schedules で定期実行

Glue DB, Glue Table を Athena で作成する

Athena で S3 に対しクエリ検索をするためには、Glue DB, Glue Table を作ってやる必要がある。
これにより S3 がテーブル構造化され、Athena によるクエリ検索が可能になる。

DB はポチポチ作るのみで、肝は Table の作成。
これは色々な設定値があって初見では難しい、、、

Table の作成法は主に以下3種類ある。

  1. シンプルに Table を作成
  2. Glue Crawler にクローリングさせて自動作成
  3. Athena でクエリ発行

1 に関しては、コンソールからでも CLI でも直接作成する方法。これは言うまでもないため 2 について。
Crawler はデータソースのデータ構造をクローリング(スキャン)することで Table を作成してくれるもの。
クローリングするにはデータソースへのアクセス権(IAM)や Connections という Glue のリソース(NW の疎通に必要)を作成する必要がある。
データソースが既にテーブル構造化されていて、型やパーティションなどに手を加える必要がない場合(ヘッダーのある CSV など)、Crawler に Table を作成させるのが一番お手軽だと思う。
または、手を加える必要があってもひとまず Crawler に作成させ、そこに手を加えていく形でも良いかと思う。

3 については、SQL でテーブルを作成する要領で Athena で Table を作成する方法。
今回は Table 作成法をテンプレート化したかったため、Athena からクエリを発行して作成する方針とした。(手動で構築している今回は、1, 2 の方法だとテンプレート化できない)
以下のクエリで作成。

CREATE EXTERNAL TABLE IF NOT EXISTS <Glue_DB>.<Glue_Table> (
    time TIMESTAMP,
    request_id STRING,
    unix_epoch BIGINT,
    pid INT,
    agent STRING,
    ip STRING,
    request STRING,
    response STRUCT<
        status:INT,
        body:STRING
    >
)
PARTITIONED BY (
    month STRING,
    service_name STRING,
    environment STRING
)
STORED AS PARQUET
LOCATION 's3://log-store-for-athena-query/'
TBLPROPERTIES (
	'projection.enabled' = 'true',
    'projection.month.type' = 'date',
	'projection.month.format' = 'yyyyMM',
	'projection.month.interval' = '1',
	'projection.month.interval.unit' = 'MONTHS',
	'projection.month.range' = '201801,NOW',
	'projection.service_name.type' = 'enum',
    'projection.service_name.values' = 'service1,service2,service3',
    'projection.environment.type' = 'enum',
    'projection.environment.values' = 'production,staging,development',
	'storage.location.template' = 's3://log-store-for-athena-query/${environment}/${service_name}/web/rails/api/${month}/'
)

上記クエリを分割して解説していく。
※ Athena は Trino? Presto? という SQL エンジンが採用されているため、独自の記法がある

◼️ Table の作成とカラムの定義

CREATE EXTERNAL TABLE IF NOT EXISTS <Glue_DB>.<Glue_Table> (
    time TIMESTAMP,
    request_id STRING,
    unix_epoch BIGINT,
    pid INT,
    agent STRING,
    ip STRING,
    request STRING,
    response STRUCT<
        status:INT,
        body:STRING
    >
)

まず初めに、Table を作成してカラムの定義をするところから。
ここで Glue DB を指定すると、指定された DB に Table が作成される。
そしてテーブル構造化したい S3 上のファイルが持つカラム名とその型を指定する。

今回扱うデータソースは JSON 型のため、ネストが発生し得る。
ネスト部分は STRUCT とすることで表現可能。

指定可能な型は以下ドキュメント参考。

◼️パーティション化

PARTITIONED BY (
    month STRING,
    service_name STRING,
    environment STRING
)

### 中略 ###

TBLPROPERTIES (
	'projection.enabled' = 'true',
    'projection.month.type' = 'date',
	'projection.month.format' = 'yyyyMM',
	'projection.month.interval' = '1',
	'projection.month.interval.unit' = 'MONTHS',
	'projection.month.range' = '201801,NOW',
	'projection.service_name.type' = 'enum',
    'projection.service_name.values' = 'service1,service2,service3',
    'projection.environment.type' = 'enum',
    'projection.environment.values' = 'production,staging,development',
	'storage.location.template' = 's3://log-store-for-athena-query/${environment}/${service_name}/web/rails/api/${month}/'
)

【S3(Glue Table) のパーティション化】

例えば S3 の構造が以下のように同じバケットの中で環境・サービス名・年月日などでフォルダ分けされている場合、毎回 Athena で全量に対しクエリを発行するのはクエリのパフォーマンス的にもコスト的にもあまりに無駄が多い。
s3://log-store-for-athena-query/${environment}/${service_name}/web/rails/api/${month}/

そこで、フォルダごとにパーティション化してやることで、無駄のないクエリをバシバシ発行してやろうという魂胆。

ではパーティションはどのように作成するのか。
方法は二つあって、Apache Hive スタイルと Hive 以外のスタイルで分けられる。
以下、ドキュメントから抜粋。

Athena では Apache Hive スタイルのパーティションを使用できます。このパーティションのデータパスには、等号で連結されたキーと値のペア (例えば country=us/... または year=2021/month=01/day=26/...) が含まれています。つまり、それぞれのパスにより、パーティションのキーと値、両方の名前が表されます。新しい Hive パーティションをパーティションされたテーブルにロードするには、(Hive スタイルのパーティションのみで機能する) MSCK REPAIR TABLE コマンドを使用します。

Athena では、Hive 以外のスタイルのパーティション化スキームを使用することも可能です。たとえば、CloudTrail ログと Firehose 配信ストリームは、data/2021/01/26/us/6fc7845e.json など、日付部分に個別のパスコンポーネントを使用します。これらの Hive スタイルではないパーティションの場合、ALTER TABLE ADD PARTITION を使用して手動でパーティションを追加します。

Apache Hive スタイルだけ例を挙げておくと、S3 のパスが例えば以下のようになっていれば、 PARTITIONED BY 句でパーティション名と型を指定した上で、 MSCK REPAIR TABLE コマンドを使用することで Athena はパーティションとして認識できる。
s3://log-store-for-athena-query/year=2024/month=01/day=01

【Partition Projection】

では、いずれかの方法でさあ作成だ、となった時、おそらく手が止まるはずだ。
もちろんフォルダ構成にもよるが、例えば年月日などでフォルダ分けされている場合、検索対象となり得る過去の全フォルダをパーティション化したいし、未来永劫更新され続けるフォルダもパーティション化したいからだ。

具体的には、 2024/082020/12/1009-10-2029 もパーティション化したいから。
となると、過去の年月日のパーティションを全て作成するの?毎日新たにパーティションを作成するの?という疑問が生まれる。

そこで登場するのが、Partition Projection
Projection とは「投影」「射影」という意味で、身近な使われ方としてはプロジェクションマッピングなどだろうか。

Partition Projection は、設定時に与えられたルール、法則を元に自動でパーティション化してくれる機能のこと。
抜粋してクエリを再掲すると以下部分が該当。

TBLPROPERTIES (
	'projection.enabled' = 'true',
    'projection.month.type' = 'date',
	'projection.month.format' = 'yyyyMM',
	'projection.month.interval' = '1',
	'projection.month.interval.unit' = 'MONTHS',
	'projection.month.range' = '201801,NOW',
	'projection.service_name.type' = 'enum',
    'projection.service_name.values' = 'service1,service2,service3',
    'projection.environment.type' = 'enum',
    'projection.environment.values' = 'production,staging,development',
	'storage.location.template' = 's3://log-store-for-athena-query/${environment}/${service_name}/web/rails/api/${month}/'
)

一つずつ見てみる。
尚、以下に記述する <columnName> の部分はパーティション名になるため、既存のカラム名ではなく PARTITIONED BY 句で指定した名前になる。
※詳細は以下公式ドキュメント参照

projection.enabled

Partition Projection を有効化。

projection.<columnName>.type

使用する射影型。

projection.<columnName>.format

日付フォーマット。
ここでは 202408 のような形式でフォルダが切られているので yyyyMM を指定。

projection.<columnName>.interval

連続するパーティション値の間隔。
ここでは 202408 -> 202409 のように毎月 1 ずつ増えるので 1 を指定。

projection.<columnName>.interval.unit

projection.<columnName>.interval の単位。
「月」が増えるので MONTHS を指定。

projection.<columnName>.range

最小および最大 range 値。
ここでは 201801 ~ 現在までを指定。
(参考にした記事の例で 201801 となっていたのでそのまま引用して作成してしまったが、本来であれば実際のフォルダ構成に合わせるべきだった)

projection.<columnName>.values

パーティション値のカンマ区切りリスト。
projection.<columnName>.typeenum の場合、すなわち日付や数字のように一定の間隔で増加するものではない場合に設定。

storage.location.template

Partition Projection した S3 のパスは日時などによって変動するため、プレースホルダーを使ったテンプレートパスを定義してやる必要がある。
プレースホルダーには PARTITIONED BY 句で指定したパーティション名を入れてやる。
尚、以下抜粋にある通り、テンプレートパスはスラッシュで終わる必要がある。

テンプレート化された場所は、パーティション化されたデータファイルがパーティションごとの「フォルダ」に格納されるように、スラッシュで終わる必要があります。

これでフォルダが増えても TBLPROPERTIES で定義したルールに則っている限りは自動的にパーティション化される。

◼️ S3 上のファイルフォーマットの定義

STORED AS PARQUET

STORED AS では、S3 上のファイルがどのようなフォーマットであるかを定義している。

"Parquet とは"、という部分についてはスキップするが、簡単に説明すると列志向のファイルフォーマットのため、クエリ実行時に行方向に全ての列のデータを読み込むのではなく、クエリに必要な列だけを読み込むことで、I/O の負荷を減らし、パフォーマンスを向上させることができる。
対比で説明すると、例えば CSV は行志向のフォーマットなので、不要なカラムであっても必ず読み込まなければいけない。

イメージとしては、会員データの更新など、レコードを丸ごと更新するような、いわば CRUD 操作に対応するには行志向、Athena のようにデータの検索、分析に使用するのであれば、特定のカラムの値のみ抽出できる列志向が向いているといった感じだろうか。

【データ読み込み形式の指定】

今回はファイル形式を Parquet としたのみでその他指定事項はないが、データを読み込ませる上で追加で施したい処理がある場合、例えば以下のように記述することができる。(処理を施すといってもデータの加工をするわけではなく、どのようなルールで読み込むか、ということ)

ROW FORMAT SERDE  
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
WITH SERDEPROPERTIES (  
'parquet.ignore.statistics' = 'true')  
STORED AS PARQUET

以下に解説をしていくが、調べ方が悪いのか、各ファイル形式ごとの SerDe(シリアライザー/デシリアライザー)ライブラリについて詳しく書かれたドキュメントが見つからず、ChatGPT 君に教えてもらった情報も並ぶので内容の確からしさについては保証できかねるので悪しからず、、、

そもそもまず SerDe とは以下。

SerDe は、Athena で使用されているデータカタログにデータの処理方法を指示するカスタムライブラリです。SerDe タイプは、Athena で CREATE TABLE ステートメントの ROW FORMAT 部分に SerDe タイプを明示的にリストすることによって指定します。Athena は、特定のタイプのデータ形式にデフォルトでいくつかの SerDe タイプを使用するため、SerDe 名を省略できる場合もあります。

各ファイル形式ごとに SerDe ライブラリは用意されており、Parquet であれば以下。
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

そして parquet.ignore.statistics' = 'true' は Parquet ファイル内の統計情報を無視するというオプション。
デフォルトでは false のため明示的に WITH SERDEPROPERTIEStrue を指定する。
逆にライブラリのデフォルトに準拠する場合は、 STORED AS PARQUET のように読み込むデータの型を指定するのみで OK。

また、CSV や TSV のようなテキストファイルであれば、デリミタ(データの区切り文字)などの指定のみで事足りる(Athena はこの形式を解釈するのに特別な設定は不要)ため、 SerDe ライブラリは必須ではない。
※正確には、どの SerDe も指定せずに ROW FORMAT DELIMITED のみを指定すると自動的に LazySimpleSerDe が使用される。以下参照。

一方 CSV や TSV でも、以下に該当する場合は SerDe を指定する必要がある。

引用符で囲まれた値がデータに含まれている場合や、TIMESTAMP に UNIX の数値形式 (例えば、1564610311) を使用している場合は、CSV を処理するための OpenCSVSerDe を使用します。

そして STORED AS はデフォルトで TEXTFILE であり、CSV や TSV などを扱う場合は以下のように省略可能。

ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  ESCAPED BY '\\'
  LINES TERMINATED BY '\n'
LOCATION 's3://athena-examples-myregion/flight/csv/';

Parquet 用の SerDe を使った例に戻るが、Parquet ファイルには、ファイルや各列の統計情報(min 値、max 値、null の数など)が含まれている。
そしてその統計情報は、クエリのパフォーマンス最適化に役立ち、例えばクエリ実行時に特定のデータブロックを読み込むかどうかを判断するのに使用される。
例えば以下のデータで考える。

ID Name Age Salary
1 Alice 30 60000
2 Bob 35 70000
3 Charlie 25 50000
4 David 28 55000
5 Eve 32 60000

Parquet ファイル内では以下のような統計情報が列ごとに格納される。

# ID 列
min値: 1
max値: 5
nullの数: 0

# Name 列
min値: "Alice"
max値: "Eve"
nullの数: 0

# Age 列
min値: 25
max値: 35
nullの数: 0

# Salary 列
min値: 50000
max値: 70000
nullの数: 0

この状態で以下のクエリを発行した場合、統計情報を使ってこのデータブロックをスキップ(読み取り不要)することができる。
これは、 Age の max 値(35)よりも大きく、条件に合致するデータが存在しないことが分かるため。

SELECT * FROM my_table WHERE Age = 40;

もし parquet.ignore.statistics = true を設定した場合、これらの統計情報が無視され全てのデータブロックを読み込み、結果として不要なデータブロックまで読み取ることになり、クエリ実行時間が長くなる。
ユースケースとしては、ファイルが何らかの理由で不正確な統計情報を持っている場合など。

とまあだいぶ長々と横道に外れてしまったがまとめると以下。

  • デフォルトのデータ読み込み形式で良い場合は STORED AS でファイル形式を指定するのみ
    • デフォルトで STORED AS TEXTFILE となるため、CSV や TSV などのテキストファイルの場合は指定不要
  • データ読み込み形式をカスタマイズしたい場合には SerDe ライブラリを使用
    • 場合によっては SerDe ライブラリを指定せずともデフォルトで適用される
    • SerDe ライブラリのデフォルトのプロパティを変更したい場合は SERDEPROPERTIES で指定

◼️テーブル構造化する S3 パスの指定

LOCATION 's3://log-store-for-athena-query/'

以下引用文の通り。
記法についていくつか注意点があるので詳細は公式ドキュメントへ。

Athena が、指定した Amazon S3 フォルダに保存されているすべてのデータを読み込みます。Athena に読み込ませたくないデータがある場合は、そのデータを Athena に読み込ませたいデータと同じ Amazon S3 フォルダに保存しないでください。

ログの加工・整形

今回の場合、アプリが S3 に転送した生ログを Glue Job によって加工・整形し、テーブル構造化するための準備をすることで、前段までで記載してきたクエリによってテーブル構造化される。
これをしてやらないと、レコード全体が単一のカラムに格納されている状態となるか、そもそもクエリを受け付けない。

(※本来であればまず Glue Job で加工・整形したログを S3 に出力し、それに対し前段までで記載してきたクエリを発行することでテーブル構造化するのだが、この記事のメインだったため順番が前後している、、、)

今回扱ったのは Nginx のアクセスログ(LTSV)とアプリのログ(JSON)。(上述までで例示したのはアプリログ用のテーブル作成クエリ)
テーブル構造化の準備のほか、加工・整形内容は以下。

  • Parquet 化
  • gzip 圧縮

これらによって受けられる恩恵は、①コスト圧縮 ②クエリ高速化の2点。

◼️コスト圧縮

【Athena の課金体系】

よって、Parquet 化、gzip 圧縮することで Athena のデータスキャン量が削減できるためコスト圧縮に繋がる。

【Glue Job の課金体系】

データの加工・整形をするためのコストもなるべく抑えたい。

今回の用途は日次のバッジ処理であり、リアルタイム性は不要だったので 2 DPU、 Flex execution を有効とした。

◼️クエリ高速化

これは前述の通りで、列志向のファイルフォーマットである Parquet に変換し gzip に圧縮することで、 Athena のクエリが効率化され、かつスキャン対象量が削減されるため。

これらを実現する Glue Job は、作成法が 2 つある。
とにかく必要最低限の要件が満たせれば良いのであれば GUI で、しっかり作り込むならスクリプトを書いて作成する。(と思っている)
今回は環境変数、エラーハンドリングなどを組み込みたかったのでスクリプトを書いた。
Spark, Python Shell, Ray の3つのジョブタイプが選べるが、データをガチャガチャいじる必要があったので、Spark を使用してみた。

実際に書いた Glue Job のスクリプトは以下に載せるが、さすがにボリュームがあるので解説は割愛。(アプリエンジニアではなく修行が足りないのでツッコミどころがあったら指摘ください)

GUI で作ると自動でスクリプト化されていくので、まずはある程度動くところまで GUI で作ってみて、そこに対しカスタマイズしていくというのが一番やりやすい気がする。(ただし一度スクリプトに手を入れると GUI での表示が不可になる)

個人的に最低限押さえているのは、以下の2つを使用したデバッグ法。
データ構造がどうなっているか逐一確認できて非常に助かったのでおすすめ。

df.printSchema() # テーブル構造を出力
df.show(n=5, truncate=False, vertical=True) # レコードを出力(後述の DynamicFrame と Dataframe とでオプションに違いあり)

後は、データ操作時に使用する2種類のオブジェクトである、Glue 独自の DynamicFrame と Apache Spark の Dataframe の使い分け。
それぞれに特徴があるので適切に使い分けるのだがあまり違いが分かっていない、、、
ただし、後続の処理への受け渡し時には DynamicFrame である必要があるはず。

他にもデータ操作時に使用する Glue 独自のオブジェクトがいくつかある。

アプリログ用(JSON)
import sys
import boto3
import logging
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_json, col, to_timestamp, from_utc_timestamp
from datetime import datetime
from pyspark.sql.types import TimestampType, StructType, StructField, StringType, LongType, IntegerType

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Job details で設定した環境変数の受け取り
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'S3_BUCKET_NAME',
    'S3_BUCKET_NAME_FOR_ATHENA_QUERY',
    'LOG_FILE_MIDDLE_PATH',
    'ENVIRONMENT',
    'SERVICE_NAMES',
    'COMPRESSION_CODEC'
])

# SparkContext を初期化
sc = SparkContext()

# GlueContext を初期化
# Apache Spark 上で動作するため引数に SparkContext を指定
glueContext = GlueContext(sc)

# データ操作のための SparkSession を取得(Glue から Spark を利用可能にする)
spark = glueContext.spark_session

# Job オブジェクトの初期化
# glueContext を引数として渡すことで、Job が SparkSession や Glue のサービスと連携できる
job = Job(glueContext)

# job の初期化、bookmark 機能利用、環境変数を渡す
job.init(args['JOB_NAME'], args)

# 環境変数を使える形に定義
bucket_name = args["S3_BUCKET_NAME"]
athena_query_bucket_name = args["S3_BUCKET_NAME_FOR_ATHENA_QUERY"]
log_file_middle_path = args["LOG_FILE_MIDDLE_PATH"]
environment = args["ENVIRONMENT"]
service_names = args["SERVICE_NAMES"]
compression_codec = args["COMPRESSION_CODEC"]

logger.info(f"S3_BUCKET_NAME '{bucket_name}'")
logger.info(f"S3_BUCKET_NAME_FOR_ATHENA_QUERY '{athena_query_bucket_name}'")
logger.info(f"LOG_FILE_MIDDLE_PATH '{log_file_middle_path}'")
logger.info(f"ENVIRONMENT '{environment}'")
logger.info(f"SERVICE_NAMES '{service_names}'")
logger.info(f"COMPRESSION_CODEC '{compression_codec}'")

try:
    # 環境変数の存在チェック
    env_vars = [
        "S3_BUCKET_NAME",
        "S3_BUCKET_NAME_FOR_ATHENA_QUERY",
        "LOG_FILE_MIDDLE_PATH",
        "ENVIRONMENT",
        "SERVICE_NAMES",
        "COMPRESSION_CODEC"
    ]
    for var in env_vars:
        if not args[var]:
            raise ValueError(f"{var} に値が設定されていません")
    
    if service_names:
        # service_names を "," で分割し、前後の空白を削除して新しい配列に格納
        service_names = [name.strip() for name in service_names.split(",")]
    else:
        service_names = []
    
    # yyyyMM 形式で現在の年月を取得
    current_month = datetime.now().strftime("%Y%m")

    for service_name in service_names:
        # 各 service_names ごとの S3 フォルダパスを定義
        common_prefix_path = f"{environment}/{service_name}/{log_file_middle_path}/{current_month}/"
        source_path = f"s3://{bucket_name}/{common_prefix_path}"
        destination_path = f"s3://{athena_query_bucket_name}/{common_prefix_path}"
        logger.info(f"Source path: {source_path}")
        logger.info(f"Destination path: {destination_path}")

        # Initialize S3 client
        s3 = boto3.client('s3')

        # Athena による検索用バケット配下に該当フォルダが存在しない場合新規作成
        # 毎月1日にデータソース側に yyyyMM フォルダが新しく作成されたり、
        # 新しくログを出し始めたサービスがあった場合などが該当
        response = s3.list_objects_v2(Bucket=athena_query_bucket_name, Prefix=common_prefix_path)
        if 'Contents' not in response:
            logger.info(f"{destination_path} が存在しないため新規作成します")
            s3.put_object(Bucket=athena_query_bucket_name, Key=f"{common_prefix_path}")    
        
        # request フィールド全体を JSON 文字列に変換しスキーマを再定義
        # time カラムを変更
        def ProcessLogData(glueContext, dfc) -> DynamicFrameCollection:
            # DynamicFrame から DataFrameに変換
            # DynamicFrameCollection で受け取るため、inxex0 の DynamicFrame を抽出
            df = dfc.select(list(dfc.keys())[0]).toDF()

            # スキーマ情報を出力
            logger.info("変換前のスキーマ情報を出力します")
            df.printSchema()
            
            # DataFrame を出力
            logger.info("変換前の DataFrame を出力します(前回実行時からの増分のみ)")
            df.show(n=5, truncate=False, vertical=True)
            
            logger.info("string 型の time カラムを timestamp 型(JST)に変換します")
            logger.info("変換前")
            df.select("time").show(n=5, truncate=False)
            
            # time カラムを timestamp 型に変換し日本時刻にする
            df = df.withColumn("time", to_timestamp(col("time"), "yyyy-MM-dd'T'HH:mm:ssXXX"))
            df = df.withColumn("time", from_utc_timestamp("time", "Asia/Tokyo"))
            
            logger.info("変換後")
            df.select("time").show(n=5, truncate=False)

            # request フィールド全体を JSON 文字列に変換
            # request.parameters の中のキーが固定ではなくスキーマの定義ができないため、文字列型で保持
            df = df.withColumn("request", to_json(col("request")))
            
            schema = StructType([
                StructField("time", TimestampType(), True),
                StructField("request_id", StringType(), True),
                StructField("unix_epoch", LongType(), True),
                StructField("pid", IntegerType(), True),
                StructField("agent", StringType(), True),
                StructField("ip", StringType(), True),
                StructField("request", StringType(), True), # JSON 文字列として保持
                StructField("response", StructType([
                    StructField("status", IntegerType(), True)
                ]), True)
            ])
            
            # DataFrame に新しいスキーマ定義を適用
            df = spark.createDataFrame(df.rdd, schema)

            # スキーマ情報を出力
            logger.info("request フィールドを JSON 文字列に変換します")
            logger.info("変換後のスキーマ情報を出力します")
            df.printSchema()

            # DataFrame を出力
            logger.info("変換後の DataFrame を出力します")
            df.show(n=5, truncate=False, vertical=True)

            # DataFrame を DynamicFrame に変換
            dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
            return DynamicFrameCollection({"CustomTransform0": dynamic_frame}, glueContext)
        
        # 処理①
        # S3 からオブジェクトを取得し DynamicFrame を生成
        CreateDynamicFrameByS3_node = glueContext.create_dynamic_frame.from_options(
            format_options={"multiline": False},
            connection_type="s3",
            format="json",
            connection_options={
                "paths": [f"{source_path}"],
                "recurse": True
            },
            transformation_ctx="CreateDynamicFrameByS3_node"
        )
        
        # 処理②
        # request フィールド全体を JSON 文字列に変換しスキーマを再定義
        CustomTransform_node = ProcessLogData(
            glueContext,
            DynamicFrameCollection({"CreateDynamicFrameByS3_node": CreateDynamicFrameByS3_node}, glueContext)
        )
        
        # 処理③
        # 処理②の出力結果が DynamicFrameCollection のため、
        # 後続の S3 が受け取れないため単一の DynamicFrame を取得して渡す
        SelectFromCollection_node = SelectFromCollection.apply(
            dfc=CustomTransform_node,
            key=list(CustomTransform_node.keys())[0],
            transformation_ctx="SelectFromCollection_node"
        )
        
        # 処理④
        # parquet に変換、 gzip に圧縮し S3 に出力
        WriteDynamicFrameForS3_node = glueContext.write_dynamic_frame.from_options(
            frame=SelectFromCollection_node,
            connection_type="s3",
            format="glueparquet",
            connection_options={
                "path": f"{destination_path}",
                "partitionKeys": []
            },
            format_options={"compression": "gzip"},
            transformation_ctx="WriteDynamicFrameForS3_node"
        )
    
    # job 終了、bookmark 更新
    job.commit()
    logger.info("Job committed successfully")

except Exception as e:
    logger.error(f"An error occurred: {e}")
    job.commit()
    raise

Nginx のアクセスログ用(LTSV)
import sys
import boto3
import logging
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, split, regexp_replace, when, to_timestamp, regexp_extract, from_utc_timestamp
from datetime import datetime

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Job details で設定した環境変数の受け取り
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'S3_BUCKET_NAME',
    'S3_BUCKET_NAME_FOR_ATHENA_QUERY',
    'LOG_FILE_MIDDLE_PATH',
    'ENVIRONMENT',
    'SERVICE_NAMES',
    'COMPRESSION_CODEC'
])

# SparkContext を初期化
sc = SparkContext()

# GlueContext を初期化
# Apache Spark 上で動作するため引数に SparkContext を指定
glueContext = GlueContext(sc)

# データ操作のための SparkSession を取得(Glue から Spark を利用可能にする)
spark = glueContext.spark_session

# Job オブジェクトの初期化
# glueContext を引数として渡すことで、Job が SparkSession や Glue のサービスと連携できる
job = Job(glueContext)

# job の初期化、bookmark 機能利用、環境変数を渡す
job.init(args['JOB_NAME'], args)

# 環境変数を使える形に定義
bucket_name = args["S3_BUCKET_NAME"]
athena_query_bucket_name = args["S3_BUCKET_NAME_FOR_ATHENA_QUERY"]
log_file_middle_path = args["LOG_FILE_MIDDLE_PATH"]
environment = args["ENVIRONMENT"]
service_names = args["SERVICE_NAMES"]
compression_codec = args["COMPRESSION_CODEC"]

logger.info(f"S3_BUCKET_NAME '{bucket_name}'")
logger.info(f"S3_BUCKET_NAME_FOR_ATHENA_QUERY '{athena_query_bucket_name}'")
logger.info(f"LOG_FILE_MIDDLE_PATH '{log_file_middle_path}'")
logger.info(f"ENVIRONMENT '{environment}'")
logger.info(f"SERVICE_NAMES '{service_names}'")
logger.info(f"COMPRESSION_CODEC '{compression_codec}'")

try:
    # 環境変数の存在チェック
    env_vars = [
        "S3_BUCKET_NAME",
        "S3_BUCKET_NAME_FOR_ATHENA_QUERY",
        "LOG_FILE_MIDDLE_PATH",
        "ENVIRONMENT",
        "SERVICE_NAMES",
        "COMPRESSION_CODEC"
    ]
    for var in env_vars:
        if not args[var]:
            raise ValueError(f"{var} に値が設定されていません")

    if service_names:
        # service_names を "," で分割し、前後の空白を削除して新しい配列に格納
        service_names = [name.strip() for name in service_names.split(",")]
    else:
        service_names = []
    
    # yyyyMM 形式で現在の年月を取得
    current_month = datetime.now().strftime("%Y%m")
    
    for service_name in service_names:
        # 各 service_names ごとの S3 フォルダパスを定義
        common_prefix_path = f"{environment}/{service_name}/{log_file_middle_path}/{current_month}/"
        source_path = f"s3://{bucket_name}/{common_prefix_path}"
        destination_path = f"s3://{athena_query_bucket_name}/{common_prefix_path}"
        logger.info(f"Source path: {source_path}")
        logger.info(f"Destination path: {destination_path}")

        # Initialize S3 client
        s3 = boto3.client('s3')

        # Athena による検索用バケット配下に該当フォルダが存在しない場合新規作成
        # 毎月1日にデータソース側に yyyyMM フォルダが新しく作成されたり、
        # 新しくログを出し始めたサービスがあった場合などが該当
        response = s3.list_objects_v2(Bucket=athena_query_bucket_name, Prefix=common_prefix_path)
        if 'Contents' not in response:
            logger.info(f"{destination_path} が存在しないため新規作成します")
            s3.put_object(Bucket=athena_query_bucket_name, Key=f"{common_prefix_path}")

        # LTSV 形式の値を整形
        # ①各カラムの値をコロンで分割し、右辺を抽出し上書き(request_method:GET -> GET)
        # ②time カラムのフォーマット変更
        # ③uri カラムを抽出してフィルタリングおよび置換
        def ProcessLogData(glueContext, dfc) -> DynamicFrameCollection:
            # DynamicFrame から DataFrameに変換
            # DynamicFrameCollection で受け取るため、inxex0 の DynamicFrame を抽出
            df = dfc.select(list(dfc.keys())[0]).toDF()
            
            # スキーマ情報を出力
            logger.info("スキーマ情報を出力します")
            df.printSchema()

            # DataFrame を出力
            logger.info("DataFrame を出力します(前回実行時からの増分のみ)")
            df.show(truncate=False)

            # ①各カラムの値をコロンで分割し、右辺を抽出し上書き(request_method:GET -> GET)
            # ②time カラムのフォーマット変更
            for column in df.columns:
                df = df.withColumn(column, regexp_extract(df[column], fr"{column}:(.*)", 1))
                if column == "time":
                    logger.info(f"{column} カラムのフォーマット変更前")
                    df.select(column).show(truncate=False)
                    
                    logger.info(f"{column} カラムのフォーマットを変更します")
                    df = df.withColumn(column, to_timestamp(column, "yyyy-MM-dd'T'HH:mm:ssXXX"))
                    df = df.withColumn(column, from_utc_timestamp(col(column), "Asia/Tokyo"))
                    
                    logger.info(f"{column} カラムのフォーマット変更後")
                    df.select(column).show(truncate=False)
            
            # ③uri カラムを抽出してフィルタリングおよび置換
            if "uri" in df.columns:
                # 値ごと 'uri' カラムをコピーして 'processed_uri' カラムを作成
                df = df.withColumn("processed_uri", col("uri"))
                
                # 'processed_uri' に '/user/tokens' を含むものを空文字列に変更
                df = df.withColumn(
                    "processed_uri",
                    when(df["processed_uri"].contains("/user/tokens"), "").otherwise(df["processed_uri"])
                )
                
                # 正規表現による置換
                df = df.withColumn(
                    "processed_uri",
                    regexp_replace(df["processed_uri"], r"[0-9]+\.[0-9]+", "<id>")
                )
            
            # スキーマ情報を出力
            logger.info("ProcessLogData による整形後のスキーマ情報を出力します")
            df.printSchema()
            
            # DataFrame を出力
            logger.info("ProcessLogData による整形後の DataFrame を出力します(前回実行時からの増分のみ)")
            df.show(truncate=False)

            # DataFrame を DynamicFrame に変換
            dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
            return DynamicFrameCollection({"CustomTransform0": dynamic_frame}, glueContext)
        
        # 処理①
        # S3 からオブジェクトを取得し DynamicFrame を生成
        CreateDynamicFrameByS3_node = glueContext.create_dynamic_frame.from_options(
            format_options={
                "quoteChar": "\"",
                "withHeader": False,
                "separator": "\t",
                "optimizePerformance": False
            },
            connection_type="s3",
            format="csv",
            connection_options={
                "paths": [f"{source_path}"],
                "recurse": True
            },
            transformation_ctx="CreateDynamicFrameByS3_node"
        )
        
        # 処理②
        # カラム名を設定し全て文字列型で指定
        # データソースにヘッダーがないため自動で採番されたカラム名になっている
        # 値は全て LTSV の label:value 形式のため文字列型でしか扱えない(後続の処理で型変換)
        ChangeSchema_node01 = ApplyMapping.apply(
            frame=CreateDynamicFrameByS3_node,
            mappings=[
                ("col0", "string", "time", "string"),
                ("col1", "string", "remote_addr", "string"),
                ("col2", "string", "request_method", "string"),
                ("col3", "string", "request_length", "string"),
                ("col4", "string", "request_uri", "string"),
                ("col5", "string", "request_id", "string"),
                ("col6", "string", "https", "string"),
                ("col7", "string", "uri", "string"),
                ("col8", "string", "query_string", "string"),
                ("col9", "string", "status", "string"),
                ("col10", "string", "bytes_sent", "string"),
                ("col11", "string", "body_bytes_sent", "string"),
                ("col12", "string", "referer", "string"),
                ("col13", "string", "useragent", "string"),
                ("col14", "string", "forwardedfor", "string"),
                ("col15", "string", "request_time", "string"),
                ("col16", "string", "upstream_response_time", "string")
            ],
            transformation_ctx="ChangeSchema_node01"
        )
        
        # 処理③
        # LTSV 形式の値をコロンで分割し右辺を抽出・上書き
        CustomTransform_node = ProcessLogData(
            glueContext, 
            DynamicFrameCollection(
                {"ChangeSchema_node01": ChangeSchema_node01}, 
                glueContext
            )
        )

        # 処理④
        # 処理③の出力結果が DynamicFrameCollection のため、
        # 後続の S3 が受け取れないため単一の DynamicFrame を取得して渡す
        SelectFromCollection_node = SelectFromCollection.apply(
            dfc=CustomTransform_node, 
            key=list(CustomTransform_node.keys())[0], 
            transformation_ctx="SelectFromCollection_node"
        )
        
        # 処理⑤
        # ここで適切な型に変換
        ChangeSchema_node02 = ApplyMapping.apply(
            frame=SelectFromCollection_node, 
            mappings=[
                ("time", "timestamp", "time", "timestamp"),
                ("remote_addr", "string", "remote_addr", "string"),
                ("request_method", "string", "request_method", "string"),
                ("request_length", "string", "request_length", "int"),
                ("request_uri", "string", "request_uri", "string"),
                ("request_id", "string", "request_id", "string"),
                ("https", "string", "https", "string"),
                ("uri", "string", "uri", "string"),
                ("query_string", "string", "query_string", "string"),
                ("status", "string", "status", "int"),
                ("bytes_sent", "string", "bytes_sent", "int"),
                ("body_bytes_sent", "string", "body_bytes_sent", "int"),
                ("referer", "string", "referer", "string"),
                ("useragent", "string", "useragent", "string"),
                ("forwardedfor", "string", "forwardedfor", "string"),
                ("request_time", "string", "request_time", "float"),
                ("upstream_response_time", "string", "upstream_response_time", "float"),
                ("processed_uri", "string", "processed_uri", "string")
            ], 
            transformation_ctx="ChangeSchema_node02"
        )
        
        # 処理⑥
        # parquet に変換、 gzip に圧縮し S3 に出力
        WriteDynamicFrameForS3_node = glueContext.write_dynamic_frame.from_options(
            frame=ChangeSchema_node02, 
            connection_type="s3", 
            format="glueparquet", 
            connection_options={
                "path": f"{destination_path}", 
                "partitionKeys": []
            }, 
            format_options={"compression": f"{compression_codec}"}, 
            transformation_ctx="AmazonS3_node"
        )
    
    # job 終了、bookmark 更新
    job.commit()
    logger.info("Job committed successfully")

except Exception as e:
    logger.error(f"An error occurred: {e}")
    job.commit()
    raise

最後に Job Schedules で定期実行の設定をしてやれば、毎日 1 日分のログを Athena による検索用の S3 に出力してくれる。

また、Job Bookmark 機能を有効化すると、既に処理したデータは次回以降は処理対象外となる。
逆に有効化しないと毎回全量に対し処理してしまうので、Job の実行時間もかかるし S3 に同じ内容のオブジェクトがどんどん溜まってしまうので要注意。
ブックマークキーは特に指定がなければプライマリキーを使用するが、カスタマイズも可能。

以上で Athena でのクエリ検索準備は完了。

Athena でクエリ検索する

パーティションを使用

作成したパーティションを WHERE 句で指定してやるだけ。

SELECT *
FROM log_analysis_prd.nginx_access_log
WHERE month = '202405'
  AND service_name = 'service1'
  AND environment = 'production'

ネストされた JSON データへのアクセス

以下では、JSON 型の文字列へのアクセスと、JSON 型のデータそれぞれに対するアクセスをしている。
アプリログ用のスクリプトにコメントしているが、ネストされた request.parameters の中のキーが固定ではなくスキーマの定義ができず、 request カラム全体を文字列型で保持しており、純粋な JSON データへのアクセス法が使えないため。
一方純粋な JSON データ(response カラム)は . でアクセス可能

SELECT
    time,
    request_id,
    unix_epoch,
    pid,
    agent,
    ip,
    json_extract_scalar(request, '$.method') AS request_method,
    json_extract_scalar(request, '$.path') AS request_path,
    json_extract_scalar(request, '$.parameters.refresh_token') AS refresh_token,
    json_extract_scalar(request, '$.parameters.grant_type') AS grant_type,
    json_extract_scalar(request, '$.parameters.controller') AS controller,
    json_extract_scalar(request, '$.parameters.action') AS action,
    json_extract_scalar(request, '$.parameters.scope') AS scope,
    json_extract_scalar(request, '$.parameters.id') AS id,
    response.status AS response_status,
    response.body AS response_body
FROM
    log_analysis_prd.api_log_prd

特定ファイルのみを対象とする

SELECT *
FROM log_analysis_prd.api_log_prd
WHERE "$path" = 's3://log-store-for-athena-query/production/service1/web/nginx/access/202408/run-AmazonS3_node-3-part-block-0-r-00234-gzip.parquet'

以上で Athena を使用したサクサクログ検索ライフが送れるようになった。
どこかで時間が取れたら Terraform か何かで管理したい、、、

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?