この記事は、ニフティグループ Advent Calendar 2020 22日目の記事です。
昨日は@matsu-matsuさんで「PaizaIO API でコードジャッジシステムを作ってみた」でした。
私も競技プログラミングをしていたのですが、もし社内でコンテストを開きたいときに非常に役立ちそうですね。
はじめに
以前にも増して社内でのデータ活用が活発になってきており、ETL処理の知識が重要になってきました。
社内で利用されている1.5年程前に開発されたシステムの一部でAthenaをつかってETL処理をしている箇所があるのですが、下記の問題が発生しています。
- 日次でデータが膨大に増える+クエリを複雑にしすぎてAthenaがTimeout
- View機能の使いすぎ
- 日次でパーティションが増えすぎて、Glueオブジェクトのコストが増加
- パーティション分割をしすぎ
- 読み込みデータ量削減のため、中間処理にもパーティションを作成している
- StepFunction内でLambda Function内からAthenaを読んでいるため、ステートマシンが複雑
- LambdaでAthenaクエリを実行して、Waitもしくはループでクエリの終了を判定
これらの問題はAthenaの既存機能や2020年内のアップデートで解決しやすくなったと思います。そこで本記事では、それらの機能を簡単にまとめてみました。
前提条件
本記事の例では下記のようなデータに対するクエリ記述しています。
Date,Categories,User,Cost
2018/11/01,Digital,Honda,0
2018/11/01,Book,Sasaki,669.54332
2018/11/01,Food,Sato,40.67488
2018/11/01,Digital,Sato,3644.17432
また、クエリ対象となるS3のディレクトリ構造は下記のようにHive形式となっています。
- s3://sample_data/input/year={year}/month={month}/hoge.csv
CREATE TABLE AS SELECT (CTAS)によるETL
複雑過ぎてTimeoutしてしまうクエリは、中間処理にCREATE TABLE AS SELECT (CTAS)を使うことで解決する可能性があります。
CTASはSELECTクエリの結果から新たにテーブルを作成する機能です。
その他にパーティショニングや列データ化(Parquet等)も設定することができるので、簡単なETL処理にも使われています。
似たような機能にViewがありますが、こちらは仮想的なテーブル記述のため実体を持たずクエリで仮想テーブルを用いた場合、その記述が展開されます。
そのため、Viewを多用し過ぎると非常に時間のかかるクエリになる可能性があります。
一方でCTASは実際にs3上にファイルを出力し、実テーブルを生成します。
CREATE TABLE "sample_db"."sample_ctas"
WITH (
format='PARQUET',
external_location='s3://sample-data/ctas',
partitioned_by=ARRAY['year','month']
) AS
SELECT "date",
"categories",
"user",
"cost",
element_at(split(date,'/'),1) AS year ,
element_at(split(date,'/'),2) AS month
FROM "sample_db"."sample_data"
上記の例では次のようなことを設定しています。
- 新たに
s3://sample-data/ctas/year=\${year}/${month}
を出力する - 出力するファイルフォーマットは
parquet
- yearとmonthについてパーティション分割
Partition Projectionによるパーティション数の抑制
日次でパーティション数が増大していき、Glueのオブジェクトに対するコストが高くなるケースには2020年6月に登場したPartition Projectionが役立つと思います。
Partition ProjectionはGlueデータカタログを用いず、設定からパーティションが計算されて用いられます。オブジェクトとして実体を持たないため、Glueオブジェクトのコストは発生しません。
また、date
型のパーティションに対しては(NOW-3YEARS,NOW)
のような現在からの範囲を設定することができます。
これにより、Glueクローラーやクエリにより行っていた定期的なパーティション管理が簡略化されます。
Partition Projectionを使うに当たり、下記の点には注意が必要です。
- Partition Projectionによるパーティション設定はAthenaにしか適用されない
- Redsift等からはGlueデータカタログによるパーティション設定が必要
- 空のパーティションが多すぎるとクエリが遅くなる可能性がある
CREATE EXTERNAL TABLE "sample_db"."sample_projection" (
`date` string,
`categories` string,
`user` string,
`cost` double)
PARTITIONED BY (
`year` int,
`month` int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://sample_data/input'
TBLPROPERTIES (
'classification'='csv',
'delimiter'=',',
'projection.enabled'='true',
'projection.month.interval'='1',
'projection.month.range'='1,12',
'projection.month.type'='integer',
'projection.year.digits'='4',
'projection.year.interval'='1',
'projection.year.range'='2017,2021',
'projection.year.type'='integer',
'serialization.encoding'='utf8',
'skip.header.line.count'='1',
'storage.location.template'='s3://sample_data/input/year=${year}/month=${month}/',
'transient_lastDdlTime'='1608650935',
'typeOfData'='file')
上記の例では次のようなことを設定しています。
設定名 | 値 | 説明 |
---|---|---|
projection.enabled | true | Partition Projectionの有効化 |
storage.location.template | s3://sample_data/input/year=${year}/month=${month}/ | s3パスに対するパーティション箇所の記述 |
projection.{partition_name}.type | integer | パーティションのデータ型(整数型、日付型、列挙型など) |
projection.{partition_name}.range | 1,12 | パーティションの範囲 |
projection.{partition_name}.interval | 1 | パーティション値の間隔 |
projection.year.digits | 4 | (整数型の場合)パーティション値の桁数。先頭からゼロ埋め。 |
StepFunctionsのAthena統合によるステートマシンへの組み込み
2020年10月のアップデートでLambda Functionを介さずにStepFunctionsを使用してAthenaにクエリを実行できるようになりました。
現在、利用可能なAPIは以下の4つです。それぞれこのページの権限をStepFunctionsに設定されているIAMロールに付与する必要があります。
- StartQueryExecution
- StopQueryExecution
- GetQueryExecution
- GetQueryResults
Athenaは非同期実行のため、Lambda Functionから処理を開始する場合、今まで社内で利用しているシステムではポーリングや待機をしていました。
この機能の追加により、StepFunctions内ではStartQueryExecution.sync
を利用することで簡単にクエリ完了まで待機することができるようになりました。
{
"StartAt": "Athena クエリの実行を開始します。",
"States": {
"Athena クエリの実行を開始します。": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "SELECT\n\"user\",\n\"categories\",\nCAST(SUM(cost) AS decimal(12, 4)) cost\nFROM\n\"sample_db\".\"sample_projection\"\nWHERE\nuser = 'Tanaka'\nGROUP BY\n\"categories\",\n\"user\"\nORDER BY\ncost DESC",
"WorkGroup": "sample_workgroup"
},
"Next": "Athena クエリの実行結果を取得する"
},
"Athena クエリの実行結果を取得する": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
},
"End": true
}
}
}
上記の例では単純にAthenaクエリを実行して、完了まで待機したあとで実行結果を取得しています。
以下が各ステップの出力です。
「Athena クエリの実行を開始します。」のステップ出力
{
"QueryExecution": {
"Query": "SELECT\n\"user\",\n\"categories\",\nCAST(SUM(cost) AS decimal(12, 4)) cost\nFROM\n\"sample_db\".\"sample_projection\"\nWHERE\nuser = 'Tanaka'\nGROUP BY\n\"categories\",\n\"user\"\nORDER BY\ncost DESC",
"QueryExecutionContext": {},
"QueryExecutionId": "********-****-****-****-************",
"ResultConfiguration": {
"OutputLocation": "s3://sample_data/output/********-****-****-****-************.csv"
},
"StatementType": "DML",
"Statistics": {
"DataScannedInBytes": 7874805,
"EngineExecutionTimeInMillis": 952,
"QueryPlanningTimeInMillis": 279,
"QueryQueueTimeInMillis": 213,
"ServiceProcessingTimeInMillis": 30,
"TotalExecutionTimeInMillis": 1195
},
"Status": {
"CompletionDateTime": 1608659190086,
"State": "SUCCEEDED",
"SubmissionDateTime": 1608659188891
},
"WorkGroup": "sample_workgroup"
}
}
「Athena クエリの実行結果を取得する」のステップ出力
{
"ResultSet": {
"ResultSetMetadata": {
"ColumnInfo": [
{
"CaseSensitive": true,
"CatalogName": "hive",
"Label": "user",
"Name": "user",
"Nullable": "UNKNOWN",
"Precision": 2147483647,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "varchar"
},
{
"CaseSensitive": true,
"CatalogName": "hive",
"Label": "categories",
"Name": "categories",
"Nullable": "UNKNOWN",
"Precision": 2147483647,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "varchar"
},
{
"CaseSensitive": false,
"CatalogName": "hive",
"Label": "cost",
"Name": "cost",
"Nullable": "UNKNOWN",
"Precision": 12,
"Scale": 4,
"SchemaName": "",
"TableName": "",
"Type": "decimal"
}
]
},
"Rows": [
{
"Data": [
{
"VarCharValue": "user"
},
{
"VarCharValue": "categories"
},
{
"VarCharValue": "cost"
}
]
},
{
"Data": [
{
"VarCharValue": "Tanaka"
},
{
"VarCharValue": "Book"
},
{
"VarCharValue": "2669.8389"
}
]
},
...(略)...
]
},
"UpdateCount": 0
}
今回は特に何もせずにステートマシンを終了していますが、本来はメール/Slack通知などの後続処理をつけると思います。
利用例としてはCloudtrailやCroudFrontログの定期レポートやアラートなどができそうです。
おわりに
今回はお手軽にできるETLということでAthenaを利用しましたが、AWSでのETL手段はGlueやLambdaなど多彩にありますので要件にあったサービスを選択する必要があります。また、AWSのサービスはアップデートが多く、気がついたら課題解決が簡単になっている場合があるので情報を素早くキャッチしていきたいですね。
明日は@uyuqui1718さんです。お楽しみに!