0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[翻訳] Databricksでのdbutils.fs.cpとスレッドによるADLSでの高速なコピー

Last updated at Posted at 2025-02-21

Faster copying in adls through dbutils.fs.cp along with threads in datbricks | by ADITYA RANJAN MOHANTY | Mediumの翻訳です。

本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

我々はAzure Databricks環境でワークロードを処理していました。同じADLSコンテナにある別のフォルダに大規模なフォルダ(約50GB)をコピーしていました。そして、直接dbutilsのcopy機能でそれを行なっていました。

dbutils.fs.cp(sourceFolder, destFolder)

これは完了に約20分を要していました。

IOバウンドのタスクにおいてスレッドを用いることでより優れたパフォーマンスを達成するであろうことを知っていました。

我々が用いた二つ目のアプローチは、sourceFolderの1レベル目のサブフォルダを読み込み、サブフォルダのパスと対応するディスティネーションフォルダのパスを格納するタプルのリストにパスを格納するというものでした。sourceFolderのフォルダ構造が以下のようなものだとします:

path-to-sourceFolder
│
├───sub_folder_1
├───sub_folder_2
├───sub_folder_3
├───…

sorceDestPathListは以下のようになります:

[ ("/path-to-sourceFolder/sub_folder_1", "/path-to-destFolder/sub_folder_1"), ("/path-to-sourceFolder/sub_folder_2", "/path-to-destFolder/sub_folder_2"), ("/path-to-sourceFolder/sub_folder_3", "/path-to-destFolder/sub_folder_3"),  ]

コピーにおいては、2つの要素を持つタプルとしての入力を受け付ける、dbutilsのcopy機能に対するラッパーを作成しました。最初の要素はソースフォルダのパスで、二つ目の要素はディスティネーションフォルダのパスです。

def copyWrapperFunc(inpPaths: tuple[str]):
    dbutils.fs.cp( inpPaths[0], inpPaths[1], True  )

スレッディングにおいては、Pytohnのconcurrent.futuresモジュールのThreadPoolExecutorクラスを用いました。スレッドプールエグゼキューターに対して、sorceDestPathListとリストのそれぞれのアイテムに処理を行うcopyWrapperFunc関数を送信しました。

threadPoolSize = 3
thPoolExec = concurrent.futures.ThreadPoolExecutor(max_workers=threadPoolSize)

futures = [  thPoolExec.submit( copyWrapperFunc, item ) for item in  sorceDestPathList ]

futureResults = []
for future in concurrent.futures.as_completed(futures):
    try:
        futureResult = future.result()
        futureResults.append( futureResult  )
    except Exception as exc:
        raise exc

これで、3つのワーカースレッドを用いることで5-6分で完了しました。

ネットワーク使用量をチェックしたという証明として

2つのアプローチにおけるドライバーノードのネットワーク使用量を比較します。さまざまな使用量の統計情報を取得できるpsutilというPythonのモジュールがあります。bytes_sent, bytes_recv, packets_sent, packets_recvなどの属性を持つ名前付きタプルとしてネットワーク使用量の統計情報を返却するpsutil.net_io_countersという関数を用いました。この関数が受け付ける名前付けの引数にpernicがあり、これをTrueにするとこの関数はシステムに存在するそれぞれのネットワークカードの統計情報をディクショナリーとして返却します。ここでは我々はeth0(イーサーネット)に興味があります。この関数の戻り値のサンプルは以下のようになります:

psutil.net_io_counters(pernic = True)[“eth0”]
Sample Value = snetio(bytes_sent=67546317141, bytes_recv=73595487783, packets_sent=40527220, packets_recv=62647634, errin=0, errout=0, dropin=0, dropout=0)

以下のコードを用いることで、5秒間隔で受信した平均バイト数を表示します。

import psutil
import time
import datetime as dt

prevTotalBytesRecv = psutil.net_io_counters(pernic = True)["eth0"].bytes_recv
prevTime = dt.datetime.now()

while True:
    curTotalBytesRecv = psutil.net_io_counters(pernic = True)["eth0"].bytes_recv

    curTime = dt.datetime.now()
    secondsElapsed:float = (curTime - prevTime).total_seconds()
    avgBytesReceivedPerSecond = (curTotalBytesRecv - prevTotalBytesRecv) / secondsElapsed
    print(round(avgBytesReceivedPerSecond / (1024*1024), 2), "MB", sep="", end=" ")

    prevTotalBytesRecv = curTotalBytesRecv
    prevTime = curTime

    time.sleep(5)

上のコードから得られる結果の画像を添付しています。7GB程度の小規模なサブセットに対して2つの異なるアプローチでコピーを行なっています。

直接のコピー:

2つのワーカーのスレッドによるコピー:

画像から明らかに2つ目のアプローチでの秒間の受信バイト数が高いことが分かり、最終的にはコピーが高速に完了しています。

まとめ

スレッドを用いてサブフォルダをコピーすることは、ADLSとさらに多くの接続を確立することで高速になりました。我々のケースでは、ワーカー数を増やすことでもより高速にタスクを完了することができました。しかし、そこには制限があります。

さらに探索するには:

  • 秒あたりの受信バイト数、送信バイト数の最大値は何か、それらには何が影響を与えるのか。
  • 画像から観測すると、接続あたりのbytesRecvが60MBなのはなぜか。

リファレンス

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?