LoginSignup
3
1

More than 1 year has passed since last update.

PyAthenaの速度を改善する方法

Last updated at Posted at 2022-09-03

はじめに

PyAthenaを使用して、Pandas.DataFrameにデータを展開する場合に速度が遅くて困ったことはありませんか?
もしかしたら、PyAthenaの使い方を見直すことで改善することができるかもしれませんよ!
私がPyAthenaの速度改善するために調べたことをまとめたいと思います。

PyAthena

PyAthenaは、PythonからAmazon Athenaを使用するためのPython Clientになります。
PyAthenaを使用して、Pandas.DataFrameにデータを展開する方法は複数あり、使い方によって速度に大きな差が出る場合があります。

使い方

PyAthenaを使用してPandas.DataFrameにデータを展開する方法を3つ紹介したいと思います。

 ① Pandas : read_sql_query
 ② PyAthena : PandasCursor
 ③ PyAthena : AsyncPandasCursor

PyAthenaの使い方で調べると①の方法がよくヒットするのですが、実は①read_sql_queryの方法はパフォーマンスがあまり良くないのです…
PyAthenaには、別の方法として②PandasCursorも用意されており、こちらを使用することで速度の改善を図ることができます。

① Pandas:read_sql_query

PyAthenaで取得したクエリ結果をPandasのread_sql_queryを使用して、DataFrameにデータを展開する方法です。
これが、非常に遅くデータ展開にかなりの時間が掛かります…
なので、この方法を使用している人は、②PandasCursorまたは③AsyncPandasCursorの方法を試してみてください。

read_sql_query
from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
               region_name="ap-northeast-1")
df = pd.read_sql_query("SELECT * FROM many_rows", conn)

② PyAthena:PandasCursor

PandasCursorは、Athenaのクエリ結果としてS3に保存されるCSVファイルからDataFrameに直接データを展開する方法です。
S3に保存されるCSVファイルから直接データを読み込むため、①read_sql_queryの方法と比較してパフォーマンスに優れています。

PandasCursor
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="ap-northeast-1",
                 cursor_class=PandasCursor).cursor()

df = cursor.execute("SELECT * FROM many_rows").as_pandas()

このPandasCursorは、Athenaに対して順番にクエリを発行を行います。
なので、複数から同時にクエリを発行した場合、先発のクエリが終了するまで後発のクエリは実行待ちになり、パフォーマンスのボトルネックとなる場合があります。
その場合は、③AsyncPandasCursorの方法を試してみてください。

③ PyAthena : AsyncPandasCursor

AsyncPandasCursorは、PandasCursorと同様にS3に保存されるCSVファイルからDataFrameに直接データを展開する方法ですが、Athenaに対して複数のクエリを並列実行することができます。
その為、複数のクエリを同時実行する場合、②PandasCursorの方法と比較してパフォーマンスに優れています。(実行環境によっては、並列化することでパフォーマンスが劣化する可能性もありますので、ご注意ください。)

AsyncPandasCursor
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="ap-northeast-1",
                 cursor_class=AsyncPandasCursor).cursor(max_workers=10)

query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
df = result_set.as_pandas()

ワーカー数は、"max_workers"を指定することで変更することが可能です。
デフォルトは、5 または CPU数 * 5 になります。

クエリIDは、クエリをキャンセルする場合に使用します。

AsyncPandasCursor
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                 region_name="ap-northeast-1",
                 cursor_class=AsyncPandasCursor).cursor()

query_id, future = cursor.execute("SELECT * FROM many_rows")
cursor.cancel(query_id)

まとめ

下表に紹介した3つの方法をまとめます。

展開方法 展開速度 並列実行
① read_sql_query × ×
② PandasCursor ×
③ AsyncPandasCursor

基本は、② PandasCursorを使用するで問題ないと思いますが、
並列実行する場合は、③ AsyncPandasCursorを検討すると良いと思います。

さいごに

PyAthenaについて調べてみると色々な使い方があることがわかりました。
PyAthenaが遅くて…と感じた時に参考にして貰えたらと思います。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1