Athenaでクエリを投げる時に色々と苦労したので、まとめてみる。
利用環境はMacです。
【背景】
- Athenaがクエリを並列で処理できるのが20並列と書いてあったが(AWSのドキュメント)実際に確かめたい
- athenaに大量のリクエストを送った場合、エラーが出るのか、queueを管理してくれるのか、してくれるならどのようにしてくれるのか知りたい
- ただ、実際にどう実現すればいいかわからない
こんな課題感で手を動かし始めました。
【実際にやったこと】
- Athenaのconsoleからクエリを投げてみた
- cliからクエリを大量に投げてみた
これらをどのようにしていったかまとめます。(いろんな記事を参考にしました)
AthenaのConsoleからクエリを投げてみた
以下簡単に、手順をまとめます。
- S3にデータを配置する
- AthenaにDBとTableを作成する
- Athenaからクエリを叩く
S3にデータを配置する
athenaから具体的にクエリを叩きたい元になるファイルをS3にアップロードします。
任意のディレクトリを作成し、その中に保存しました。
今回ファイル形式としてcsvファイル(bzip圧縮)とparquetファイルに対してクエリを叩きました。
AthenaにDBとTableを作成する
DBを作成する際には公式のドキュメントの手段通りに行いました。
Table作成には
CREATE EXTERNAL TABLE
構文を用いました。
CREATE EXTERNAL TABLE IF NOT EXISTS DB_name.table_name (
`e_1` int,
`e_2` int,
`e_3` int,
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 'S3 URI'
TBLPROPERTIES ('has_encrypted_data'='false');
もしくはここのCreate tableからも作成することができます。
こちらの記事がとても参考になります。AWS Athena で CREATE TABLE する
Athenaからクエリを叩く
select
var_pop(CAST(e_1 AS DECIMAL(4, 3))) as var_pop_qe1,
var_pop(CAST(e_2 AS DECIMAL(4, 3))) as var_pop_qe2,
var_pop(CAST(e_3 AS DECIMAL(4, 3))) as var_pop_qe3
from DB_name.table_name
;
今回はe_1, e_2, e_3カラムのデータをDECIMALに変換したのちに分散を計算しています。
このように、Athenaでサポートされている関数や演算子もまとまっていますので、公式ドキュメントをご参照ください
以上が、Amazon Athena Management Consoleからのクエリの叩き方でした。
cliからクエリを大量に投げてみた
athenaのmanagement consoleから大量のクエリを投げるのがどのように行えばいいのかがわからなかったので、cliからlambdaを経由して、大量のクエリを送りました。
以下参考文献です。
今回は上記の参考文献を参考にしてpythonで以下のように記述しました。
import time
import boto3
# athena constant
DATABASE = 'athenaでアクセスしたいDB名'
TABLE = 'athenaでアクセスしたいtable名'
# S3 constant
S3_OUTPUT = '結果を出力したいS3のURI'
S3_BUCKET = 'バケット名'
# number of retries
RETRY_COUNT = 10
def lambda_handler(event, context):
# created query
query = (
"""
SELECT
AVG(CAST(e_1 AS DECIMAL(4, 3))) as avg_qe1,
AVG(CAST(e_2 AS DECIMAL(4, 3))) as avg_qe2,
AVG(CAST(e_3 AS DECIMAL(4, 3))) as avg_qe3
FROM %s.%s;
"""
) % (DATABASE, TABLE)
# athena client
client = boto3.client('athena')
# Execution
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DATABASE
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
# get query execution id
query_execution_id = response['QueryExecutionId']
print(query_execution_id)
# get execution status
for i in range(1, 1 + RETRY_COUNT):
# get query execution
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
print("STATUS:" + query_execution_status)
break
if query_execution_status == 'FAILED':
raise Exception("STATUS:" + query_execution_status)
else:
print("STATUS:" + query_execution_status)
time.sleep(i)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
raise Exception('TIME OVER')
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
print(result)
# get data
result_output = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
return result
今回、工夫した点は、for文を用いて繰り返し処理を行った点です。
実際に30件ほど送ったらAthenaでqueueされつつ全て実行されました。
一方で、数千件送った場合、cliに以下のようなエラーが返ってきました。
An error occurred (TooManyRequestsException) when calling the StartQueryExecution operation: You have exceeded the limit for the number of queries you can run concurrently. Please reduce the number of concurrent queries submitted by this account. Contact customer support to request a concurrent query limit increase.
一クエリ15秒程度かかる大きなクエリを投げた時に、1分間で実行されたクエリの数(athenaのhistoryで確認)と、実際のクエリにかかった秒数(athenaのhistoryで確認)でおおよその並列数を計算したところ3~4件程度でした。
Athenaはリクエストが投げられてからAWS側でリソースを確保するため、常に最大パフォーマンスが担保されるわけではないそうです。
まとめ
今回明らかになったこと
- Athenaで並列に同時実行するために、cliからlambdaを経由することで実現できた。
- 大量のリクエストを送った場合、athena側で一定queue管理してくれるものの、一定以上のリクエストを送るとToo Many Requestというresponseが返ってきて実行されない
また、何か学びがあれば共有していきたいです