発端はこちらのソリューションアクセラレータを翻訳したことでした。データの準備でスレッドでファイルコピーを並列処理することで高速化するテクニックが紹介されていました。いままで知りませんでした...。
こちらの記事でも同じテクニックが紹介されていました。
大量の小さいファイルが圧縮されている状態から、解凍、ストレージへの保存を行うというパターンは一般的なものだと思いますが、これまでは私自身どのストレージでどのような処理を行えばいいのかをきちんと理解できていませんでした。
先日書いたこちらの記事と合わせて、どのような流れで処理を行うと最も高速なのかをまとめます。
想定シナリオ
Databricksで以下の処理を行うものとします。
- インターネットで公開されている
gz
ファイルをダウンロードする。 -
gz
を解凍する。 - 解凍したファイルを永続化する。
この流れで処理を行う際、こちらでも触れているようにどのストレージを使うのかに関してはオプションが存在します。
処理のオプション
オプション1: ボリュームで解凍する
オブジェクトストレージであるボリュームでファイルを解凍します。この場合、解凍されたファイルは永続化されます。しかし、オブジェクトストレージでの解凍処理は時間を要します。
オプション2: ドライバーノードで解凍する
ブロックストレージである、クラスターのドライバーノードのストレージで解凍します。オブジェクトストレージと比べて、ブロックストレージでの解凍処理は高速です。しかし、クラスターのストレージは停止するとクリアされるので、永続化するために解凍したファイルをボリュームにコピーする必要があります。
オプション2-1: ドライバーノードで解凍しボリュームにコピーする
ボリュームにファイルをコピーすることで永続化されます。しかし、小さいファイルがたくさんある場合、このコピーも非常に時間のかかる処理となります。
オプション2-2: ドライバーノードで解凍しボリュームに並列でコピーする
結論から言えば、この処理がもっとも高速となります。スレッド数を増やせば増やすほど、コピー処理が並列化されます。
検証
注意
- この検証は、AWS上でDBR ML 15.4 LTS、
r6id.xlarge
のシングルノードクラスターを用いて行なっています。 - 非常に時間を要する処理は外挿して推定しています。
- 処理速度は環境によって変動します。参考値としてごらんください。
オプション1: ドライバーノードで解凍する
%sh
# ターゲットボリュームディレクトリに移動
cd /Volumes/takaakiyayoi_catalog/item_onboarding/landing_zone_volume_only
# 画像ファイルを解凍
# (imagesというフォルダに解凍される)
wget -q https://amazon-berkeley-objects.s3.amazonaws.com/archives/abo-images-small.tar
# 画像ファイルを解凍
echo "画像を解凍中"
tar -xf ./abo-images-small.tar --no-same-owner
gzip -df ./images/metadata/images.csv.gz
echo "完了"
元のファイルは3,840個のフォルダから構成される画像ファイル群です。
この処理は100分待っても54フォルダしか作成されませんでした。ここでは、外挿してどのくらいの時間を要するかを見積もります。
1時間あたり32(100/54
)個のフォルダが作成されるとすると、3,840個のフォルダを解凍するには119時間かかる計算となります。
オプション2-1: ドライバーノードで解凍しボリュームにコピーする
%sh
# 一時ディレクトリを作成
mkdir /tmp_landing_zone
# ターゲットディレクトリに移動
cd /tmp_landing_zone
# 画像ファイルをダウンロード
echo "画像をダウンロード中"
wget -q https://amazon-berkeley-objects.s3.amazonaws.com/archives/abo-images-small.tar
# 画像ファイルを解凍
# (imagesというフォルダに解凍される)
echo "画像を解凍中"
tar -xf ./abo-images-small.tar --no-same-owner
gzip -df ./images/metadata/images.csv.gz
echo "完了"
画像の解凍は3分程度で完了しました。
dbutils.fs.cpでボリュームにファイルをコピーします。
# パスを指定
source_dir = "file:/tmp_landing_zone"
target_dir = "/Volumes/takaakiyayoi_catalog/item_onboarding/landing_zone_dbutils/"
dbutils.fs.cp(source_dir, target_dir, recurse=True)
これも処理が終わらないので外挿します。
この処理は100分待ってもボリュームには40フォルダしか作成されませんでした。
1時間あたり24(100/40
)個のフォルダが作成されるとすると、3,840個のフォルダを解凍するには160時間かかる計算となります。
オプション2-2: ドライバーノードで解凍しボリュームに並列でコピーする
%sh
# 一時ディレクトリを作成
mkdir /tmp_landing_zone
# ターゲットディレクトリに移動
cd /tmp_landing_zone
# 画像ファイルをダウンロード
echo "画像をダウンロード中"
wget -q https://amazon-berkeley-objects.s3.amazonaws.com/archives/abo-images-small.tar
# 画像ファイルを解凍
# (imagesというフォルダに解凍される)
echo "画像を解凍中"
tar -xf ./abo-images-small.tar --no-same-owner
gzip -df ./images/metadata/images.csv.gz
echo "完了"
ここまでは2-1と同じです。以降では、スレッドを用いてdbutils.fs.cp
をサブフォルダごとに実行します。
# Standard Imports
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
# External Imports
from tqdm import tqdm
# TODO: 最適なスレッド数を確認する
def threaded_dbutils_copy(source_directory, target_directory, n_threads=10):
"""
スレッドを使用してソースディレクトリをターゲットディレクトリにコピーします。
この関数はスレッドを使用して複数のコピーコマンドを実行し、コピー処理を高速化します。
特に画像のような小さなファイルが多数ある場合に便利です。
:param source_directory: ファイルがコピーされる元のディレクトリ
:param target_directory: ファイルがコピーされる先のディレクトリ
:param n_threads: 使用するスレッド数。数が多いほどプロセスが速くなります。
注意事項
- パスの末尾にバックスラッシュを含めないでください。
- n_threadsを増やすとドライバに負荷がかかるため、メトリクスを監視してドライバが過負荷にならないようにしてください。
- 100スレッドは適切なドライバに適度な負荷をかけます。
"""
print("すべてのパスをリストしています")
# すべてのファイルのための空のリストを作成
all_files = []
# すべてのファイルを発見するための再帰的な検索関数
# TODO: これをジェネレータに変える
def recursive_search(_path):
file_paths = dbutils.fs.ls(_path)
for file_path in file_paths:
if file_path.isFile():
all_files.append(file_path.path)
else:
recursive_search(file_path.path)
# ソースディレクトリに再帰的な検索を適用
recursive_search(source_directory)
# パス文字列のフォーマット
all_files = [path.split(source_directory)[-1][1:] for path in all_files]
n_files = len(all_files)
print(f"{n_files} ファイルが見つかりました")
print(f"{n_threads} スレッドでコピーを開始します")
# 進行状況バーを作成するためのスレッドロックを使用してTQDMを初期化
p_bar = tqdm(total=n_files, unit=" コピー")
bar_lock = Lock()
# 単一スレッドで実行される作業を定義
def single_thread_copy(file_sub_path):
dbutils.fs.cp(f"{source_directory}/{file_sub_path}", f"{target_directory}/{file_sub_path}")
with bar_lock:
p_bar.update(1)
# すべてのパスにスレッド作業をマッピング
with ThreadPoolExecutor(max_workers=n_threads, thread_name_prefix="copy_thread") as ex:
ex.map(single_thread_copy, all_files)
# 進行状況バーを閉じる
p_bar.close()
print("コピー完了")
return
150スレッドで並列コピーすると39分で完了しました。
# パスを指定
source_dir = "file:/tmp_landing_zone"
target_dir = "/Volumes/takaakiyayoi_catalog/item_onboarding/landing_zone_threading/"
# コピーを実行
threaded_dbutils_copy(
source_directory=source_dir,
target_directory=target_dir,
n_threads=150 # 同時に実行するスレッド数はどれくらいにしますか? 数を増やすことを恐れないでください。
)
まとめ
オプション | 処理時間 |
---|---|
オプション1 ボリュームでの解凍 | 119時間 (外挿値)、オプション2-2の183倍 |
オプション2-1 ドライバーノードでの解凍 + dbutils.fs.cpのみでコピー | 160時間 (外挿値)、オプション2-2の246倍 |
オプション2-2 ドライバーノードでの解凍 + dbutils.fs.cpをスレッドで並列化してコピー | 39分 |