本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
我々はAzure Databricks環境でワークロードを処理していました。同じADLSコンテナにある別のフォルダに大規模なフォルダ(約50GB)をコピーしていました。そして、直接dbutilsのcopy機能でそれを行なっていました。
dbutils.fs.cp(sourceFolder, destFolder)
これは完了に約20分を要していました。
IOバウンドのタスクにおいてスレッドを用いることでより優れたパフォーマンスを達成するであろうことを知っていました。
我々が用いた二つ目のアプローチは、sourceFolder
の1レベル目のサブフォルダを読み込み、サブフォルダのパスと対応するディスティネーションフォルダのパスを格納するタプルのリストにパスを格納するというものでした。sourceFolder
のフォルダ構造が以下のようなものだとします:
path-to-sourceFolder
│
├───sub_folder_1
├───sub_folder_2
├───sub_folder_3
├───…
sorceDestPathList
は以下のようになります:
[ ("/path-to-sourceFolder/sub_folder_1", "/path-to-destFolder/sub_folder_1"), ("/path-to-sourceFolder/sub_folder_2", "/path-to-destFolder/sub_folder_2"), ("/path-to-sourceFolder/sub_folder_3", "/path-to-destFolder/sub_folder_3"), … ]
コピーにおいては、2つの要素を持つタプルとしての入力を受け付ける、dbutilsのcopy機能に対するラッパーを作成しました。最初の要素はソースフォルダのパスで、二つ目の要素はディスティネーションフォルダのパスです。
def copyWrapperFunc(inpPaths: tuple[str]):
dbutils.fs.cp( inpPaths[0], inpPaths[1], True )
スレッディングにおいては、Pytohnのconcurrent.futures
モジュールのThreadPoolExecutor
クラスを用いました。スレッドプールエグゼキューターに対して、sorceDestPathList
とリストのそれぞれのアイテムに処理を行うcopyWrapperFunc
関数を送信しました。
threadPoolSize = 3
thPoolExec = concurrent.futures.ThreadPoolExecutor(max_workers=threadPoolSize)
futures = [ thPoolExec.submit( copyWrapperFunc, item ) for item in sorceDestPathList ]
futureResults = []
for future in concurrent.futures.as_completed(futures):
try:
futureResult = future.result()
futureResults.append( futureResult )
except Exception as exc:
raise exc
これで、3つのワーカースレッドを用いることで5-6分で完了しました。
ネットワーク使用量をチェックしたという証明として
2つのアプローチにおけるドライバーノードのネットワーク使用量を比較します。さまざまな使用量の統計情報を取得できるpsutilというPythonのモジュールがあります。bytes_sent, bytes_recv, packets_sent, packets_recvなどの属性を持つ名前付きタプルとしてネットワーク使用量の統計情報を返却するpsutil.net_io_countersという関数を用いました。この関数が受け付ける名前付けの引数にpernicがあり、これをTrue
にするとこの関数はシステムに存在するそれぞれのネットワークカードの統計情報をディクショナリーとして返却します。ここでは我々はeth0(イーサーネット)に興味があります。この関数の戻り値のサンプルは以下のようになります:
psutil.net_io_counters(pernic = True)[“eth0”]
Sample Value = snetio(bytes_sent=67546317141, bytes_recv=73595487783, packets_sent=40527220, packets_recv=62647634, errin=0, errout=0, dropin=0, dropout=0)
以下のコードを用いることで、5秒間隔で受信した平均バイト数を表示します。
import psutil
import time
import datetime as dt
prevTotalBytesRecv = psutil.net_io_counters(pernic = True)["eth0"].bytes_recv
prevTime = dt.datetime.now()
while True:
curTotalBytesRecv = psutil.net_io_counters(pernic = True)["eth0"].bytes_recv
curTime = dt.datetime.now()
secondsElapsed:float = (curTime - prevTime).total_seconds()
avgBytesReceivedPerSecond = (curTotalBytesRecv - prevTotalBytesRecv) / secondsElapsed
print(round(avgBytesReceivedPerSecond / (1024*1024), 2), "MB", sep="", end=" ")
prevTotalBytesRecv = curTotalBytesRecv
prevTime = curTime
time.sleep(5)
上のコードから得られる結果の画像を添付しています。7GB程度の小規模なサブセットに対して2つの異なるアプローチでコピーを行なっています。
直接のコピー:
2つのワーカーのスレッドによるコピー:
画像から明らかに2つ目のアプローチでの秒間の受信バイト数が高いことが分かり、最終的にはコピーが高速に完了しています。
まとめ
スレッドを用いてサブフォルダをコピーすることは、ADLSとさらに多くの接続を確立することで高速になりました。我々のケースでは、ワーカー数を増やすことでもより高速にタスクを完了することができました。しかし、そこには制限があります。
さらに探索するには:
- 秒あたりの受信バイト数、送信バイト数の最大値は何か、それらには何が影響を与えるのか。
- 画像から観測すると、接続あたりのbytesRecvが60MBなのはなぜか。
リファレンス