1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS AthenaでUDF(Python編)- athena-python-udfを使ってみた

Posted at

AWS AthenaでUDF(Python編)- athena-python-udfを使ってみた

はじめに

Amazon Athenaでは、ユーザー定義関数(UDF)を使ってカスタム処理をSQLクエリ内で実行できます。公式にはJavaでの実装が推奨されていますが、athena-python-udfライブラリを使えばPythonで簡単にUDFを作成できます。

この記事では、athena-python-udfを使ったUDFの作成からデプロイ、実行までを解説します。

athena-python-udfとは

athena-python-udfは、AthenaのUDFプロトコルをPythonで実装したライブラリです。

メリット

  • Javaが不要 - Pythonだけで完結
  • 豊富なライブラリ - numpy、pandas、その他Pythonライブラリを活用可能
  • 簡単な実装 - 数行のコードでUDFが作れる

制約事項

項目 制限
データチャンク Athenaは約1MBのチャンクでデータを送信
最大レスポンスサイズ 1チャンクあたり6MB
ZIPファイルサイズ PyArrowを使用するため約50MB

実装手順

1. ライブラリのインストール

pip install athena-python-udf

2. UDFの実装

文字列を小文字に変換するシンプルなUDFを作成してみます。

# lower.py
from typing import Any

from athena_udf import BaseAthenaUDF
from pyarrow import Schema


class SimpleVarcharUDF(BaseAthenaUDF):
    @staticmethod
    def handle_athena_record(
        input_schema: Schema, output_schema: Schema, arguments: list[Any]
    ) -> Any:
        """
        文字列を小文字に変換するUDF。

        Args:
            input_schema: 入力スキーマ
            output_schema: 出力スキーマ
            arguments: 引数リスト(最初の要素が対象文字列)

        Returns:
            小文字に変換された文字列
        """
        return arguments[0].lower()


# Lambda ハンドラ
lambda_handler = SimpleVarcharUDF().lambda_handler

3. ポイント解説

BaseAthenaUDFクラス

athena_udf.BaseAthenaUDFを継承し、handle_athena_recordメソッドを実装します。

class SimpleVarcharUDF(BaseAthenaUDF):
    @staticmethod
    def handle_athena_record(
        input_schema: Schema, 
        output_schema: Schema, 
        arguments: list[Any]
    ) -> Any:
        # ここにカスタム処理を実装
        return arguments[0].lower()

パラメータ

パラメータ 説明
input_schema 入力データのPyArrowスキーマ
output_schema 出力データのPyArrowスキーマ
arguments UDFに渡された引数のリスト

Lambdaハンドラ

最後に、Lambda関数として呼び出すためのハンドラを定義します。

lambda_handler = SimpleVarcharUDF().lambda_handler

4. デプロイ

依存関係をパッケージ化

# 依存関係をインストール
pip install athena-python-udf -t ./package

# ソースコードを追加
cp lower.py ./package/

# ZIPファイルを作成
cd package && zip -r ../lower.zip .

AWS Lambdaにデプロイ

aws lambda create-function \
  --function-name lower \
  --runtime python3.11 \
  --handler lower.lambda_handler \
  --zip-file fileb://lower.zip \
  --role arn:aws:iam::ACCOUNT_ID:role/lambda-execution-role \
  --timeout 60 \
  --memory-size 512

または、AWS SAM / Serverless Frameworkを使ってデプロイすることも可能です。

5. Athenaでの使用

クエリエディタの最上部にUSING EXTERNAL FUNCTION句を記述することで、自作のPythonロジックをSQL関数として呼び出せます。

USING EXTERNAL FUNCTION lower(col1 varchar) RETURNS varchar LAMBDA 'lower'

SELECT 
    lower('FooBar') AS result

実行結果

result
foobar

USING EXTERNAL FUNCTION句の構文

USING EXTERNAL FUNCTION 関数名(引数名 データ型, ...) 
RETURNS 戻り値のデータ型 
LAMBDA 'Lambda関数名またはARN'
要素 説明
関数名 SQL内で使用する関数名
引数名 データ型 引数の名前と型(varchar, integer, double など)
RETURNS 戻り値のデータ型
LAMBDA UDFを実装したLambda関数の名前またはARN

実践的な使用例

-- 1. Python で作成した Lambda 関数を定義
USING EXTERNAL FUNCTION my_python_func(input_str VARCHAR) 
RETURNS VARCHAR 
LAMBDA 'arn:aws:lambda:ap-northeast-1:123456789012:function:my-python-udf'

-- 2. 通常の SQL 関数のように呼び出す
SELECT 
    name, 
    my_python_func(name) AS processed_name
FROM 
    my_database.my_table;

このように、USING EXTERNAL FUNCTION句でUDFを定義すれば、あとは通常のSQL関数と同じ感覚で使用できます。

USING句を毎回書くのを回避する方法

「クエリの冒頭に毎回書く」のが基本ですが、ビュー(View)に定義を閉じ込めることで回避できます。

一度USING句を含んだクエリを「ビュー」として保存してしまえば、以降はそのビューをSELECTするだけで済みます。

-- 初回のみ:ビューを作成
CREATE OR REPLACE VIEW processed_data_view AS
USING EXTERNAL FUNCTION my_python_func(input_str VARCHAR) 
    RETURNS VARCHAR 
    LAMBDA 'arn:aws:lambda:...'
SELECT 
    *, 
    my_python_func(raw_column) AS processed_column
FROM 
    my_table;
-- 2回目以降:これだけでOK(USING句は不要)
SELECT * FROM processed_data_view WHERE id = 100;

ビューを使うことで、UDFの定義を1箇所にまとめられ、クエリがシンプルになります。

なぜ「最上部」なのか

Athena(Presto/Trinoベース)の仕様上、USING EXTERNAL FUNCTIONは**クエリ実行の宣言(Declaration)**として扱われます。

項目 説明
スコープ そのクエリの実行中のみ有効
永続化 標準のSQL関数のようにデータベース(Glue Catalog)に「関数そのもの」を登録・永続化することは現在できません

関数を永続化できないことが、ビューを推奨する最大の理由です。ビューに定義を閉じ込めることで、実質的に永続化と同じ効果を得られます。

ローカル環境からUDFを実行する

ローカル環境(自分のPC)からでも、PythonやCLIを使ってAthenaのUDFを呼び出すことが可能です。

Python (boto3) から実行する

エンジニアが最もよく使う方法です。boto3ライブラリを使用して、SQL文の先頭にUSING句を結合して送信します。

import boto3

athena = boto3.client('athena', region_name='ap-northeast-1')

# SQL文の中にUSING句を含める
query = """
USING EXTERNAL FUNCTION my_func(col1 VARCHAR) 
RETURNS VARCHAR 
LAMBDA 'arn:aws:lambda:ap-northeast-1:123456789012:function:my-python-udf'
SELECT my_func(name) FROM my_db.my_table LIMIT 10
"""

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'my_db'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)

print(f"Query Execution ID: {response['QueryExecutionId']}")

ローカル実行時のポイント

① 認証(IAM)

ローカルから実行する場合、実行しているIAMユーザー/ロールに以下の権限が必要です。

権限 説明
athena:StartQueryExecution Athenaクエリを実行する
athena:GetQueryExecution クエリの状態を確認する
athena:GetQueryResults クエリ結果を取得する
lambda:InvokeFunction UDFとして指定したLambdaを叩く
s3:GetObject / s3:PutObject 結果の読み書き

② AWS認証情報の設定

# 環境変数で設定
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_DEFAULT_REGION=ap-northeast-1

# または ~/.aws/credentials を使用
aws configure

③ 結果の取得

start_query_executionは非同期で実行されるため、結果を取得するには別途ポーリングが必要です。

import time

def wait_for_query(athena, execution_id):
    """クエリ完了を待機"""
    while True:
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        state = response['QueryExecution']['Status']['State']
        
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            return state
        
        time.sleep(1)

def get_query_results(athena, execution_id):
    """クエリ結果を取得"""
    response = athena.get_query_results(QueryExecutionId=execution_id)
    return response['ResultSet']['Rows']

# 使用例
execution_id = response['QueryExecutionId']
state = wait_for_query(athena, execution_id)

if state == 'SUCCEEDED':
    results = get_query_results(athena, execution_id)
    for row in results:
        print(row)

実践例:複数引数のUDF

複数の引数を受け取るUDFも簡単に作成できます。

from typing import Any

from athena_udf import BaseAthenaUDF
from pyarrow import Schema


class ConcatUDF(BaseAthenaUDF):
    @staticmethod
    def handle_athena_record(
        input_schema: Schema, output_schema: Schema, arguments: list[Any]
    ) -> Any:
        """複数の文字列を連結するUDF"""
        separator = arguments[0] if arguments[0] else ""
        values = [str(arg) for arg in arguments[1:] if arg is not None]
        return separator.join(values)


lambda_handler = ConcatUDF().lambda_handler
USING EXTERNAL FUNCTION concat_with_sep(sep varchar, s1 varchar, s2 varchar) 
RETURNS varchar LAMBDA 'concat_udf'

SELECT concat_with_sep('-', 'Hello', 'World') AS result
-- 結果: Hello-World

ログ出力

UDF内でログを出力する場合は、Pythonの標準loggingモジュールを使用します。

import logging
from typing import Any

from athena_udf import BaseAthenaUDF
from pyarrow import Schema

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class LoggingUDF(BaseAthenaUDF):
    @staticmethod
    def handle_athena_record(
        input_schema: Schema, output_schema: Schema, arguments: list[Any]
    ) -> Any:
        result = arguments[0].lower()
        logger.info(f"Input: {arguments[0]} -> Output: {result}")
        return result


lambda_handler = LoggingUDF().lambda_handler

UDFのログは、呼び出し元のLambda関数(APIバックエンドなど)ではなく、UDF用Lambda関数のCloudWatch Logsグループ/aws/lambda/<関数名>)に出力されます。

トラブルシューティング

1. インポートエラー

インポート "athena_udf" を解決できませんでした

原因: パッケージがインストールされていない、またはIDEが認識していない

解決策:

pip install athena-python-udf

2. 型スタブエラー

"athena_udf" の stub ファイルが見つかりません

原因: athena-python-udfには型情報(.pyiファイル)が含まれていない

解決策: pyproject.tomlに以下を追加

[tool.pyright]
reportMissingTypeStubs = false

または、インポート行に# type: ignoreを追加

from athena_udf import BaseAthenaUDF  # type: ignore[import-untyped]

3. Lambda関数のタイムアウト

原因: デフォルトの3秒では処理が完了しない

解決策: Lambda関数のタイムアウトを60秒以上に設定

4. メモリ不足

原因: PyArrowの処理に十分なメモリがない

解決策: Lambda関数のメモリを512MB以上に設定

IAMポリシー

UDFを使用するには、以下のIAM権限が必要です。

Athena側(クエリ実行者)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": "arn:aws:lambda:*:*:function:lower"
        }
    ]
}

Lambda側(UDF実行)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

まとめ

athena-python-udfを使えば、Pythonで簡単にAthena UDFを作成できます。

ポイント 内容
シンプルな実装 BaseAthenaUDFを継承してhandle_athena_recordを実装するだけ
Pythonエコシステム pandas、numpyなどのライブラリが使える
柔軟なデプロイ AWS Lambda + ZIPファイル、SAM、Serverless Frameworkなど
ビューで永続化 USING句をビューに閉じ込めることで実質的に永続化

JavaでのUDF開発に抵抗がある方は、ぜひ試してみてください!

参考リンク

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?