はじめに
アプリ開発では、使用状況を可視化したいという要求がよくあり、
事前に運用を想定して分析しやすいようなデータベースへのデータ格納にすることができていればよいのですが、最初はスピード優先やミニマムに開発してといったプロジェクトの場合、
JSONデータをスキーマレスで格納してしまうといったことがあるかと思います。
その場合、通常はDBからデータを取得して、
ログイン回数やUU数、アプリの各種イベントの状況、ゲームの達成状況など、
分析用に加工処理して出力する部分のプログラミング開発が必要かと思います。
Glueを使えば、DBからのデータ抽出部分のプログラミングが不要になり、
Athenaを使えば、分析用の加工をSQLで行え、
QuickSightを使えば、表示用のWebフロント側をプログラミングする必要がない、
ということで、やってみました。
DynamoDBのデータを分析できるようにするには
AWS Glueを使用してDynamoDBテーブルからデータを抽出し、Amazon AthenaでSQLクエリを使用して分析します。
- AWS Glue クローラを設定
- DynamoDBからスキーマを検出し、AWS Glue Data Catalogにメタデータを設定
- AWS Glue ETLジョブを設定
- DynamoDBテーブルからデータを抽出し、S3に格納
- AWS Glue Data Catalogによって提供される、定義済みのスキーマに結果を格納
- Athenaの設定
- Amazon S3で抽出されたデータの外部スキーマストアとしてAWS Glue Data Catalogを使用
- SQLを使用してAmazon S3を直接クエリするために、Athenaを利用することで、データ分析を実行
Athenaで分析するには、上記だけでよいのですが、開発者以外の関係者がKPIを見たいといった場合、
4. QuickSightの設定
5. 分析用データを定期的に更新
といったことが必要になると思います。
AWS Glueでデータ抽出
AWS Glueクローラを作成
DynamoDBからデータを抽出し、データカタログを作成します。
- [AWS Glueコンソール]を開きます。
- ナビゲーションパネルで[クローラ]を選択し、[クローラの追加]を選択します。
- クローラ名(dynamodb-crawler)を入力し、[次へ]を選択します。
- [データストアの選択] リストで、[DynamoDB] を選択します。
- [テーブル名]ボックスの横にある小さなフォルダアイコンを選択し、DynamoDBが作成したテーブルを選択します。[選択] をクリックしてメニューを閉じ、[次へ] を選択します。
- [別のデータストアを追加]ページで、デフォルト設定を[いいえ]のままにして、[次へ]を選択します。
- Table name UserActivityLog
- 最後に、[別のデータストアを追加]ページで、デフォルト設定を[いいえ]のままにして、[次へ]を選択します。
- [IAMロールを選択]ページで、[新規のIAMロールを選択]をクリックし、該当IAMロールを追加し、完了したら、[次へ] を選択します。
- [このクローラのスケジュールを作成する]ページで、既定の設定を[オンデマンドで実行]とし、[次へ]をクリックします。(定期的に実行する場合は、スケジュールの実行をcron設定します。)
- クローラの出力を設定する必要があります。データベース作成で、glue_outputにし、[次へ] を選択します。(注意: アンダースコアを使うこと。)
これで、クローラがDynamoDBテーブルに対して実行され、
データカタログにテーブルのためのエントリーが作成されたので、
DynamoDBテーブルをソースとしてETLジョブを作成して実行することができます。
ETLジョブ作成と実行
以下の手順では、DynamoDBからデータを抽出し、
Amazon S3バケットに格納するAWS Glue ETLジョブを作成します。
ジョブの追加
- ジョブ名: dynamodb-etl-job-useractivitylog
- プロパティはほぼデフォルト
- ジョブのブックマークを有効化
- sourceをDyanamoDBのテーブル
- targetでデータテーブルを作成
- データストアで、S3
- useractivitylog
- s3://glue-etl-output/useractivitylog
- useractivitylog
- 形式で、JSON
- データストアで、S3
Crawlerで抽出したデータカタログを使用する場合は、
ターゲットで、「データカタログのテーブルを使用し、データターゲットを更新する」を選びます。
ジョブの実行を行うと、指定したS3のパスにファイルができます。
Athenaで分析する
Glueの設定で、DynamoDBからデータが抽出され、Amazon S3に格納されるはずです。
SQLを使用してAthenaでそのデータを分析することができます。
- Athenaコンソールに移動します。[データベース]ドロップダウンリストで、glue_outputを選択します。
テーブルを作成する
CREATE EXTERNAL TABLE `useractivitylog`(
`datetime` string,
`role` string,
`activity` string,
`value` bigint,
`userid` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://glue-etl-output/useractivitylog'
TBLPROPERTIES (
'has_encrypted_data'='false')
SELECT文の例
WITH useractivity_daygroup AS (
SELECT
DISTINCT userid,
activity,
value,
date_trunc('day', date_parse(datetime, '%Y-%m-%dT%H:%i:%s')) AS daygroup
FROM "glue_output"."useractivitylog"
WHERE activity = 'login'
)
SELECT
count(userid) AS UU,
daygroup
FROM
useractivity_daygroup
GROUP BY
daygroup
;
クエリ結果をS3に保存
- AthenaでSettingをクリックし、Query result locationにS3のバケット名「aws-athena-query-results--」を指定
- 先程のSELECTのクエリを保存しておきましょう。保存し実行した出力結果が、バケットに保存されます。
上記までで、日別のログイン回数をプログラムレスで分析できるようになりました。
JSONデータを分析する場合は、ネストした構造の場合はデシリアライズしたりする必要があるので、
こちらが参考になると思います。
Hive JSON SerDe
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/json.html#hivejson
では、Athenaで分析した結果を可視化してみたいと思います。
QuickSight
QuickSightドキュメント
https://docs.aws.amazon.com/ja_jp/quicksight/latest/user/welcome.html
QuickSightのdata sourceに、Athenaのクエリ保存したS3バケットを指定し、
適当に名前をつけて、上記バケットを指定した、manifestファイルをアップロードします。
参考: manifestファイル
https://docs.aws.amazon.com/ja_jp/quicksight/latest/user/supported-manifest-file-format.html
例:
{
"fileLocations": [
{
"URIPrefixes": [
"s3://aws-athena-query-results-<id>-<region>/userActivityLogin/"
]
}
],
"globalUploadSettings": {
"format": "CSV",
"delimiter": ",",
"textqualifier": "'",
"containsHeader": "true"
}
}
データを読み込めれば、視覚化でフィールドリストを指定すると、こんな感じのグラフになります。
グラフのメニューからCSVエクスポートも可能です。
Athenaのクエリを定期実行
さて、これで、分析、視覚化というのはできましたが、
Athenaの分析を手動で行わないといけないことが問題です。
AWS CLIでクエリ実行してみる
クエリの自動実行はLambdaで行えばよいのですが、
その前に、クエリ実行時の挙動を詳しくみてみようと思います。
AWS CLIを使用して、Athenaのquery定期実行の仕様を考える
まず、AWS CLIのconfigを設定します。
$ export AWS_DEFAULT_PROFILE=my_aws_profile
AWS Consoleで保存したクエリのIDを出力する
$ aws athena list-named-queries
aws athena list-named-queries
{
"NamedQueryIds": [
"123456786-414d-44a9-82cc-8e4f9781c8a0"
]
}
QueryIDを指定してクエリの詳細を見る
$ aws athena get-named-query --named-query-id "123456786-414d-44a9-82cc-8e4f9781c8a0"
{
"NamedQuery": {
(略)
}
}
ここで指定されているQueryを指定して、クエリ実行してみる。
注意:データベース名に_(アンダースコア)以外の-(ハイフン)が入っているとエラーになる。
$ aws athena start-query-execution \
--query-string "(略)" \
--result-configuration OutputLocation=s3://aws-athena-query-results-<id>-<region>/login_uu_day-prod/
{
"QueryExecutionId": "67b1abba-3acc-4148-b541-420dfa46dd58"
}
結果確認
$ aws athena get-query-results --query-execution-id "67b1abba-3acc-4148-b541-420dfa46dd58"
上記を実行するたびに、S3にデータが増えていきます。
QuickSightで見た時に、どうなるか?ですが、
データが追加されてしまい、データが重複されて読み込まれてしまうので、
古いデータは削除する必要があります。
ですので、Lambdaでは、以下の処理が必要となります。
- 該当バケットのパス配下のファイル削除
- athenaのクエリ実行
- クエリ結果の確認
Athenaクエリを定期実行するLambda
Lambdaを実行するIAM Roleに、便宜上下記のアクセス権限を与えます。
-
CloudWatchLogsFullAccess
-
S3FullAccess
-
AmazonAthenaFullAccess
-
AWSGlueServiceRole
-
Lambdaの関数名: athena-loguin_uu_day
-
実行環境: Python3.8
-
Lambdaの実行時間を3分
-
トリガーに、CloudWatch Eventsを指定
- 名前: athena-lambda-trigger
- スケジュール式: rate(1 day)
Lambdaから関数エクスポートしたファイルを編集してCloudFormation化する
各ファイルをパッケージ化し、テンプレートとしてs3にアップロード
例: AthenaLoginUuDay.py
import os
import time
import boto3
RETRY_COUNT = 300
DATABASE = 'glue-output'
TABLE = 'useractivity'
PATH = 'login_uu_day/'
S3_BUCKET = 'aws-athena-query-results-<id>-<region>'
S3_OUTPUT = 's3://' + S3_BUCKET + '/' + PATH
QUERY = ("""
略(上記のクエリ)
""")
def lambda_handler(event, context):
query = QUERY
client = boto3.client('athena')
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DATABASE
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
query_execution_id = response['QueryExecutionId']
print(query_execution_id)
for i in range(1, 1 + RETRY_COUNT):
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
print("STATUS:" + query_execution_status)
break
if query_execution_status == 'FAILED':
raise Exception("STATUS:" + query_execution_status)
else:
print("STATUS:" + query_execution_status)
time.sleep(i)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
raise Exception('TIME OVER')
QUERY_FILE_FROM = PATH + query_execution_id + ".csv"
QUERY_FILE_TO = PATH + "out.csv"
# rename csv file
s3 = boto3.client('s3')
s3.copy_object(Bucket=S3_BUCKET, Key=QUERY_FILE_TO, CopySource={'Bucket': S3_BUCKET, 'Key': QUERY_FILE_FROM})
# created s3 object
s3_objects_key = []
s3_object_key_csv = QUERY_FILE_FROM
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = QUERY_FILE_FROM + '.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})
# delete s3 object
for i in range(1, 1 + RETRY_COUNT):
response = s3.delete_objects(
Bucket=S3_BUCKET,
Delete={
'Objects': s3_objects_key
}
)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
print("delete %s complete" % s3_objects_key)
break
else:
print(response['ResponseMetadata']['HTTPStatusCode'])
time.sleep(i)
else:
raise Exception('object %s delete failed' % s3_objects_key)
後は、複数のクエリがある場合は、CloudFormationを使って、上記テンプレにしてクエリ分のFunctionを作成しておけばOK。
$ aws cloudformation package --template-file template.yaml --s3-bucket bucketname --output-template-file output-template.yaml
Successfully packaged artifacts and wrote output template to file output-template.yaml.
Execute the following command to deploy the packaged template
aws cloudformation deploy --template-file /<path-to-file>/output-template.yaml --stack-name <YOUR STACK NAME>
QuickSightで更新スケジュール設定
上記で設定した定期実行のLambadaにより、Athenaクエリ結果のデータが更新されるので、
QuickSightでもデータセットの更新スケジュールを設定します。
データの管理 > データセットを選択 > 更新スケジュール > 毎日
以上