AWS AthenaでUDF(Python編)- athena-python-udfを使ってみた
はじめに
Amazon Athenaでは、ユーザー定義関数(UDF)を使ってカスタム処理をSQLクエリ内で実行できます。公式にはJavaでの実装が推奨されていますが、athena-python-udfライブラリを使えばPythonで簡単にUDFを作成できます。
この記事では、athena-python-udfを使ったUDFの作成からデプロイ、実行までを解説します。
athena-python-udfとは
athena-python-udfは、AthenaのUDFプロトコルをPythonで実装したライブラリです。
メリット
- Javaが不要 - Pythonだけで完結
- 豊富なライブラリ - numpy、pandas、その他Pythonライブラリを活用可能
- 簡単な実装 - 数行のコードでUDFが作れる
制約事項
| 項目 | 制限 |
|---|---|
| データチャンク | Athenaは約1MBのチャンクでデータを送信 |
| 最大レスポンスサイズ | 1チャンクあたり6MB |
| ZIPファイルサイズ | PyArrowを使用するため約50MB |
実装手順
1. ライブラリのインストール
pip install athena-python-udf
2. UDFの実装
文字列を小文字に変換するシンプルなUDFを作成してみます。
# lower.py
from typing import Any
from athena_udf import BaseAthenaUDF
from pyarrow import Schema
class SimpleVarcharUDF(BaseAthenaUDF):
@staticmethod
def handle_athena_record(
input_schema: Schema, output_schema: Schema, arguments: list[Any]
) -> Any:
"""
文字列を小文字に変換するUDF。
Args:
input_schema: 入力スキーマ
output_schema: 出力スキーマ
arguments: 引数リスト(最初の要素が対象文字列)
Returns:
小文字に変換された文字列
"""
return arguments[0].lower()
# Lambda ハンドラ
lambda_handler = SimpleVarcharUDF().lambda_handler
3. ポイント解説
BaseAthenaUDFクラス
athena_udf.BaseAthenaUDFを継承し、handle_athena_recordメソッドを実装します。
class SimpleVarcharUDF(BaseAthenaUDF):
@staticmethod
def handle_athena_record(
input_schema: Schema,
output_schema: Schema,
arguments: list[Any]
) -> Any:
# ここにカスタム処理を実装
return arguments[0].lower()
パラメータ
| パラメータ | 説明 |
|---|---|
input_schema |
入力データのPyArrowスキーマ |
output_schema |
出力データのPyArrowスキーマ |
arguments |
UDFに渡された引数のリスト |
Lambdaハンドラ
最後に、Lambda関数として呼び出すためのハンドラを定義します。
lambda_handler = SimpleVarcharUDF().lambda_handler
4. デプロイ
依存関係をパッケージ化
# 依存関係をインストール
pip install athena-python-udf -t ./package
# ソースコードを追加
cp lower.py ./package/
# ZIPファイルを作成
cd package && zip -r ../lower.zip .
AWS Lambdaにデプロイ
aws lambda create-function \
--function-name lower \
--runtime python3.11 \
--handler lower.lambda_handler \
--zip-file fileb://lower.zip \
--role arn:aws:iam::ACCOUNT_ID:role/lambda-execution-role \
--timeout 60 \
--memory-size 512
または、AWS SAM / Serverless Frameworkを使ってデプロイすることも可能です。
5. Athenaでの使用
クエリエディタの最上部にUSING EXTERNAL FUNCTION句を記述することで、自作のPythonロジックをSQL関数として呼び出せます。
USING EXTERNAL FUNCTION lower(col1 varchar) RETURNS varchar LAMBDA 'lower'
SELECT
lower('FooBar') AS result
実行結果
| result |
|---|
| foobar |
USING EXTERNAL FUNCTION句の構文
USING EXTERNAL FUNCTION 関数名(引数名 データ型, ...)
RETURNS 戻り値のデータ型
LAMBDA 'Lambda関数名またはARN'
| 要素 | 説明 |
|---|---|
関数名 |
SQL内で使用する関数名 |
引数名 データ型 |
引数の名前と型(varchar, integer, double など) |
RETURNS |
戻り値のデータ型 |
LAMBDA |
UDFを実装したLambda関数の名前またはARN |
実践的な使用例
-- 1. Python で作成した Lambda 関数を定義
USING EXTERNAL FUNCTION my_python_func(input_str VARCHAR)
RETURNS VARCHAR
LAMBDA 'arn:aws:lambda:ap-northeast-1:123456789012:function:my-python-udf'
-- 2. 通常の SQL 関数のように呼び出す
SELECT
name,
my_python_func(name) AS processed_name
FROM
my_database.my_table;
このように、USING EXTERNAL FUNCTION句でUDFを定義すれば、あとは通常のSQL関数と同じ感覚で使用できます。
USING句を毎回書くのを回避する方法
「クエリの冒頭に毎回書く」のが基本ですが、ビュー(View)に定義を閉じ込めることで回避できます。
一度USING句を含んだクエリを「ビュー」として保存してしまえば、以降はそのビューをSELECTするだけで済みます。
-- 初回のみ:ビューを作成
CREATE OR REPLACE VIEW processed_data_view AS
USING EXTERNAL FUNCTION my_python_func(input_str VARCHAR)
RETURNS VARCHAR
LAMBDA 'arn:aws:lambda:...'
SELECT
*,
my_python_func(raw_column) AS processed_column
FROM
my_table;
-- 2回目以降:これだけでOK(USING句は不要)
SELECT * FROM processed_data_view WHERE id = 100;
ビューを使うことで、UDFの定義を1箇所にまとめられ、クエリがシンプルになります。
なぜ「最上部」なのか
Athena(Presto/Trinoベース)の仕様上、USING EXTERNAL FUNCTIONは**クエリ実行の宣言(Declaration)**として扱われます。
| 項目 | 説明 |
|---|---|
| スコープ | そのクエリの実行中のみ有効 |
| 永続化 | 標準のSQL関数のようにデータベース(Glue Catalog)に「関数そのもの」を登録・永続化することは現在できません |
関数を永続化できないことが、ビューを推奨する最大の理由です。ビューに定義を閉じ込めることで、実質的に永続化と同じ効果を得られます。
ローカル環境からUDFを実行する
ローカル環境(自分のPC)からでも、PythonやCLIを使ってAthenaのUDFを呼び出すことが可能です。
Python (boto3) から実行する
エンジニアが最もよく使う方法です。boto3ライブラリを使用して、SQL文の先頭にUSING句を結合して送信します。
import boto3
athena = boto3.client('athena', region_name='ap-northeast-1')
# SQL文の中にUSING句を含める
query = """
USING EXTERNAL FUNCTION my_func(col1 VARCHAR)
RETURNS VARCHAR
LAMBDA 'arn:aws:lambda:ap-northeast-1:123456789012:function:my-python-udf'
SELECT my_func(name) FROM my_db.my_table LIMIT 10
"""
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': 'my_db'},
ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)
print(f"Query Execution ID: {response['QueryExecutionId']}")
ローカル実行時のポイント
① 認証(IAM)
ローカルから実行する場合、実行しているIAMユーザー/ロールに以下の権限が必要です。
| 権限 | 説明 |
|---|---|
athena:StartQueryExecution |
Athenaクエリを実行する |
athena:GetQueryExecution |
クエリの状態を確認する |
athena:GetQueryResults |
クエリ結果を取得する |
lambda:InvokeFunction |
UDFとして指定したLambdaを叩く |
s3:GetObject / s3:PutObject
|
結果の読み書き |
② AWS認証情報の設定
# 環境変数で設定
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_DEFAULT_REGION=ap-northeast-1
# または ~/.aws/credentials を使用
aws configure
③ 結果の取得
start_query_executionは非同期で実行されるため、結果を取得するには別途ポーリングが必要です。
import time
def wait_for_query(athena, execution_id):
"""クエリ完了を待機"""
while True:
response = athena.get_query_execution(QueryExecutionId=execution_id)
state = response['QueryExecution']['Status']['State']
if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
return state
time.sleep(1)
def get_query_results(athena, execution_id):
"""クエリ結果を取得"""
response = athena.get_query_results(QueryExecutionId=execution_id)
return response['ResultSet']['Rows']
# 使用例
execution_id = response['QueryExecutionId']
state = wait_for_query(athena, execution_id)
if state == 'SUCCEEDED':
results = get_query_results(athena, execution_id)
for row in results:
print(row)
実践例:複数引数のUDF
複数の引数を受け取るUDFも簡単に作成できます。
from typing import Any
from athena_udf import BaseAthenaUDF
from pyarrow import Schema
class ConcatUDF(BaseAthenaUDF):
@staticmethod
def handle_athena_record(
input_schema: Schema, output_schema: Schema, arguments: list[Any]
) -> Any:
"""複数の文字列を連結するUDF"""
separator = arguments[0] if arguments[0] else ""
values = [str(arg) for arg in arguments[1:] if arg is not None]
return separator.join(values)
lambda_handler = ConcatUDF().lambda_handler
USING EXTERNAL FUNCTION concat_with_sep(sep varchar, s1 varchar, s2 varchar)
RETURNS varchar LAMBDA 'concat_udf'
SELECT concat_with_sep('-', 'Hello', 'World') AS result
-- 結果: Hello-World
ログ出力
UDF内でログを出力する場合は、Pythonの標準loggingモジュールを使用します。
import logging
from typing import Any
from athena_udf import BaseAthenaUDF
from pyarrow import Schema
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class LoggingUDF(BaseAthenaUDF):
@staticmethod
def handle_athena_record(
input_schema: Schema, output_schema: Schema, arguments: list[Any]
) -> Any:
result = arguments[0].lower()
logger.info(f"Input: {arguments[0]} -> Output: {result}")
return result
lambda_handler = LoggingUDF().lambda_handler
UDFのログは、呼び出し元のLambda関数(APIバックエンドなど)ではなく、UDF用Lambda関数のCloudWatch Logsグループ(/aws/lambda/<関数名>)に出力されます。
トラブルシューティング
1. インポートエラー
インポート "athena_udf" を解決できませんでした
原因: パッケージがインストールされていない、またはIDEが認識していない
解決策:
pip install athena-python-udf
2. 型スタブエラー
"athena_udf" の stub ファイルが見つかりません
原因: athena-python-udfには型情報(.pyiファイル)が含まれていない
解決策: pyproject.tomlに以下を追加
[tool.pyright]
reportMissingTypeStubs = false
または、インポート行に# type: ignoreを追加
from athena_udf import BaseAthenaUDF # type: ignore[import-untyped]
3. Lambda関数のタイムアウト
原因: デフォルトの3秒では処理が完了しない
解決策: Lambda関数のタイムアウトを60秒以上に設定
4. メモリ不足
原因: PyArrowの処理に十分なメモリがない
解決策: Lambda関数のメモリを512MB以上に設定
IAMポリシー
UDFを使用するには、以下のIAM権限が必要です。
Athena側(クエリ実行者)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "arn:aws:lambda:*:*:function:lower"
}
]
}
Lambda側(UDF実行)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
まとめ
athena-python-udfを使えば、Pythonで簡単にAthena UDFを作成できます。
| ポイント | 内容 |
|---|---|
| シンプルな実装 |
BaseAthenaUDFを継承してhandle_athena_recordを実装するだけ |
| Pythonエコシステム | pandas、numpyなどのライブラリが使える |
| 柔軟なデプロイ | AWS Lambda + ZIPファイル、SAM、Serverless Frameworkなど |
| ビューで永続化 |
USING句をビューに閉じ込めることで実質的に永続化 |
JavaでのUDF開発に抵抗がある方は、ぜひ試してみてください!