Python で Azure DataLake 上の巨大なファイルを読み込んで処理する機会があり、メモリを圧迫しない方法をとることが求められました。 Azure SDK に少しクセがありどうすれば可能なのか悩みましたが、いろいろ試して方法が見つかったため纏めます。
TL;DR
Python のストリーム(ファイルオブジェクト)を用いると、データを一度に読み込むのを避けて少しずつ処理でき、メモリ消費を抑えられます。
あとは DataLake に対するストリームをどう取得するかというだけですが、
- Azure SDK のクラス
StorageStreamDownloader
をそのままストリームとして扱うことはできません - そのクラスの簡単なラッパーを自作するとストリームとして使えます
- Azure SDK でなく fsspec を用いるともっと簡単です
サンプルアプリケーション
DataLake 上に保存してあるテキストファイルを読み込み、指定した語句を含む行を抽出して返す Azure Functions を作ってみました。
最初に何の工夫も無い方法を示します。 DataLake のファイルクライアントオブジェクトを取得したら、中身のデータをバイトオブジェクトとして取得し、文字列に変換し1、1行ずつに分解して処理します。
import os
from logging import getLogger
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
logger = getLogger(__name__)
logger.setLevel("DEBUG")
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
# 検索キーワードをリクエストパラメータから取得する
keyword = req.params.get("s", "")
limit = int(req.params.get("limit", 10))
logger.info(f"keyword: {keyword!r}, limit: {limit}")
# ファイルクライアントを取得する
conn_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
file_system = os.environ["DATALAKE_FILE_SYSTEM"]
file_path = os.environ["DATALAKE_FILE_PATH"]
service_client = DataLakeServiceClient.from_connection_string(conn_str)
file_client = service_client.get_file_client(file_system, file_path)
# バイナリデータをメモリ上にダウンロードする
downloader = file_client.download_file()
data = downloader.readall()
# テキストとして処理する
lines = str(data, encoding="utf-8").splitlines(keepends=True)
filtered = []
n = 0
for line in lines:
if keyword in line:
logger.debug(line.rstrip("\n"))
filtered.append(line)
n += 1
if n >= limit:
break
return func.HttpResponse("".join(filtered))
小惑星リストのテキストファイル(74万行、約 63MiB)を配置したときの実行例を以下に示します。
$ curl http://localhost:7071/api/http_trigger?s=COIAS
(697402) Ao 2017 BX232 2017 01 23* Maunakea COIAS
(718492) 2017 FZ233 2017 03 22* Maunakea COIAS
(719612) 2019 UW157 2019 10 27* Maunakea COIAS
(小惑星 Ao, Quro, Hoshizaki の命名、おめでとうございます!)
読み込み方法の変更
サンプルアプリケーションを段階的に改善していきます。
ファイルオブジェクト(ストリーム)を用いる
ファイルオブジェクトはデータの読み書きをする際によく使います。一連のデータを順番に処理していけばいいときに便利で、大抵はメモリ上にデータの一部だけを保持してメモリを圧迫しないようになっています。
名前通りファイルが一番イメージしやすいですが、同じインタフェースになっていればファイル以外のデータも同じように扱えます。本記事ではローカルファイルは扱わないので、ストリームという呼称を用います。
Azure SDK の StorageStreamDownloader
には readinto
というメソッドがあり、データを他のストリームへ書き出すことができます。ローカルファイルへ書き出さずメモリ上にデータを保持するためには、 BytesIO
のインスタンスを作成します。
バイナリから文字列への変換や、抽出した文字列の結合などもストリームへ置き換えれば、以下のようになります。ストリームのまま for 文で1行ずつ取り出せるというのがなかなか便利です。
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
...
file_client = service_client.get_file_client(file_system, file_path)
# バイナリデータをメモリ上にダウンロードする
buffer = BytesIO()
downloader = file_client.download_file()
downloader.readinto(buffer)
buffer.seek(0)
# テキストとして処理する
stream = TextIOWrapper(buffer, encoding="utf-8")
filtered = StringIO()
n = 0
for line in stream:
if keyword in line:
logger.debug(line.rstrip("\n"))
filtered.write(line)
n += 1
if n >= limit:
break
return func.HttpResponse(filtered.getvalue())
しかしこの例では全データをメモリに乗せてから後続の処理をしているため、根本的な問題は解決していません。 for 文のところで適宜データを追加ダウンロードしてもらうようにする必要があります。
StorageStreamDownloader 自体をストリームとして扱う(失敗)
以前 AWS S3 のデータを Boto3 で読み込んだときは2、レスポンスから取得した StreamingBody
は read
するだけでなくそのままストリームとして扱えました。 Azure SDK の StorageStreamDownloader
も同じように扱えれば楽です。
しかし、実際に stream = TextIOWrapper(downloader)
と試してみたところ、エラーになってしまいました。
AttributeError: 'StorageStreamDownloader' object has no attribute 'readable'
ラッパーを自作する
StorageStreamDownloader
クラスは、 read
など各ストリームで実装すべきメソッドは実装しているものの、 IOBase
を継承していません。そのためストリームのインタフェース仕様に沿っていなく、ファイルなどと同様には扱えません。
そこで、ラッパークラスを作成してストリームに適合させてみます。
from io import IOBase
from azure.storage.filedatalake import StorageStreamDownloader
# IOBase の実装クラスとしてラッパーを作成する
class DownloadWrapper(IOBase):
def __init__(self, downloader: StorageStreamDownloader) -> None:
self.downloader = downloader
# このストリームが読み取り可能かどうか伝える
def readable(self) -> bool:
return True
# 実際の処理は StorageStreamDownloader に任せる
def read(self, size: int = -1) -> bytes:
return self.downloader.read(size)
# その他、必要なメソッドがあれば追加する
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
...
file_client = service_client.get_file_client(file_system, file_path)
# StorageStreamDownloader をラップして IO に変える
downloader = file_client.download_file()
buffer = DownloadWrapper(downloader)
# テキストとして処理する
stream = TextIOWrapper(buffer, encoding="utf-8")
...
実際に動かしてみると、 Azure SDK による DataLake からのダウンロードの合間に for 文内の処理がログ出力されていて、データが必要になってからダウンロードしていることが確認できます。
(あとはメモリが増大していないことの確認も必要ですが、 1GiB 近いファイルを全行処理しても Azure Functions のメモリが数百MBで済んでいることは確認しました。)
fsspec を利用する
pandas のドキュメントを見ていると、パスにローカルファイルでなく s3://...
のような URI も指定できることが書かれています。
IO tools (text, CSV, HDF5, …) # Reading/writing remote files
これは fsspec というライブラリを利用しているようです。 Microsoft の公式ドキュメントにも例が無いか探したところ、データ分析サービスのほうで見つかりました。(リンクは参考に記載しています)
fsspec 本体に加えて Azure 向けの実装が必要なので、 pip install adlfs
でインストールして利用します。コードではファイルクライアントの取得は必要なくなり、組み込みの open
に似た形でストリームを取得できます。
import os
from io import StringIO, TextIOWrapper
from logging import getLogger
import azure.functions as func
import fsspec
logger = getLogger(__name__)
logger.setLevel("DEBUG")
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
# 検索キーワードをリクエストパラメータから取得する
keyword = req.params.get("s", "")
limit = int(req.params.get("limit", 10))
logger.info(f"keyword: {keyword!r}, limit: {limit}")
# リモートファイルの IO を取得する
# (認証情報は環境変数 AZURE_STORAGE_CONNECTION_STRING に格納済み)
file_system = os.environ["DATALAKE_FILE_SYSTEM"]
file_path = os.environ["DATALAKE_FILE_PATH"].lstrip("/")
with fsspec.open(f"az://{file_system}/{file_path}") as f:
# テキストとして処理する
stream = TextIOWrapper(f)
filtered = StringIO()
n = 0
for line in stream:
if keyword in line:
logger.debug(line.rstrip("\n"))
filtered.write(line)
n += 1
if n >= limit:
break
return func.HttpResponse(filtered.getvalue())
まとめ
Azure DataLake 上のファイルを Python でメモリ節約して読み込む方法を調査しました。
データを少しずつ読み込むにはストリームを使うのが便利です。しかし Azure SDK のクラスは残念ながら IOBase
を継承していなく、そのままではストリームとして扱えませんでした。とはいえ簡単なラッパーを作成することでその問題を解消できました。
ストリームを取得する別の方法では fsspec がありました。 URI を指定してローカルファイルのように扱うので、 Azure SDK とは雰囲気が異なります。今回は深入りしていませんが、データ処理などのライブラリによっては URI を直接渡したりワイルドカード指定もできるようなので、ファイルの扱い全般が楽になりそうです。
参考
- Microsoft Learn
- Azure SDK for Python 関連
- fsspec 関連
- Azure Functions
- Python / ライブラリ
小惑星リストのテキストファイルは、 IAU Minor Planet Center の NumberedMPs.txt
(2024/11/14更新版)を使用しました。
https://www.minorplanetcenter.net/data
-
SDK 側で文字列に変換させるオプションもありますが、本筋ではないので割愛します。 ↩