🎄 本記事は ZOZO Advent Calendar 2024 シリーズ 2 の 9 日目です。
推薦基盤ブロックの1日目の記事は 荒木さんによる 強化学習で作る最強のCCレモンAI~強化学習基礎編~でした。
はじめに
こんにちは!スミスと申します。
機械学習を扱うアプリケーションや研究、kaggleを行う際、推論時間が長くなってしまうことってありますよね?
この記事では、Python の標準ライブラリで提供される concurrent.futures モジュールを活用し、機械学習モデルで推論を並列化する方法を簡単に説明します。
concurrent.futuresとは?
Python 標準ライブラリで提供される並列・並行処理 & 非同期実行をサポートするモジュールです。このモジュールを使用することで、複雑なプロセスやスレッドの管理を抽象化してくれるので、シンプルな記載方法で、並列・並行タスクが実行できます。
使用するメリット
- 非同期実行をサポートしている
- 関数やメソッドなどの呼び出し可能オブジェクトを非同期に実行可能
- 複数のタスクを同時に実行し、結果が準備でき次第取得可能
- 高水準インターフェース
- スレッドやプロセスの管理を容易にし、実装者が細かい制御に煩わされることなく並列処理を実装できる
- 柔軟性が高い
- タスクに応じて、並列処理の対象をスレッドかプロセスの中から選択できる
プロセスとスレッドの選択
concurrent.futures モジュールの大きな特徴の1つとして、複数のスレッドを使用するか、複数のプロセスを使用するかを選択できる点が挙げられます。
このモジュールでは、以下の2つのクラスがそれぞれの方式に対応しています:
- ThreadPoolExecutor: 複数のスレッドを作成して並列処理を行う
- ProcessPoolExecutor: 複数のプロセスを作成して並列処理を行う
Executor | 用途 | 特徴 |
---|---|---|
ThreadPoolExecutor | I/O バウンドタスク(ファイル操作、ネットワーク通信など) | スレッドを作成して並列処理を実現。主に I/O 待機時間が長い処理に適している。e.g., APIリクエストの送信とレスポンス処理 |
ProcessPoolExecutor | CPU バウンドタスク(数値計算、データ処理など) | プロセスを作成して並列処理を実現。大規模な行列計算のような計算負荷が高い処理や同じ作業を繰り返す処理に適している。e.g., 機械学習モデルの推論 |
並列処理によるスピードアップの効果を検証
実際にCPUに負荷をかけるタスクを模擬し、並列処理によるスピードアップを検証してみましょう。
CPUバウンドなタスクの例
以下の関数は、CPUを集中的に使用するタスク(大規模な計算)を模擬しています。
# CPUバウンドなタスク (例: 大規模な計算)
def cpu_bound_task(n):
print(f"Process {os.getpid()} is working on task with input {n}\n")
result = sum(i * i for i in range(n))
return result
逐次処理(ノーマル)
すべてのタスクを順番に実行する場合です。
start_time = time.time()
results = [cpu_bound_task(n) for n in task_inputs]
end_time = time.time()
print(f"Execution completed in {end_time - start_time:.2f} seconds")
ThreadPoolExecutor を使用した並行処理
スレッドごとにタスクを並列で実行する場合です。
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task, task_inputs))
end_time = time.time()
print(f"Execution completed in {end_time - start_time:.2f} seconds")
ProcessPoolExecutor を使用した並列処理
プロセスごとにタスクを並列で実行する場合です。
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task, task_inputs))
end_time = time.time()
print(f"Execution completed in {end_time - start_time:.2f} seconds")
このベンチマーク結果は次になります。
みてみるとわかる通り、ProcessPoolExecutorが4.70[s]となり、ノーマルとThreadPoolExecutorよりも2倍以上実行時間が早いです。この理由は、ProcessPoolExecutorはプロセスを並列で走らせるためになります。
Executor | 時間 |
---|---|
ノーマル | 11.00[s] |
ThreadPoolExecutor | 11.18[s] |
ProcessPoolExecutor | 4.70[s] |
タイタニック号生存予測における ProcessPoolExecutor の適用
最後は、有名なkaggleコンペのタイタニック号生存予測に ProcessPoolExecutor を使用して並列推論の例を示します!データの前処理、モデルのトレーニング、そして推論結果の生成まで一連の流れを示します。
並列推論の適用方法
ProcessPoolExecutor を使用する際の主なポイントは次のとおりです:
バッチ分割
並列処理を適切に行うために、データをバッチに分割する必要があります。このバッチが各プロセスに対応します。
推論関数の設計
推論関数には、データ変換(例:スケーリングやエンコード)を含めないようにします。変換を含めるとプロセスの初期化が頻繁に行われ、全体の処理速度が大幅に低下する可能性があります。
import os
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from concurrent.futures import ProcessPoolExecutor
def preprocess(df):
# Create a copy of the dataframe to avoid modifying the original
df = df.copy()
# Fill missing values for Age, Embarked, and Fare columns
df["Age"] = df["Age"].fillna(df["Age"].median())
df["Embarked"] = df["Embarked"].fillna(df["Embarked"].mode()[0])
df["Fare"] = df["Fare"].fillna(df["Fare"].median())
# Map categorical variables to numerical values
df["Sex"] = df["Sex"].map({"male": 0, "female": 1})
df["Embarked"] = df["Embarked"].map({"C": 0, "Q": 1, "S": 2})
# Drop unnecessary columns
return df.drop(columns=["Name", "Ticket", "Cabin"], errors='ignore')
def make_submission(path, predictions):
# Save the predictions to a CSV file
predictions.to_csv(path, index=False)
print(f"Submission saved to {path}")
# Load and preprocess data
train_path = '/home/jupyter/data/Titanic-Machine-Learning-from-Disaster/train.csv'
test_path = '/home/jupyter/data/Titanic-Machine-Learning-from-Disaster/test.csv'
train_df = preprocess(pd.read_csv(train_path))
test_df = preprocess(pd.read_csv(test_path))
# Model training
input_features = [col for col in train_df.columns if col not in ["PassengerId", "Survived"]]
X_train = StandardScaler().fit_transform(train_df[input_features])
y_train = train_df["Survived"]
model = RandomForestClassifier(random_state=42, n_jobs=-1)
model.fit(X_train, y_train)
# Batch processing for predictions
batch_size = 100
scaler = StandardScaler().fit(train_df[input_features])
# Preprocess and scale the test data
X_test = scaler.transform(test_df[input_features])
# Split data into batches
batches = [X_test[i:i + batch_size] for i in range(0, len(X_test), batch_size)]
def predict_batch(batch):
# Predict the batch using the trained model
print(f"Process {os.getpid()} is working on task\n")
return model.predict(batch).tolist()
predictions = []
with ProcessPoolExecutor(max_workers=8) as executor:
# Use ProcessPoolExecutor to process batches in parallel
for result in executor.map(predict_batch, batches):
predictions.extend(result)
# Save predictions
submission = pd.DataFrame({
"PassengerId": test_df["PassengerId"],
"Survived": predictions
})
make_submission('/home/jupyter/data/Titanic-Machine-Learning-from-Disaster/submission.csv', submission)