LoginSignup
6
6

More than 5 years have passed since last update.

AthenaのテーブルにLambda(Python3)でパーティションを設定する

Last updated at Posted at 2018-08-22

ELBログなど既に保存済みのログに対して、後からAthenaのテーブルを作成した際に、
手でパーティションを設定するのは大変だなぁと思い書いてみました。

Amazon Athena とは
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/what-is.html

AWS Lambda とは
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/welcome.html

やりたいこと

・Athenaのテーブルにパーティションを設定する。
・年、月、日と分けられたフォルダのパーティションを追加する。

フォルダ構成イメージ
bucket/dir1/dir2/dirN
 └ YYYY
 └ YYYY
  └MM
  └MM
   └DD
   └DD
    └log_file01.txt
    └log_file02.txt
    └log_fileNN.txt

書いたものその1

結果からいうと、1つめはデータ量の問題でタイムアウトしてしまい使い物になりませんでした。
せっかくなのでログとして残しておきます。

ロール

使用するIAMロールに設定するポリシーは以下。
・AmazonAthenaFullAccess
・AmazonS3ReadOnlyAccess
・AWSLambdaExecute

環境変数

以下を設定。
・S3_BUCKET_NAME     例:bucket-name
・S3_BUCKET_PREFIX    例:dir_1/dir_2
・RESULT_OUTPUT_S3_DIR  例:s3://aws-athena-query-results-**************-ap-northeast-1/
・TARGET_DB        例:test_db
・TARGET_LOCATION     例:s3://bucket-name/dir_1/dir_2
・TARGET_TABLE       例:test_table
・RETRY_COUNT       例:20

タイムアウト

最大値(5分)

コード

書いたLambdaのコードは以下。Python3を使用。

フォルダ数が多いとタイムアウトする
import boto3
import os
import re
import time

# S3のファイル一覧からAthenaのパーティション設定用Query作成し実行する
def lambda_handler(event, context):
    file_pass_list = s3_resource_files()
    target_date_list = extract_date_from_file_path(file_pass_list)
    athena_querys = build_athena_query_list(target_date_list)
    for x in athena_querys:
        query_response = athena_query_execution(x)
        athena_query_result(query_response['QueryExecutionId'])

# S3バケット内のファイル一覧取得
def s3_resource_files():
    s3_client = boto3.client('s3')
    S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
    S3_BUCKET_PREFIX = os.environ['S3_BUCKET_PREFIX']
    contents = []
    next_token = ''
    while True:
        if next_token == '':
            response = s3_client.list_objects_v2(
                            Bucket=S3_BUCKET_NAME,
                            Prefix=S3_BUCKET_PREFIX
                        )
        else:
            response = s3_client.list_objects_v2(
                            Bucket=S3_BUCKET_NAME,
                            Prefix=S3_BUCKET_PREFIX,
                            ContinuationToken=next_token
                        )
        # endswithで末尾が「/」は除く(ファイルが存在するフォルダのみ抽出)
        file_pass_list = [
            content.get('Key')
            for content in response.get('Contents')
            if not content.get('Key').endswith('/')
        ]
        contents.extend(file_pass_list)
        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            break
    print(len(contents))
    return contents

# ファイルパスのリストから日付辞書リスト作成
def extract_date_from_file_path(file_pass_list):
    pattern = r"^.+/(\d{4})/(\d{2})/(\d{2})/.+$"
    splited_file_pass_list = [re.match(pattern, x) for x in file_pass_list]
    # 正規表現にマッチしなかった場合、Noneになるので除外しつつ抽出
    target_date_list = [
            {
                "year":x[1],
                "month":x[2],
                "day":x[3]
            }
            for x in splited_file_pass_list
            if not x is None
        ]
    print(len(target_date_list))
    return target_date_list

# Athenaのパーティション設定用クエリリスト作成
def build_athena_query_list(target_date_list):
    # setを使って重複するQueryは除く
    query_set = set()
    if len(target_date_list) > 0:
        for target_date in target_date_list:
            query = build_athena_add_partition_query(target_date)
            query_set.add(query)
    print(len(query_set))
    return query_set

# Athenaのパーティション設定用クエリ作成
def build_athena_add_partition_query(target_date):
    table = os.environ['TARGET_TABLE']
    location_prefix = os.environ['TARGET_LOCATION']

    # 「IF NOT EXISTS」で存在しない場合のみパーティションを追加するクエリにしている
    # https://docs.aws.amazon.com/ja_jp/athena/latest/ug/alter-table-add-partition.html
    query_form = "ALTER TABLE {table} " \
                + "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "\
                + "location '{location_prefix}/{year}/{month}/{day}/';"
    query = query_form.format(
        table = table,
        year = target_date["year"],
        month = target_date["month"],
        day = target_date["day"],
        location_prefix = location_prefix
    )
    return query

# Athenaへクエリ実行
def athena_query_execution(query):
    print("QUERY : " + query)
    DB = os.environ['TARGET_DB']
    RESULT_OUTPUT_S3_DIR = os.environ['RESULT_OUTPUT_S3_DIR']
    client = boto3.client('athena')

    query_response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DB
        },
        ResultConfiguration={
            'OutputLocation': RESULT_OUTPUT_S3_DIR,
        }
    )
    return query_response

# Athenaへのクエリ実行状態確認
def athena_query_result(query_execution_id):
    RETRY_COUNT = int(os.environ['RETRY_COUNT'])
    client = boto3.client('athena')
    # ステータス確認
    for i in range(0, RETRY_COUNT):
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        #print(query_status)
        query_execution_status = query_status['QueryExecution']['Status']['State']
        print("STATUS:" + query_execution_status)

        if query_execution_status == 'SUCCEEDED':
            break
        if query_execution_status == 'FAILED':
            # raise Exception('FAILED')
            break
        else:
            time.sleep(1)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        print('TIME OUT')

処理概要

1.S3のファイル一覧取得
2.ファイル一覧から年月日を抽出
3.抽出した年月日からパーティション設定用のクエリ一覧作成
4.クエリ一覧を順次実行

テスト環境で実行している時は問題なく動いていたのですが、
本番環境で実行すると、データが多すぎてタイムアウトしてしまいました。
既に3年分もたまっており、そりゃそうですよねぇな結果に・・。
あと無駄な処理もあると思われ・・。

と言う訳で、妥協案が以下です。

書いたものその2 (妥協案)

妥協案は日付を指定して、フォルダあるなし関係なしに設定するものです。
毎日日付フォルダが作成される前提で作成しています(妥協)。

ロール

使用するIAMロールに設定するポリシーは以下。
・AmazonAthenaFullAccess
・AmazonS3ReadOnlyAccess
・AWSLambdaExecute

環境変数

以下を設定。
・RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/
・TARGET_DB       例:test_db
・TARGET_TABLE     例:test_table
・TARGET_LOCATION    例:s3://bucket-name/dir_1/dir_2
・START_DATE      例:2017/01/01
・END_DATE       例:2018/01/01

タイムアウト

最大値(5分)

コード

書いたLambdaのコードは以下。Python3を使用。

START_DATE~END_DATEで指定した対象日付を設定
import boto3
import os
import datetime
from dateutil.relativedelta import relativedelta

# S3のファイル一覧からAthenaのパーティション設定用Query作成し実行する
def lambda_handler(event, context):
    target_date_list = build_target_date_list()
    athena_querys = build_athena_query_list(target_date_list)
    for x in athena_querys:
        query_response = athena_query_execution(x)

# 開始日、終了日から日付辞書リスト作成
def build_target_date_list():
    start_date = os.environ['START_DATE'].split('/')
    target_date = datetime.date(int(start_date[0]), int(start_date[1]), int(start_date[2]))

    end_date = os.environ['END_DATE'].split('/')
    target_end_date = datetime.date(int(end_date[0]), int(end_date[1]), int(end_date[2]))

    print(target_date)
    print(target_end_date)

    target_date_list = []
    while True:
        target_date_list.append(
                {
                "year":target_date.strftime("%Y"),
                "month":target_date.strftime("%m"),
                "day":target_date.strftime("%d")
                }
            )

        # 次の日取得
        target_date = target_date + relativedelta(days=1)
        if (target_date >= target_end_date):
            break

    print(len(target_date_list))
    return target_date_list

# Athenaのパーティション設定用クエリリスト作成
def build_athena_query_list(target_date_list):
    # setを使って重複するQueryは除く
    query_set = set()
    if len(target_date_list) > 0:
        for target_date in target_date_list:
            query = build_athena_add_partition_query(target_date)
            query_set.add(query)
    print(len(query_set))
    return query_set

# Athenaのパーティション設定用クエリ作成
def build_athena_add_partition_query(target_date):
    table = os.environ['TARGET_TABLE']
    location_prefix = os.environ['TARGET_LOCATION']

    # 「IF NOT EXISTS」で存在しない場合のみパーティションを追加するクエリ
    query_form = "ALTER TABLE {table} " \
                + "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "\
                + "location '{location_prefix}/{year}/{month}/{day}/';"
    query = query_form.format(
        table = table,
        year = target_date["year"],
        month = target_date["month"],
        day = target_date["day"],
        location_prefix = location_prefix
    )
    return query

# Athenaへクエリ実行
def athena_query_execution(query):
    print("QUERY : " + query)
    DB = os.environ['TARGET_DB']
    RESULT_OUTPUT_S3_DIR = os.environ['RESULT_OUTPUT_S3_DIR']
    client = boto3.client('athena')

    query_response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DB
        },
        ResultConfiguration={
            'OutputLocation': RESULT_OUTPUT_S3_DIR,
        }
    )
    return query_response

処理概要

1.開始日、終了日から日付辞書リスト作成
2.日付辞書リストからパーティション設定用のクエリ一覧作成
3.クエリ一覧を順次実行

これを1年間単位で実行することで何とか設定できました。

あとはこれをもとに改良して当日分の設定用を作成、
CloudWatchのイベントで毎日起動してやればAthenaでクエリーがいつでも投げられる!

パーティション数の制限について

日単位でパーティションを設定しているので、
制限に引っかからないか心配になったので調べてみました。

結論から言うと、大丈夫!たぶん!(いつまでこのシステムを動かすかによりますが・・。)

Athenaのドキュメントでは制限について「テーブルあたりのパーティション数は 20,000 です。」とありますが、
その前段では以下の記述があり、Glueの制限が適用されるとのこと。

AWS Glue が提供されているリージョンで Athena を使用する場合は、AWS Glue カタログに移行します。
AWS Glue に移行済みである場合、Athena のテーブル、データベース、およびパーティションに対するサービスの制限については、「AWS Glue の制限」を参照してください。

AWS Glueの制限だと「テーブルあたりのパーティション数は1,000,000」でしたー。

Amazon Athena - ユーザーガイド - サービス制限
AWS Glue - 開発者ガイド - AWS Glue の制限

おわり

いまいちスマートではないですが設定できました。
でかいデータを扱うのはやっぱり難しいですね。

とりあえず、今回は以上です。

6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6