はじめに
機械学習やディープラーニングにおいて、モデルの推論性能は非常に重要な指標です。特に大規模なデータを扱う際に、モデルが実際にどのように動作するかを理解するのに役立ちます。この記事では、モデルの推論性能をテストする方法をステップバイステップで解説し、Pythonを使って簡単なコードを実装します。
モデルの推論性能とは?
モデルの推論性能は主に以下の2つの観点から評価されます:
- メモリ使用量:モデルが推論中に使用するメモリの量。
- モデルのスループット:モデルが単位時間あたりに処理できるトークンの数。
メモリ使用量
- RSS(Resident Set Size):プロセスが実際に物理メモリ(RAM)上に保持しているメモリの量。
- VMS(Virtual Memory Size):プロセスが仮想メモリ上で確保しているメモリの総量。
モデルのスループット
- Prefillフェーズ:モデルが事前に計算を行い、一部の自己注意力をキャッシュするプロセス。
- Decodeフェーズ:モデルが自己回帰的に新しいトークンを生成するプロセス。
モデルの推論性能をテストする手順
ここでは、2つのスクリプト monitor.py
と benchmark.py
を使用します。monitor.py
はメモリ使用量を監視し、benchmark.py
はモデルのスループットをテストします。
1. メモリ使用量を監視する
1-1. RSSとVMSの説明:メモリ使用量の理解
モデルの推論性能をテストする際に、メモリ使用量を監視するために RSS と VMS という2つの指標がよく使用されます。これらは、プロセスが使用するメモリの種類を表す重要な指標です。
RSS(Resident Set Size)
RSS は、プロセスが実際に物理メモリ(RAM)上に保持しているメモリの量を表します。つまり、プロセスが現在使用している物理メモリのサイズです。この値が大きいほど、プロセスが多くのRAMを消費していることを意味します。
RSSの特徴
- 物理メモリ:RAM上に実際に存在するメモリ量。
- パフォーマンスへの影響:RSSが大きいと、システムの物理メモリが不足し、パフォーマンスが低下する可能性があります。
- 単位:通常はMB(メガバイト)やGB(ギガバイト)で表示されます。
VMS(Virtual Memory Size)
VMS は、プロセスが仮想メモリ上で確保しているメモリの総量を表します。仮想メモリは、物理メモリ(RAM)とディスク上のスワップ領域を組み合わせたものです。プロセスが使用するメモリの総量を示しますが、必ずしもすべてが物理メモリ上に存在するわけではありません。
VMSの特徴
- 仮想メモリ:物理メモリとスワップ領域を含む、プロセスが確保した全メモリ量。
- スワップの影響:VMSが物理メモリを超える場合、一部のデータはディスク上のスワップ領域に保存されます。これにより、アクセス速度が遅くなる可能性があります。
- 単位:通常はMB(メガバイト)やGB(ギガバイト)で表示されます。
1-2. RSSとVMSの違い
指標 | 説明 | 物理メモリ | スワップ領域 | パフォーマンスへの影響 |
---|---|---|---|---|
RSS | プロセスが実際に物理メモリ上に保持しているメモリ量 | 含む | 含まない | 直接的な影響あり |
VMS | プロセスが仮想メモリ上で確保しているメモリの総量 | 一部含む | 含む | 間接的な影響あり |
1-3. なぜRSSとVMSを監視するのか?
-
メモリ使用量の最適化:
- RSSを監視することで、プロセスが実際に使用している物理メモリ量を把握できます。これにより、メモリリークや過剰なメモリ使用を検出できます。
- VMSを監視することで、プロセスが確保している全メモリ量を確認できます。これにより、仮想メモリの使用状況を把握し、スワップ領域の使用を最小限に抑えることができます。
-
パフォーマンスの向上:
- RSSが大きすぎると、システムの物理メモリが不足し、パフォーマンスが低下する可能性があります。
- VMSが大きすぎると、スワップ領域が頻繁に使用され、ディスクI/Oが増加してパフォーマンスが低下する可能性があります。
-
リソース管理:
- RSSとVMSを監視することで、システム全体のメモリ使用状況を把握し、リソースの適切な割り当てを行うことができます。
1-4. メモリ使用量を監視するスクリプト
以下は、メモリ使用量を監視するためのPythonスクリプトです。
import psutil
import time
import os
import signal
import sys
import atexit
import json
current_pid = os.getpid()
print(f"現在のプロセスPID: {current_pid}")
monitored = sys.argv[1]
print(f"監視対象のプロセス: {monitored}")
target_pid = None
# 残留プロセスをクリア
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if monitored in proc.info['cmdline'] and proc.pid != current_pid:
print(f"残留プロセス: {proc.pid}")
proc.kill()
print("残留プロセスをクリアしました")
print("\r\n")
print("指定されたプロセスを探しています。スループットテストスクリプトを起動し、テスト完了後このスクリプトを閉じてください:")
# ターゲットプロセスを探す
while not target_pid:
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if monitored in proc.info['cmdline'] and proc.pid != current_pid:
target_pid = proc.pid
break
if not target_pid:
print("指定されたプロセスが見つかりません")
time.sleep(1)
print("\r\n")
print(f"指定されたプロセスPID: {target_pid}")
process = psutil.Process(target_pid)
process_cmdline = process.cmdline()
print(f"PIDに対応するプロセス名: {process_cmdline}")
max_memory_stats = {
"max_rss": 0,
"max_vms": 0
}
# メモリ使用量をJSONファイルに保存
def write_memory_max_to_json():
with open('memory_results.json', 'w') as report_file:
json.dump(max_memory_stats, report_file, indent=2)
print("\nメモリ使用量を 'memory_results.json' ファイルに保存しました。")
atexit.register(write_memory_max_to_json) # プロセス終了時に結果を保存
# メモリ使用量をリアルタイムで監視
while 1:
time.sleep(0.1)
mem_info = process.memory_info()
rss = mem_info.rss / 1024 ** 2
vms = mem_info.vms / 1024 ** 2
max_memory_stats["max_rss"] = max(max_memory_stats["max_rss"], rss)
max_memory_stats["max_vms"] = max(max_memory_stats["max_vms"], vms)
print(f"現在のRSSメモリ: {rss:.2f} MB | 現在のVMSメモリ: {vms:.2f} MB")
print(f"最大RSSメモリ: {max_memory_stats['max_rss']:.2f} MB | 最大VMSメモリ: {max_memory_stats['max_vms']:.2f} MB\n")
2. モデルのスループットをテストする
以下は、モデルのスループットをテストするためのPythonスクリプトです。
import torch
import time
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # CPUを使用して推論
# トークナイザーとモデルをロード
print("トークナイザーをロード中\r\n")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B", trust_remote_code=True)
print("モデルをロード中\r\n")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B", device_map="auto", trust_remote_code=True).eval()
print("モデルのロードが完了しました\r\n")
# 指定されたテキストをプロンプトとしてロード
print("プロンプトをロード中\r\n")
with open('prompt.txt', 'r', encoding='utf-8') as file:
prompt = file.read()
# プロンプトをトークン化
print("トークナイザーでトークン化中\r\n")
inputs = tokenizer(prompt, return_tensors='pt')
inputs = inputs.to(model.device)
# Prefillフェーズのスループットテスト
print("prefillフェーズのスループットテスト中:\r\n")
batch_size = 1 # 単一のプロンプト、バッチサイズは1
total_prompts = 10 # 10回テスト
total_tokens = inputs['input_ids'].shape[1] # トークン数
start_time_prefill = time.time()
for _ in range(total_prompts):
with torch.no_grad(): # 推論性能向上のため勾配計算を無効化
outputs = model(**inputs)
end_time_prefill = time.time()
elapsed_time_prefill = end_time_prefill - start_time_prefill # 推論の総時間
throughput_prefill = total_prompts * total_tokens / elapsed_time_prefill # prefillスループット、1秒あたりのトークン数
print(f"トークン総数: {total_tokens}")
print(f"テスト回数: {total_prompts}")
print(f"総時間: {elapsed_time_prefill}")
print(f"モデルのprefillフェーズのスループット: {throughput_prefill:.2f} tokens/s\r\n")
print("prefillフェーズのスループットテストが完了しました\r\n")
# Decodeフェーズのスループットテスト
print("decodeフェーズのスループットテスト中:\r\n")
max_new_tokens = 50 # 生成する新しいトークンの総数
total_prompts = 10 # 10回テスト
start_time_decode = time.time()
for _ in range(total_prompts):
with torch.no_grad(): # 推論性能向上のため勾配計算を無効化
outputs = model.generate(**inputs, min_new_tokens=max_new_tokens, max_new_tokens=max_new_tokens)
print(f"新しいトークンが50生成されたことを確認してください {total_tokens} --> {outputs.shape}")
end_time_decode = time.time()
elapsed_time_decode = end_time_decode - start_time_decode # 推論の総時間
throughput_decode = total_prompts * max_new_tokens / elapsed_time_decode # decodeスループット、1秒あたりの新しいトークン数
print(f"生成された新しいトークン数: {max_new_tokens}")
print(f"テスト回数: {total_prompts}")
print(f"総時間: {elapsed_time_decode}")
print(f"モデルのdecodeフェーズのスループット: {throughput_decode:.2f} tokens/s\r\n")
print("decodeフェーズのスループットテストが完了しました\r\n")
# スループットテストの結果を保存
results = {
"prefill_throughput": throughput_prefill,
"decode_throughput": throughput_decode
}
with open('throughput_results.json', 'w') as output_file:
json.dump(results, output_file, indent=4)
print("\nスループットテストの結果を 'throughput_results.json' ファイルに保存しました。")
テストを実行する
-
まず、
monitor.py
スクリプトを実行し、監視対象のスクリプトとしてbenchmark.py
を指定します:python monitor.py benchmark.py
-
次に、新しいターミナルウィンドウを開き、
benchmark.py
スクリプトを実行します:python benchmark.py
-
benchmark.py
の実行が終了すると、monitor.py
も自動的に終了し、メモリ使用量とスループットテストの結果がそれぞれmemory_results.json
とthroughput_results.json
ファイルに保存されます。
まとめ
この記事では、モデルの推論性能をテストする方法を紹介しました。メモリ使用量やモデルのスループットを測定することで、モデルの実際のパフォーマンスをより深く理解することができます。このガイドが、今後のプロジェクトで役立つことを願っています。