はじめに
PyAthenaを使用して、Pandas.DataFrameにデータを展開する場合に速度が遅くて困ったことはありませんか?
もしかしたら、PyAthenaの使い方を見直すことで改善することができるかもしれませんよ!
私がPyAthenaの速度改善するために調べたことをまとめたいと思います。
PyAthena
PyAthenaは、PythonからAmazon Athenaを使用するためのPython Clientになります。
PyAthenaを使用して、Pandas.DataFrameにデータを展開する方法は複数あり、使い方によって速度に大きな差が出る場合があります。
使い方
PyAthenaを使用してPandas.DataFrameにデータを展開する方法を3つ紹介したいと思います。
① Pandas : read_sql_query
② PyAthena : PandasCursor
③ PyAthena : AsyncPandasCursor
PyAthenaの使い方で調べると①の方法がよくヒットするのですが、実は①read_sql_queryの方法はパフォーマンスがあまり良くないのです…
PyAthenaには、別の方法として②PandasCursorも用意されており、こちらを使用することで速度の改善を図ることができます。
① Pandas:read_sql_query
PyAthenaで取得したクエリ結果をPandasのread_sql_queryを使用して、DataFrameにデータを展開する方法です。
これが、非常に遅くデータ展開にかなりの時間が掛かります…
なので、この方法を使用している人は、②PandasCursorまたは③AsyncPandasCursorの方法を試してみてください。
from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="ap-northeast-1")
df = pd.read_sql_query("SELECT * FROM many_rows", conn)
② PyAthena:PandasCursor
PandasCursorは、Athenaのクエリ結果としてS3に保存されるCSVファイルからDataFrameに直接データを展開する方法です。
S3に保存されるCSVファイルから直接データを読み込むため、①read_sql_queryの方法と比較してパフォーマンスに優れています。
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="ap-northeast-1",
cursor_class=PandasCursor).cursor()
df = cursor.execute("SELECT * FROM many_rows").as_pandas()
このPandasCursorは、Athenaに対して順番にクエリを発行を行います。
なので、複数から同時にクエリを発行した場合、先発のクエリが終了するまで後発のクエリは実行待ちになり、パフォーマンスのボトルネックとなる場合があります。
その場合は、③AsyncPandasCursorの方法を試してみてください。
③ PyAthena : AsyncPandasCursor
AsyncPandasCursorは、PandasCursorと同様にS3に保存されるCSVファイルからDataFrameに直接データを展開する方法ですが、Athenaに対して複数のクエリを並列実行することができます。
その為、複数のクエリを同時実行する場合、②PandasCursorの方法と比較してパフォーマンスに優れています。(実行環境によっては、並列化することでパフォーマンスが劣化する可能性もありますので、ご注意ください。)
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="ap-northeast-1",
cursor_class=AsyncPandasCursor).cursor(max_workers=10)
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
df = result_set.as_pandas()
ワーカー数は、"max_workers"を指定することで変更することが可能です。
デフォルトは、5 または CPU数 * 5 になります。
クエリIDは、クエリをキャンセルする場合に使用します。
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="ap-northeast-1",
cursor_class=AsyncPandasCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
cursor.cancel(query_id)
まとめ
下表に紹介した3つの方法をまとめます。
展開方法 | 展開速度 | 並列実行 |
---|---|---|
① read_sql_query | × | × |
② PandasCursor | 〇 | × |
③ AsyncPandasCursor | 〇 | 〇 |
基本は、② PandasCursorを使用するで問題ないと思いますが、
並列実行する場合は、③ AsyncPandasCursorを検討すると良いと思います。
さいごに
PyAthenaについて調べてみると色々な使い方があることがわかりました。
PyAthenaが遅くて…と感じた時に参考にして貰えたらと思います。