1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

エーピーコミュニケーションズAdvent Calendar 2024

Day 4

Azure Data Lake Storage 上のファイルを Python でメモリ節約して読み込む

Last updated at Posted at 2024-12-03

Python で Azure DataLake 上の巨大なファイルを読み込んで処理する機会があり、メモリを圧迫しない方法をとることが求められました。 Azure SDK に少しクセがありどうすれば可能なのか悩みましたが、いろいろ試して方法が見つかったため纏めます。

TL;DR

Python のストリーム(ファイルオブジェクト)を用いると、データを一度に読み込むのを避けて少しずつ処理でき、メモリ消費を抑えられます。

あとは DataLake に対するストリームをどう取得するかというだけですが、

  • Azure SDK のクラス StorageStreamDownloader をそのままストリームとして扱うことはできません
  • そのクラスの簡単なラッパーを自作するとストリームとして使えます
  • Azure SDK でなく fsspec を用いるともっと簡単です

サンプルアプリケーション

DataLake 上に保存してあるテキストファイルを読み込み、指定した語句を含む行を抽出して返す Azure Functions を作ってみました。

最初に何の工夫も無い方法を示します。 DataLake のファイルクライアントオブジェクトを取得したら、中身のデータをバイトオブジェクトとして取得し、文字列に変換し1、1行ずつに分解して処理します。

function_app.py
import os
from logging import getLogger

import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

logger = getLogger(__name__)
logger.setLevel("DEBUG")

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:

    # 検索キーワードをリクエストパラメータから取得する
    keyword = req.params.get("s", "")
    limit = int(req.params.get("limit", 10))
    logger.info(f"keyword: {keyword!r}, limit: {limit}")

    # ファイルクライアントを取得する
    conn_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    file_system = os.environ["DATALAKE_FILE_SYSTEM"]
    file_path = os.environ["DATALAKE_FILE_PATH"]
    service_client = DataLakeServiceClient.from_connection_string(conn_str)
    file_client = service_client.get_file_client(file_system, file_path)

    # バイナリデータをメモリ上にダウンロードする
    downloader = file_client.download_file()
    data = downloader.readall()

    # テキストとして処理する
    lines = str(data, encoding="utf-8").splitlines(keepends=True)
    filtered = []
    n = 0
    for line in lines:
        if keyword in line:
            logger.debug(line.rstrip("\n"))
            filtered.append(line)
            n += 1
            if n >= limit:
                break

    return func.HttpResponse("".join(filtered))

小惑星リストのテキストファイル(74万行、約 63MiB)を配置したときの実行例を以下に示します。

実行例
$ curl http://localhost:7071/api/http_trigger?s=COIAS
(697402) Ao                  2017 BX232  2017 01 23* Maunakea                 COIAS
(718492)                     2017 FZ233  2017 03 22* Maunakea                 COIAS
(719612)                     2019 UW157  2019 10 27* Maunakea                 COIAS

(小惑星 Ao, Quro, Hoshizaki の命名、おめでとうございます!)

読み込み方法の変更

サンプルアプリケーションを段階的に改善していきます。

ファイルオブジェクト(ストリーム)を用いる

ファイルオブジェクトはデータの読み書きをする際によく使います。一連のデータを順番に処理していけばいいときに便利で、大抵はメモリ上にデータの一部だけを保持してメモリを圧迫しないようになっています。
名前通りファイルが一番イメージしやすいですが、同じインタフェースになっていればファイル以外のデータも同じように扱えます。本記事ではローカルファイルは扱わないので、ストリームという呼称を用います。

Azure SDK の StorageStreamDownloader には readinto というメソッドがあり、データを他のストリームへ書き出すことができます。ローカルファイルへ書き出さずメモリ上にデータを保持するためには、 BytesIO のインスタンスを作成します。

バイナリから文字列への変換や、抽出した文字列の結合などもストリームへ置き換えれば、以下のようになります。ストリームのまま for 文で1行ずつ取り出せるというのがなかなか便利です。

function_app.py(抜粋)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    ...
    file_client = service_client.get_file_client(file_system, file_path)

    # バイナリデータをメモリ上にダウンロードする
    buffer = BytesIO()
    downloader = file_client.download_file()
    downloader.readinto(buffer)
    buffer.seek(0)

    # テキストとして処理する
    stream = TextIOWrapper(buffer, encoding="utf-8")
    filtered = StringIO()
    n = 0
    for line in stream:
        if keyword in line:
            logger.debug(line.rstrip("\n"))
            filtered.write(line)
            n += 1
            if n >= limit:
                break

    return func.HttpResponse(filtered.getvalue())

しかしこの例では全データをメモリに乗せてから後続の処理をしているため、根本的な問題は解決していません。 for 文のところで適宜データを追加ダウンロードしてもらうようにする必要があります。

StorageStreamDownloader 自体をストリームとして扱う(失敗)

以前 AWS S3 のデータを Boto3 で読み込んだときは2、レスポンスから取得した StreamingBodyread するだけでなくそのままストリームとして扱えました。 Azure SDK の StorageStreamDownloader も同じように扱えれば楽です。

しかし、実際に stream = TextIOWrapper(downloader) と試してみたところ、エラーになってしまいました。

AttributeError: 'StorageStreamDownloader' object has no attribute 'readable'

ラッパーを自作する

StorageStreamDownloader クラスは、 read など各ストリームで実装すべきメソッドは実装しているものの、 IOBase を継承していません。そのためストリームのインタフェース仕様に沿っていなく、ファイルなどと同様には扱えません。

そこで、ラッパークラスを作成してストリームに適合させてみます。

download_wrapper.py
from io import IOBase

from azure.storage.filedatalake import StorageStreamDownloader


# IOBase の実装クラスとしてラッパーを作成する
class DownloadWrapper(IOBase):

    def __init__(self, downloader: StorageStreamDownloader) -> None:
        self.downloader = downloader

    # このストリームが読み取り可能かどうか伝える
    def readable(self) -> bool:
        return True

    # 実際の処理は StorageStreamDownloader に任せる
    def read(self, size: int = -1) -> bytes:
        return self.downloader.read(size)

    # その他、必要なメソッドがあれば追加する
function_app.py(抜粋)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    ...
    file_client = service_client.get_file_client(file_system, file_path)

    # StorageStreamDownloader をラップして IO に変える
    downloader = file_client.download_file()
    buffer = DownloadWrapper(downloader)

    # テキストとして処理する
    stream = TextIOWrapper(buffer, encoding="utf-8")
    ...

実際に動かしてみると、 Azure SDK による DataLake からのダウンロードの合間に for 文内の処理がログ出力されていて、データが必要になってからダウンロードしていることが確認できます。
(あとはメモリが増大していないことの確認も必要ですが、 1GiB 近いファイルを全行処理しても Azure Functions のメモリが数百MBで済んでいることは確認しました。)

fsspec を利用する

pandas のドキュメントを見ていると、パスにローカルファイルでなく s3://... のような URI も指定できることが書かれています。
IO tools (text, CSV, HDF5, …) # Reading/writing remote files

これは fsspec というライブラリを利用しているようです。 Microsoft の公式ドキュメントにも例が無いか探したところ、データ分析サービスのほうで見つかりました。(リンクは参考に記載しています)

fsspec 本体に加えて Azure 向けの実装が必要なので、 pip install adlfs でインストールして利用します。コードではファイルクライアントの取得は必要なくなり、組み込みの open に似た形でストリームを取得できます。

function_app.py
import os
from io import StringIO, TextIOWrapper
from logging import getLogger

import azure.functions as func
import fsspec

logger = getLogger(__name__)
logger.setLevel("DEBUG")

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:

    # 検索キーワードをリクエストパラメータから取得する
    keyword = req.params.get("s", "")
    limit = int(req.params.get("limit", 10))
    logger.info(f"keyword: {keyword!r}, limit: {limit}")

    # リモートファイルの IO を取得する
    # (認証情報は環境変数 AZURE_STORAGE_CONNECTION_STRING に格納済み)
    file_system = os.environ["DATALAKE_FILE_SYSTEM"]
    file_path = os.environ["DATALAKE_FILE_PATH"].lstrip("/")

    with fsspec.open(f"az://{file_system}/{file_path}") as f:

        # テキストとして処理する
        stream = TextIOWrapper(f)
        filtered = StringIO()
        n = 0
        for line in stream:
            if keyword in line:
                logger.debug(line.rstrip("\n"))
                filtered.write(line)
                n += 1
                if n >= limit:
                    break

    return func.HttpResponse(filtered.getvalue())

まとめ

Azure DataLake 上のファイルを Python でメモリ節約して読み込む方法を調査しました。

データを少しずつ読み込むにはストリームを使うのが便利です。しかし Azure SDK のクラスは残念ながら IOBase を継承していなく、そのままではストリームとして扱えませんでした。とはいえ簡単なラッパーを作成することでその問題を解消できました。

ストリームを取得する別の方法では fsspec がありました。 URI を指定してローカルファイルのように扱うので、 Azure SDK とは雰囲気が異なります。今回は深入りしていませんが、データ処理などのライブラリによっては URI を直接渡したりワイルドカード指定もできるようなので、ファイルの扱い全般が楽になりそうです。

参考

小惑星リストのテキストファイルは、 IAU Minor Planet Center の NumberedMPs.txt (2024/11/14更新版)を使用しました。
https://www.minorplanetcenter.net/data

  1. SDK 側で文字列に変換させるオプションもありますが、本筋ではないので割愛します。

  2. 記事「pandas を用いて JSON Lines 形式で S3 に保存する

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?