ELBログなど既に保存済みのログに対して、後からAthenaのテーブルを作成した際に、
手でパーティションを設定するのは大変だなぁと思い書いてみました。
Amazon Athena とは
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/what-is.html
AWS Lambda とは
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/welcome.html
やりたいこと
・Athenaのテーブルにパーティションを設定する。
・年、月、日と分けられたフォルダのパーティションを追加する。
bucket/dir1/dir2/dirN
└ YYYY
└ YYYY
└MM
└MM
└DD
└DD
└log_file01.txt
└log_file02.txt
└log_fileNN.txt
書いたものその1
結果からいうと、1つめはデータ量の問題でタイムアウトしてしまい使い物になりませんでした。
せっかくなのでログとして残しておきます。
ロール
使用するIAMロールに設定するポリシーは以下。
・AmazonAthenaFullAccess
・AmazonS3ReadOnlyAccess
・AWSLambdaExecute
環境変数
以下を設定。
・S3_BUCKET_NAME 例:bucket-name
・S3_BUCKET_PREFIX 例:dir_1/dir_2
・RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/
・TARGET_DB 例:test_db
・TARGET_LOCATION 例:s3://bucket-name/dir_1/dir_2
・TARGET_TABLE 例:test_table
・RETRY_COUNT 例:20
タイムアウト
最大値(5分)
コード
書いたLambdaのコードは以下。Python3を使用。
import boto3
import os
import re
import time
# S3のファイル一覧からAthenaのパーティション設定用Query作成し実行する
def lambda_handler(event, context):
file_pass_list = s3_resource_files()
target_date_list = extract_date_from_file_path(file_pass_list)
athena_querys = build_athena_query_list(target_date_list)
for x in athena_querys:
query_response = athena_query_execution(x)
athena_query_result(query_response['QueryExecutionId'])
# S3バケット内のファイル一覧取得
def s3_resource_files():
s3_client = boto3.client('s3')
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
S3_BUCKET_PREFIX = os.environ['S3_BUCKET_PREFIX']
contents = []
next_token = ''
while True:
if next_token == '':
response = s3_client.list_objects_v2(
Bucket=S3_BUCKET_NAME,
Prefix=S3_BUCKET_PREFIX
)
else:
response = s3_client.list_objects_v2(
Bucket=S3_BUCKET_NAME,
Prefix=S3_BUCKET_PREFIX,
ContinuationToken=next_token
)
# endswithで末尾が「/」は除く(ファイルが存在するフォルダのみ抽出)
file_pass_list = [
content.get('Key')
for content in response.get('Contents')
if not content.get('Key').endswith('/')
]
contents.extend(file_pass_list)
if 'NextContinuationToken' in response:
next_token = response['NextContinuationToken']
else:
break
print(len(contents))
return contents
# ファイルパスのリストから日付辞書リスト作成
def extract_date_from_file_path(file_pass_list):
pattern = r"^.+/(\d{4})/(\d{2})/(\d{2})/.+$"
splited_file_pass_list = [re.match(pattern, x) for x in file_pass_list]
# 正規表現にマッチしなかった場合、Noneになるので除外しつつ抽出
target_date_list = [
{
"year":x[1],
"month":x[2],
"day":x[3]
}
for x in splited_file_pass_list
if not x is None
]
print(len(target_date_list))
return target_date_list
# Athenaのパーティション設定用クエリリスト作成
def build_athena_query_list(target_date_list):
# setを使って重複するQueryは除く
query_set = set()
if len(target_date_list) > 0:
for target_date in target_date_list:
query = build_athena_add_partition_query(target_date)
query_set.add(query)
print(len(query_set))
return query_set
# Athenaのパーティション設定用クエリ作成
def build_athena_add_partition_query(target_date):
table = os.environ['TARGET_TABLE']
location_prefix = os.environ['TARGET_LOCATION']
# 「IF NOT EXISTS」で存在しない場合のみパーティションを追加するクエリにしている
# https://docs.aws.amazon.com/ja_jp/athena/latest/ug/alter-table-add-partition.html
query_form = "ALTER TABLE {table} " \
+ "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "\
+ "location '{location_prefix}/{year}/{month}/{day}/';"
query = query_form.format(
table = table,
year = target_date["year"],
month = target_date["month"],
day = target_date["day"],
location_prefix = location_prefix
)
return query
# Athenaへクエリ実行
def athena_query_execution(query):
print("QUERY : " + query)
DB = os.environ['TARGET_DB']
RESULT_OUTPUT_S3_DIR = os.environ['RESULT_OUTPUT_S3_DIR']
client = boto3.client('athena')
query_response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DB
},
ResultConfiguration={
'OutputLocation': RESULT_OUTPUT_S3_DIR,
}
)
return query_response
# Athenaへのクエリ実行状態確認
def athena_query_result(query_execution_id):
RETRY_COUNT = int(os.environ['RETRY_COUNT'])
client = boto3.client('athena')
# ステータス確認
for i in range(0, RETRY_COUNT):
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
#print(query_status)
query_execution_status = query_status['QueryExecution']['Status']['State']
print("STATUS:" + query_execution_status)
if query_execution_status == 'SUCCEEDED':
break
if query_execution_status == 'FAILED':
# raise Exception('FAILED')
break
else:
time.sleep(1)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
print('TIME OUT')
処理概要
1.S3のファイル一覧取得
2.ファイル一覧から年月日を抽出
3.抽出した年月日からパーティション設定用のクエリ一覧作成
4.クエリ一覧を順次実行
テスト環境で実行している時は問題なく動いていたのですが、
本番環境で実行すると、データが多すぎてタイムアウトしてしまいました。
既に3年分もたまっており、そりゃそうですよねぇな結果に・・。
あと無駄な処理もあると思われ・・。
と言う訳で、妥協案が以下です。
書いたものその2 (妥協案)
妥協案は日付を指定して、フォルダあるなし関係なしに設定するものです。
毎日日付フォルダが作成される前提で作成しています(妥協)。
ロール
使用するIAMロールに設定するポリシーは以下。
・AmazonAthenaFullAccess
・AmazonS3ReadOnlyAccess
・AWSLambdaExecute
環境変数
以下を設定。
・RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/
・TARGET_DB 例:test_db
・TARGET_TABLE 例:test_table
・TARGET_LOCATION 例:s3://bucket-name/dir_1/dir_2
・START_DATE 例:2017/01/01
・END_DATE 例:2018/01/01
タイムアウト
最大値(5分)
コード
書いたLambdaのコードは以下。Python3を使用。
import boto3
import os
import datetime
from dateutil.relativedelta import relativedelta
# S3のファイル一覧からAthenaのパーティション設定用Query作成し実行する
def lambda_handler(event, context):
target_date_list = build_target_date_list()
athena_querys = build_athena_query_list(target_date_list)
for x in athena_querys:
query_response = athena_query_execution(x)
# 開始日、終了日から日付辞書リスト作成
def build_target_date_list():
start_date = os.environ['START_DATE'].split('/')
target_date = datetime.date(int(start_date[0]), int(start_date[1]), int(start_date[2]))
end_date = os.environ['END_DATE'].split('/')
target_end_date = datetime.date(int(end_date[0]), int(end_date[1]), int(end_date[2]))
print(target_date)
print(target_end_date)
target_date_list = []
while True:
target_date_list.append(
{
"year":target_date.strftime("%Y"),
"month":target_date.strftime("%m"),
"day":target_date.strftime("%d")
}
)
# 次の日取得
target_date = target_date + relativedelta(days=1)
if (target_date >= target_end_date):
break
print(len(target_date_list))
return target_date_list
# Athenaのパーティション設定用クエリリスト作成
def build_athena_query_list(target_date_list):
# setを使って重複するQueryは除く
query_set = set()
if len(target_date_list) > 0:
for target_date in target_date_list:
query = build_athena_add_partition_query(target_date)
query_set.add(query)
print(len(query_set))
return query_set
# Athenaのパーティション設定用クエリ作成
def build_athena_add_partition_query(target_date):
table = os.environ['TARGET_TABLE']
location_prefix = os.environ['TARGET_LOCATION']
# 「IF NOT EXISTS」で存在しない場合のみパーティションを追加するクエリ
query_form = "ALTER TABLE {table} " \
+ "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "\
+ "location '{location_prefix}/{year}/{month}/{day}/';"
query = query_form.format(
table = table,
year = target_date["year"],
month = target_date["month"],
day = target_date["day"],
location_prefix = location_prefix
)
return query
# Athenaへクエリ実行
def athena_query_execution(query):
print("QUERY : " + query)
DB = os.environ['TARGET_DB']
RESULT_OUTPUT_S3_DIR = os.environ['RESULT_OUTPUT_S3_DIR']
client = boto3.client('athena')
query_response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DB
},
ResultConfiguration={
'OutputLocation': RESULT_OUTPUT_S3_DIR,
}
)
return query_response
処理概要
1.開始日、終了日から日付辞書リスト作成
2.日付辞書リストからパーティション設定用のクエリ一覧作成
3.クエリ一覧を順次実行
これを1年間単位で実行することで何とか設定できました。
あとはこれをもとに改良して当日分の設定用を作成、
CloudWatchのイベントで毎日起動してやればAthenaでクエリーがいつでも投げられる!
パーティション数の制限について
日単位でパーティションを設定しているので、
制限に引っかからないか心配になったので調べてみました。
結論から言うと、大丈夫!たぶん!(いつまでこのシステムを動かすかによりますが・・。)
Athenaのドキュメントでは制限について「テーブルあたりのパーティション数は 20,000 です。」とありますが、
その前段では以下の記述があり、Glueの制限が適用されるとのこと。
AWS Glue が提供されているリージョンで Athena を使用する場合は、AWS Glue カタログに移行します。
AWS Glue に移行済みである場合、Athena のテーブル、データベース、およびパーティションに対するサービスの制限については、「AWS Glue の制限」を参照してください。
AWS Glueの制限だと「テーブルあたりのパーティション数は1,000,000」でしたー。
Amazon Athena - ユーザーガイド - サービス制限
AWS Glue - 開発者ガイド - AWS Glue の制限
おわり
いまいちスマートではないですが設定できました。
でかいデータを扱うのはやっぱり難しいですね。
とりあえず、今回は以上です。