はじめに
動画コンテンツが爆発的に増加する現代において、AIによる動画理解技術は極めて重要な分野となっています。今回紹介する LLaVA-NeXT-Video-7B-hf は、画像と動画の両方を理解し、自然言語で対話できる革新的なマルチモーダルAIモデルです。
このモデルは、静止画像のみで学習されたLLaVA-NeXTをベースに、動画理解能力を付加したもので、ゼロショット動画理解という画期的な能力を持ち、VideoMMEベンチマークでオープンソースモデル中の最高性能を記録しています。
LLaVA-NeXT-Videoとは何か?
基本アーキテクチャ
LLaVA-NeXT-Video-7B-hfは以下の構成要素からなる**Large Multimodal Model(LMM)**です:
- ベースLLM: Vicuna-7B-v1.5(7Bパラメータ)
- ビジョンエンコーダー: CLIP ViT
- 学習データ: 画像558K + 動画100K + テキスト指示データ
- 学習時期: 2024年4月
- ライセンス: LLAMA 2 Community License
革新的技術:AnyRes技術
AnyRes(Any Resolution)技術は、LLaVA-NeXT-Videoの核心技術です:
# AnyRes技術の概念的動作
def anyres_processing(high_res_image, target_patches=576):
"""
高解像度画像を複数の低解像度パッチに分割
ViTが処理可能な形式に変換
"""
# 画像を複数のグリッドに分割
patches = divide_image_to_patches(high_res_image, grid_size="dynamic")
# 各パッチをViTで処理
patch_features = []
for patch in patches:
features = vision_encoder(patch)
patch_features.append(features)
# パッチ特徴量を結合
concatenated_features = concatenate_patches(patch_features)
return concatenated_features
この技術により、動画フレームも画像の集合として処理できるため、画像のみで学習したモデルが動画タスクで優秀な性能を発揮します。
実装ガイド:基本的な使用方法
環境構築
# 必要なライブラリのインストール
pip install transformers>=4.42.0
pip install torch torchvision
pip install av # PyAV for video processing
pip install numpy pillow
基本的な動画推論
import av
import torch
import numpy as np
from huggingface_hub import hf_hub_download
from transformers import LlavaNextVideoProcessor, LlavaNextVideoForConditionalGeneration
from PIL import Image
import requests
class LLaVAVideoAnalyzer:
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.model_id = model_id
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# モデルとプロセッサーの初期化
self.model = LlavaNextVideoForConditionalGeneration.from_pretrained(
model_id,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
low_cpu_mem_usage=True,
device_map="auto"
)
self.processor = LlavaNextVideoProcessor.from_pretrained(model_id)
print(f"Model loaded on {self.device}")
def read_video_pyav(self, container, indices):
"""
PyAVを使用してビデオフレームをデコード
"""
frames = []
container.seek(0)
start_index = indices[0]
end_index = indices[-1]
for i, frame in enumerate(container.decode(video=0)):
if i > end_index:
break
if i >= start_index and i in indices:
frames.append(frame)
return np.stack([x.to_ndarray(format="rgb24") for x in frames])
def analyze_video(self, video_path, question, num_frames=8):
"""
動画を分析して質問に答える
"""
try:
# 動画を開く
container = av.open(video_path)
total_frames = container.streams.video[0].frames
# フレームを均等にサンプリング
indices = np.linspace(0, total_frames-1, num_frames, dtype=int)
clip = self.read_video_pyav(container, indices)
# 会話形式のプロンプト作成
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": question},
{"type": "video"},
],
},
]
# チャットテンプレートを適用
prompt = self.processor.apply_chat_template(
conversation,
add_generation_prompt=True
)
# 入力を処理
inputs = self.processor(
text=prompt,
videos=clip,
padding=True,
return_tensors="pt"
).to(self.model.device)
# 推論実行
with torch.no_grad():
output = self.model.generate(
**inputs,
max_new_tokens=200,
do_sample=False,
temperature=0.7
)
# 結果をデコード
response = self.processor.decode(
output[0],
skip_special_tokens=True
)
# プロンプト部分を除去して回答のみ取得
answer_start = response.find("ASSISTANT:")
if answer_start != -1:
answer = response[answer_start + len("ASSISTANT:"):].strip()
else:
answer = response
return answer
except Exception as e:
return f"エラーが発生しました: {str(e)}"
finally:
if 'container' in locals():
container.close()
# 使用例
analyzer = LLaVAVideoAnalyzer()
# 動画ファイルを分析
video_path = "sample_video.mp4"
questions = [
"この動画で何が起きていますか?",
"動画の中で面白い部分はどこですか?",
"登場人物は何をしていますか?",
"この動画の雰囲気を説明してください"
]
for question in questions:
answer = analyzer.analyze_video(video_path, question)
print(f"Q: {question}")
print(f"A: {answer}\n")
画像とのマルチモーダル分析
def analyze_image_and_video(self, image_path, video_path, question):
"""
画像と動画を同時に分析
"""
try:
# 画像を読み込み
if image_path.startswith('http'):
image = Image.open(requests.get(image_path, stream=True).raw)
else:
image = Image.open(image_path)
# 動画を読み込み
container = av.open(video_path)
total_frames = container.streams.video[0].frames
indices = np.linspace(0, total_frames-1, 8, dtype=int)
video_clip = self.read_video_pyav(container, indices)
# マルチモーダル会話の構成
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": question},
{"type": "image"},
{"type": "video"},
],
},
]
prompt = self.processor.apply_chat_template(
conversation,
add_generation_prompt=True
)
# 入力処理
inputs = self.processor(
text=prompt,
images=image,
videos=video_clip,
padding=True,
return_tensors="pt"
).to(self.model.device)
# 推論実行
with torch.no_grad():
output = self.model.generate(
**inputs,
max_new_tokens=300,
do_sample=True,
temperature=0.7,
top_p=0.9
)
response = self.processor.decode(output[0], skip_special_tokens=True)
answer_start = response.find("ASSISTANT:")
return response[answer_start + len("ASSISTANT:"):].strip() if answer_start != -1 else response
except Exception as e:
return f"エラーが発生しました: {str(e)}"
finally:
if 'container' in locals():
container.close()
# LLaVAVideoAnalyzerクラスにメソッドを追加
LLaVAVideoAnalyzer.analyze_image_and_video = analyze_image_and_video
最適化テクニック
4bit量子化による高速化
class OptimizedLLaVAVideoAnalyzer(LLaVAVideoAnalyzer):
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.model_id = model_id
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 4bit量子化の設定
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4"
)
# 量子化済みモデルの読み込み
self.model = LlavaNextVideoForConditionalGeneration.from_pretrained(
model_id,
quantization_config=quantization_config,
low_cpu_mem_usage=True,
torch_dtype=torch.float16
)
self.processor = LlavaNextVideoProcessor.from_pretrained(model_id)
print("量子化モデルが読み込まれました(VRAMを大幅に節約)")
# 使用例
optimized_analyzer = OptimizedLLaVAVideoAnalyzer()
Flash Attention 2による高速化
class FlashAttentionLLaVAAnalyzer(LLaVAVideoAnalyzer):
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.model_id = model_id
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Flash Attention 2を使用
self.model = LlavaNextVideoForConditionalGeneration.from_pretrained(
model_id,
torch_dtype=torch.float16,
low_cpu_mem_usage=True,
use_flash_attention_2=True, # Flash Attention 2を有効化
device_map="auto"
)
self.processor = LlavaNextVideoProcessor.from_pretrained(model_id)
print("Flash Attention 2が有効化されました")
# 使用時は事前にflash-attnをインストール
# pip install flash-attn --no-build-isolation
実践的な応用例
動画コンテンツ自動分析システム
import os
import json
from datetime import datetime
from typing import Dict, List
import logging
class VideoContentAnalyzer:
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.analyzer = OptimizedLLaVAVideoAnalyzer(model_id)
self.analysis_templates = {
'content_summary': "この動画の内容を詳しく要約してください。",
'key_moments': "この動画の重要な場面やハイライトを教えてください。",
'emotional_tone': "この動画の感情的なトーンや雰囲気を説明してください。",
'technical_quality': "この動画の撮影技術や品質について評価してください。",
'target_audience': "この動画はどのような視聴者をターゲットにしていると思いますか?",
'improvement_suggestions': "この動画をより良くするための提案があれば教えてください。"
}
def comprehensive_analysis(self, video_path: str) -> Dict:
"""
動画の包括的分析を実行
"""
results = {
'video_path': video_path,
'analysis_timestamp': datetime.now().isoformat(),
'analyses': {}
}
print(f"動画分析開始: {video_path}")
for analysis_type, question in self.analysis_templates.items():
print(f"実行中: {analysis_type}")
try:
answer = self.analyzer.analyze_video(video_path, question, num_frames=12)
results['analyses'][analysis_type] = {
'question': question,
'answer': answer,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
results['analyses'][analysis_type] = {
'question': question,
'error': str(e),
'timestamp': datetime.now().isoformat()
}
return results
def batch_analysis(self, video_directory: str, output_file: str = None):
"""
複数動画のバッチ分析
"""
supported_formats = ('.mp4', '.avi', '.mov', '.mkv', '.webm')
video_files = [
f for f in os.listdir(video_directory)
if f.lower().endswith(supported_formats)
]
batch_results = []
for i, video_file in enumerate(video_files, 1):
video_path = os.path.join(video_directory, video_file)
print(f"\n[{i}/{len(video_files)}] 処理中: {video_file}")
analysis_result = self.comprehensive_analysis(video_path)
batch_results.append(analysis_result)
# 結果を保存
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(batch_results, f, ensure_ascii=False, indent=2)
print(f"\n分析結果を保存しました: {output_file}")
return batch_results
def generate_report(self, analysis_results: List[Dict]) -> str:
"""
分析結果からレポートを生成
"""
report = []
report.append("# 動画コンテンツ分析レポート\n")
report.append(f"生成日時: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
report.append(f"分析対象動画数: {len(analysis_results)}\n")
for i, result in enumerate(analysis_results, 1):
report.append(f"\n## 動画 {i}: {os.path.basename(result['video_path'])}\n")
for analysis_type, analysis_data in result['analyses'].items():
report.append(f"### {analysis_type.replace('_', ' ').title()}\n")
if 'answer' in analysis_data:
report.append(f"{analysis_data['answer']}\n")
else:
report.append(f"エラー: {analysis_data.get('error', '不明なエラー')}\n")
return ''.join(report)
# 使用例
analyzer = VideoContentAnalyzer()
# 単一動画の分析
single_result = analyzer.comprehensive_analysis("sample_video.mp4")
print(json.dumps(single_result, ensure_ascii=False, indent=2))
# バッチ分析
batch_results = analyzer.batch_analysis("./videos/", "analysis_results.json")
# レポート生成
report = analyzer.generate_report(batch_results)
with open("analysis_report.md", "w", encoding="utf-8") as f:
f.write(report)
リアルタイム動画監視システム
import cv2
import threading
import queue
from typing import Callable
class RealTimeVideoMonitor:
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.analyzer = LLaVAVideoAnalyzer(model_id)
self.frame_buffer = queue.Queue(maxsize=32) # フレームバッファ
self.analysis_queue = queue.Queue()
self.monitoring = False
def capture_frames(self, source=0, analysis_interval=5.0):
"""
フレームキャプチャスレッド
"""
cap = cv2.VideoCapture(source)
frame_count = 0
try:
while self.monitoring:
ret, frame = cap.read()
if not ret:
break
# フレームをバッファに追加
if not self.frame_buffer.full():
self.frame_buffer.put(frame)
frame_count += 1
# 指定間隔で分析実行
if frame_count % int(analysis_interval * cap.get(cv2.CAP_PROP_FPS)) == 0:
self.analyze_current_frames()
except Exception as e:
print(f"フレームキャプチャエラー: {e}")
finally:
cap.release()
def analyze_current_frames(self):
"""
現在のフレームバッファを分析
"""
if self.frame_buffer.empty():
return
# フレームを収集
frames = []
temp_frames = []
while not self.frame_buffer.empty() and len(frames) < 8:
frame = self.frame_buffer.get()
temp_frames.append(frame)
frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if frames:
# 分析スレッドに送信
threading.Thread(
target=self._analyze_frames_async,
args=(frames,),
daemon=True
).start()
def _analyze_frames_async(self, frames):
"""
非同期でフレーム分析を実行
"""
try:
# フレームを一時的に動画として保存
temp_video = "temp_analysis.mp4"
self._save_frames_as_video(frames, temp_video)
# 分析実行
questions = [
"現在何が起きていますか?",
"注意すべき異常な行動はありますか?",
"安全性に問題はありますか?"
]
analysis_result = {
'timestamp': datetime.now().isoformat(),
'analyses': {}
}
for question in questions:
answer = self.analyzer.analyze_video(temp_video, question)
analysis_result['analyses'][question] = answer
# 結果をキューに追加
self.analysis_queue.put(analysis_result)
# 一時ファイル削除
if os.path.exists(temp_video):
os.remove(temp_video)
except Exception as e:
print(f"分析エラー: {e}")
def _save_frames_as_video(self, frames, output_path, fps=8):
"""
フレームを動画ファイルとして保存
"""
if not frames:
return
height, width = frames[0].shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
for frame_rgb in frames:
frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
out.write(frame_bgr)
out.release()
def start_monitoring(self, source=0, callback: Callable = None):
"""
監視開始
"""
self.monitoring = True
# キャプチャスレッド開始
capture_thread = threading.Thread(
target=self.capture_frames,
args=(source,),
daemon=True
)
capture_thread.start()
# 結果監視スレッド
if callback:
result_thread = threading.Thread(
target=self._monitor_results,
args=(callback,),
daemon=True
)
result_thread.start()
print("リアルタイム監視を開始しました...")
return capture_thread
def _monitor_results(self, callback: Callable):
"""
分析結果を監視してコールバック実行
"""
while self.monitoring:
try:
if not self.analysis_queue.empty():
result = self.analysis_queue.get(timeout=1)
callback(result)
except queue.Empty:
continue
except Exception as e:
print(f"結果監視エラー: {e}")
def stop_monitoring(self):
"""
監視停止
"""
self.monitoring = False
print("リアルタイム監視を停止しました")
# 使用例
def analysis_callback(result):
"""分析結果の処理"""
print(f"\n[{result['timestamp']}] 監視結果:")
for question, answer in result['analyses'].items():
print(f"Q: {question}")
print(f"A: {answer}\n")
# 監視システムの起動
monitor = RealTimeVideoMonitor()
monitor.start_monitoring(source=0, callback=analysis_callback) # Webカメラを使用
# 監視を一定時間実行
import time
time.sleep(60) # 60秒間監視
# 監視停止
monitor.stop_monitoring()
高度な機能とカスタマイズ
カスタム推論パイプライン
class CustomVideoInference:
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.model = LlavaNextVideoForConditionalGeneration.from_pretrained(
model_id,
torch_dtype=torch.float16,
low_cpu_mem_usage=True,
device_map="auto"
)
self.processor = LlavaNextVideoProcessor.from_pretrained(model_id)
def advanced_video_analysis(
self,
video_path: str,
questions: List[str],
sampling_strategy: str = "uniform",
num_frames: int = 16,
max_tokens: int = 300,
temperature: float = 0.7,
top_p: float = 0.9
):
"""
高度な動画分析パイプライン
"""
# 動画読み込み
container = av.open(video_path)
total_frames = container.streams.video[0].frames
# フレームサンプリング戦略
if sampling_strategy == "uniform":
indices = np.linspace(0, total_frames-1, num_frames, dtype=int)
elif sampling_strategy == "key_frames":
# キーフレーム検出(簡略化版)
indices = self._detect_key_frames(container, num_frames)
elif sampling_strategy == "temporal_segments":
# 時間的セグメント
indices = self._temporal_segments(total_frames, num_frames)
else:
indices = np.linspace(0, total_frames-1, num_frames, dtype=int)
# フレーム読み込み
frames = self._read_video_frames(container, indices)
results = {}
for question in questions:
# 会話構成
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": question},
{"type": "video"}
]
}
]
prompt = self.processor.apply_chat_template(
conversation,
add_generation_prompt=True
)
inputs = self.processor(
text=prompt,
videos=frames,
return_tensors="pt"
).to(self.model.device)
# カスタム生成パラメータ
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
do_sample=True,
pad_token_id=self.processor.tokenizer.eos_token_id
)
response = self.processor.decode(outputs[0], skip_special_tokens=True)
answer = self._extract_answer(response)
results[question] = {
'answer': answer,
'sampling_strategy': sampling_strategy,
'num_frames': num_frames,
'generation_params': {
'temperature': temperature,
'top_p': top_p,
'max_tokens': max_tokens
}
}
container.close()
return results
def _detect_key_frames(self, container, num_frames):
"""キーフレーム検出(簡略版)"""
# 実際の実装では、より洗練されたキーフレーム検出アルゴリズムを使用
total_frames = container.streams.video[0].frames
segment_size = total_frames // num_frames
indices = []
for i in range(num_frames):
# 各セグメントの中央フレーム
center = i * segment_size + segment_size // 2
indices.append(min(center, total_frames - 1))
return np.array(indices)
def _temporal_segments(self, total_frames, num_frames):
"""時間的セグメントサンプリング"""
segments = np.array_split(np.arange(total_frames), num_frames)
return np.array([segment[len(segment)//2] for segment in segments])
def _read_video_frames(self, container, indices):
"""効率的なフレーム読み込み"""
frames = []
container.seek(0)
frame_dict = {}
for i, frame in enumerate(container.decode(video=0)):
if i in indices:
frame_dict[i] = frame.to_ndarray(format="rgb24")
if i > max(indices):
break
# インデックス順にフレームを配置
for idx in indices:
if idx in frame_dict:
frames.append(frame_dict[idx])
return np.stack(frames) if frames else None
def _extract_answer(self, response):
"""回答部分を抽出"""
answer_start = response.find("ASSISTANT:")
if answer_start != -1:
return response[answer_start + len("ASSISTANT:"):].strip()
return response
# 使用例
custom_analyzer = CustomVideoInference()
questions = [
"この動画の主な内容は何ですか?",
"技術的な品質はどうですか?",
"感情的なトーンを説明してください"
]
results = custom_analyzer.advanced_video_analysis(
"sample_video.mp4",
questions,
sampling_strategy="key_frames",
num_frames=20,
temperature=0.8
)
for question, result in results.items():
print(f"Q: {question}")
print(f"A: {result['answer']}")
print(f"Settings: {result['generation_params']}\n")
パフォーマンス測定とベンチマーク
import time
import psutil
import torch
from typing import Dict, List
class PerformanceBenchmark:
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.analyzer = LLaVAVideoAnalyzer(model_id)
self.metrics = []
def benchmark_video_analysis(
self,
video_paths: List[str],
questions: List[str],
num_frames_list: List[int] = [8, 16, 32]
) -> Dict:
"""
動画分析のパフォーマンスベンチマーク
"""
results = {
'system_info': self._get_system_info(),
'benchmarks': []
}
for video_path in video_paths:
for num_frames in num_frames_list:
for question in questions:
benchmark_result = self._single_benchmark(
video_path, question, num_frames
)
results['benchmarks'].append(benchmark_result)
# 統計情報を計算
results['statistics'] = self._calculate_statistics(results['benchmarks'])
return results
def _single_benchmark(self, video_path: str, question: str, num_frames: int) -> Dict:
"""
単一の分析パフォーマンスを測定
"""
# GPU使用量の初期値
gpu_memory_start = self._get_gpu_memory() if torch.cuda.is_available() else 0
cpu_percent_start = psutil.cpu_percent()
memory_start = psutil.virtual_memory().percent
# 分析実行
start_time = time.time()
try:
answer = self.analyzer.analyze_video(video_path, question, num_frames)
success = True
error_message = None
except Exception as e:
answer = ""
success = False
error_message = str(e)
end_time = time.time()
inference_time = end_time - start_time
# リソース使用量の測定
gpu_memory_peak = self._get_gpu_memory() if torch.cuda.is_available() else 0
cpu_percent_end = psutil.cpu_percent()
memory_end = psutil.virtual_memory().percent
return {
'video_path': video_path,
'question': question,
'num_frames': num_frames,
'inference_time': inference_time,
'success': success,
'error_message': error_message,
'answer_length': len(answer) if answer else 0,
'performance_metrics': {
'cpu_usage_avg': (cpu_percent_start + cpu_percent_end) / 2,
'memory_usage_avg': (memory_start + memory_end) / 2,
'gpu_memory_start': gpu_memory_start,
'gpu_memory_peak': gpu_memory_peak,
'gpu_memory_used': gpu_memory_peak - gpu_memory_start
},
'timestamp': datetime.now().isoformat()
}
def _get_system_info(self) -> Dict:
"""
システム情報を取得
"""
system_info = {
'cpu_count': psutil.cpu_count(),
'cpu_freq': psutil.cpu_freq()._asdict() if psutil.cpu_freq() else {},
'memory_total_gb': psutil.virtual_memory().total / (1024**3),
'python_version': psutil.__version__,
'torch_version': torch.__version__,
}
if torch.cuda.is_available():
system_info.update({
'cuda_available': True,
'gpu_name': torch.cuda.get_device_name(0),
'gpu_memory_total': torch.cuda.get_device_properties(0).total_memory / (1024**3),
'cuda_version': torch.version.cuda
})
else:
system_info['cuda_available'] = False
return system_info
def _get_gpu_memory(self) -> float:
"""GPU メモリ使用量を取得(GB単位)"""
if torch.cuda.is_available():
return torch.cuda.memory_allocated(0) / (1024**3)
return 0.0
def _calculate_statistics(self, benchmarks: List[Dict]) -> Dict:
"""
ベンチマーク結果の統計を計算
"""
successful_benchmarks = [b for b in benchmarks if b['success']]
if not successful_benchmarks:
return {'error': 'No successful benchmarks'}
inference_times = [b['inference_time'] for b in successful_benchmarks]
answer_lengths = [b['answer_length'] for b in successful_benchmarks]
return {
'total_benchmarks': len(benchmarks),
'successful_benchmarks': len(successful_benchmarks),
'success_rate': len(successful_benchmarks) / len(benchmarks) * 100,
'inference_time': {
'min': min(inference_times),
'max': max(inference_times),
'avg': sum(inference_times) / len(inference_times),
'median': sorted(inference_times)[len(inference_times)//2]
},
'answer_length': {
'min': min(answer_lengths),
'max': max(answer_lengths),
'avg': sum(answer_lengths) / len(answer_lengths)
},
'frames_per_second': {
f'{b["num_frames"]}_frames': b["num_frames"] / b["inference_time"]
for b in successful_benchmarks
}
}
def generate_performance_report(self, benchmark_results: Dict) -> str:
"""
パフォーマンスレポートを生成
"""
report = []
report.append("# LLaVA-NeXT-Video パフォーマンス ベンチマーク レポート\n")
report.append(f"実行日時: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# システム情報
sys_info = benchmark_results['system_info']
report.append("## システム情報\n")
report.append(f"- CPU: {sys_info['cpu_count']} cores")
if 'cpu_freq' in sys_info and sys_info['cpu_freq']:
report.append(f" @ {sys_info['cpu_freq'].get('max', 'Unknown')} MHz")
report.append(f"\n- RAM: {sys_info['memory_total_gb']:.1f} GB\n")
if sys_info['cuda_available']:
report.append(f"- GPU: {sys_info['gpu_name']}\n")
report.append(f"- VRAM: {sys_info['gpu_memory_total']:.1f} GB\n")
report.append(f"- CUDA Version: {sys_info['cuda_version']}\n")
else:
report.append("- GPU: Not available\n")
# 統計情報
stats = benchmark_results['statistics']
report.append("\n## パフォーマンス統計\n")
report.append(f"- 総実行回数: {stats['total_benchmarks']}\n")
report.append(f"- 成功回数: {stats['successful_benchmarks']}\n")
report.append(f"- 成功率: {stats['success_rate']:.1f}%\n")
report.append("\n### 推論時間\n")
inf_time = stats['inference_time']
report.append(f"- 最小: {inf_time['min']:.2f}秒\n")
report.append(f"- 最大: {inf_time['max']:.2f}秒\n")
report.append(f"- 平均: {inf_time['avg']:.2f}秒\n")
report.append(f"- 中央値: {inf_time['median']:.2f}秒\n")
return ''.join(report)
# 使用例
benchmark = PerformanceBenchmark()
# テスト用のビデオファイルと質問
test_videos = ["video1.mp4", "video2.mp4"]
test_questions = [
"この動画で何が起きていますか?",
"主な登場人物は誰ですか?"
]
# ベンチマーク実行
results = benchmark.benchmark_video_analysis(
test_videos,
test_questions,
num_frames_list=[8, 16, 24]
)
# レポート生成
report = benchmark.generate_performance_report(results)
print(report)
# 結果をJSONファイルに保存
with open("benchmark_results.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
トラブルシューティングと最適化
よくある問題と解決策
class LLaVATroubleshooter:
@staticmethod
def check_system_requirements():
"""
システム要件をチェック
"""
requirements = {
'transformers_version': '4.42.0',
'minimum_ram_gb': 8,
'recommended_vram_gb': 12
}
issues = []
# Transformersバージョンチェック
import transformers
current_version = transformers.__version__
required_version = requirements['transformers_version']
if current_version < required_version:
issues.append(
f"Transformers version {required_version}以上が必要です。"
f"現在: {current_version}"
)
# RAM チェック
total_ram = psutil.virtual_memory().total / (1024**3)
if total_ram < requirements['minimum_ram_gb']:
issues.append(
f"最低{requirements['minimum_ram_gb']}GB以上のRAMが推奨されます。"
f"現在: {total_ram:.1f}GB"
)
# VRAM チェック
if torch.cuda.is_available():
total_vram = torch.cuda.get_device_properties(0).total_memory / (1024**3)
if total_vram < requirements['recommended_vram_gb']:
issues.append(
f"推奨VRAM: {requirements['recommended_vram_gb']}GB以上。"
f"現在: {total_vram:.1f}GB。4bit量子化の使用を検討してください。"
)
else:
issues.append("CUDAが利用できません。CPUモードで動作しますが、処理が遅くなります。")
if issues:
print("システム要件の問題:")
for issue in issues:
print(f"- {issue}")
else:
print("システム要件チェック: 問題なし")
return issues
@staticmethod
def optimize_for_limited_resources():
"""
リソース制約下での最適化設定
"""
optimization_tips = {
'low_vram': {
'use_4bit_quantization': True,
'enable_cpu_offload': True,
'reduce_max_tokens': 150,
'use_smaller_batch_size': True
},
'slow_cpu': {
'reduce_num_frames': 6,
'use_lower_resolution': True,
'disable_sampling': True
},
'limited_storage': {
'delete_temp_files': True,
'compress_video_input': True
}
}
print("リソース制約下での最適化提案:")
for constraint, tips in optimization_tips.items():
print(f"\n{constraint}の場合:")
for tip, enabled in tips.items():
if enabled:
print(f" ✓ {tip}")
return optimization_tips
@staticmethod
def debug_inference_issues(error_message: str):
"""
推論エラーのデバッグ支援
"""
common_issues = {
'CUDA out of memory': [
"4bit量子化を有効にする",
"num_framesを減らす(例:8 → 4)",
"max_new_tokensを減らす",
"バッチサイズを1にする",
"モデルの一部をCPUにオフロード"
],
'Video file not found': [
"ファイルパスを確認",
"サポートされている形式か確認(mp4, avi, mov, mkv, webm)",
"ファイルの読み取り権限を確認"
],
'Transformers version': [
"pip install transformers>=4.42.0でアップデート",
"仮想環境の確認",
"依存関係の競合チェック"
],
'Model loading failed': [
"インターネット接続を確認",
"Hugging Face Hubからの手動ダウンロードを試行",
"認証トークンが必要な場合は設定"
]
}
print(f"エラーメッセージ: {error_message}")
print("\n考えられる解決策:")
for issue_type, solutions in common_issues.items():
if issue_type.lower() in error_message.lower():
for solution in solutions:
print(f"- {solution}")
return solutions
print("- 一般的なトラブルシューティング手順を参照してください")
return []
# 使用例
troubleshooter = LLaVATroubleshooter()
# システムチェック
troubleshooter.check_system_requirements()
# 最適化提案
troubleshooter.optimize_for_limited_resources()
# エラーデバッグ
troubleshooter.debug_inference_issues("CUDA out of memory")
実用的な応用事例
YouTube動画分析ツール
import yt_dlp
import tempfile
import shutil
class YouTubeVideoAnalyzer:
def __init__(self, model_id="llava-hf/LLaVA-NeXT-Video-7B-hf"):
self.analyzer = OptimizedLLaVAVideoAnalyzer(model_id)
self.temp_dir = tempfile.mkdtemp()
def analyze_youtube_video(self, youtube_url: str, questions: List[str] = None):
"""
YouTube動画をダウンロードして分析
"""
if questions is None:
questions = [
"この動画の主な内容を要約してください。",
"視聴者にとって最も価値のある部分はどこですか?",
"この動画の品質や制作技術についてコメントしてください。"
]
try:
# YouTube動画をダウンロード
video_path = self._download_youtube_video(youtube_url)
# 動画情報を取得
video_info = self._get_video_info(youtube_url)
# 分析実行
analysis_results = {}
for question in questions:
answer = self.analyzer.analyze_video(video_path, question, num_frames=16)
analysis_results[question] = answer
return {
'video_info': video_info,
'analysis': analysis_results,
'youtube_url': youtube_url
}
except Exception as e:
return {'error': str(e)}
finally:
# 一時ファイルをクリーンアップ
self._cleanup_temp_files()
def _download_youtube_video(self, url: str) -> str:
"""
YouTube動画をダウンロード
"""
ydl_opts = {
'format': 'best[height<=720]', # 720p以下で最高品質
'outtmpl': f'{self.temp_dir}/%(id)s.%(ext)s',
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
filename = ydl.prepare_filename(info)
return filename
def _get_video_info(self, url: str) -> Dict:
"""
動画メタデータを取得
"""
ydl_opts = {'quiet': True}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
return {
'title': info.get('title', 'Unknown'),
'uploader': info.get('uploader', 'Unknown'),
'duration': info.get('duration', 0),
'view_count': info.get('view_count', 0),
'like_count': info.get('like_count', 0),
'upload_date': info.get('upload_date', 'Unknown')
}
def _cleanup_temp_files(self):
"""
一時ファイルをクリーンアップ
"""
try:
shutil.rmtree(self.temp_dir)
self.temp_dir = tempfile.mkdtemp()
except Exception as e:
print(f"クリーンアップエラー: {e}")
def batch_analyze_playlist(self, playlist_url: str) -> List[Dict]:
"""
プレイリスト内の動画を一括分析
"""
ydl_opts = {'quiet': True, 'extract_flat': True}
results = []
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
playlist_info = ydl.extract_info(playlist_url, download=False)
if 'entries' in playlist_info:
for entry in playlist_info['entries']:
if entry:
video_url = f"https://www.youtube.com/watch?v={entry['id']}"
print(f"分析中: {entry.get('title', 'Unknown')}")
analysis_result = self.analyze_youtube_video(video_url)
results.append(analysis_result)
return results
# 使用例(YouTube動画分析)
youtube_analyzer = YouTubeVideoAnalyzer()
# 単一動画の分析
result = youtube_analyzer.analyze_youtube_video(
"https://www.youtube.com/watch?v=dQw4w9WgXcQ",
[
"この動画の内容を説明してください。",
"どのような感情を呼び起こしますか?",
"技術的品質はどうですか?"
]
)
print(json.dumps(result, ensure_ascii=False, indent=2))
まとめ
LLaVA-NeXT-Video-7B-hfは、動画理解分野において革新的な成果を収めたオープンソースモデルです。本記事で紹介した技術的特徴と実装方法を活用することで、以下のような実用的なアプリケーションを構築できます:
主要な技術的利点
- ゼロショット動画理解: 画像のみで学習したモデルが動画タスクで優秀な性能を発揮
- AnyRes技術: 高解像度画像・動画を効率的に処理
- マルチモーダル対応: 画像と動画を同時に処理可能
- 効率的な推論: 最適化により実用的な速度を実現
実用化のポイント
- 4bit量子化によるメモリ使用量の大幅削減
- Flash Attention 2による推論速度向上
- バッチ処理による大量データの効率的な処理
- リアルタイム監視システムへの応用
応用分野
- コンテンツ制作支援
- 動画監視・セキュリティ
- 教育・eラーニング
- エンターテイメント産業
- 医療・研究分野
LLaVA-NeXT-Video-7B-hfは、動画AI技術の民主化を推進し、様々な分野でのイノベーションを可能にする強力なツールです。本記事のコード例を参考に、あなたのプロジェクトにぜひ活用してください。
