はじめに
12月3日に、Sakana AIより以下の記事が投稿されました。
Sakana AIはLLMエージェントの集団を生み出す新技術「CycleQD」を開発しました。CycleQDは多様性に着目した進化的計算とモデルマージに基づいており、知識やスキルを忘却せず蓄積していく「生涯学習」の実現への第一歩です。
今回は、公開されているコードをいっしょに読み解いていきましょう。
前回の記事
メイン処理
main.pyはわずか500行弱のコードです。
処理の流れを見ていきましょう。
処理概要
CycleQDのメイン処理フローは、大きく4つに分かれています。
- 初期化フェーズ(ピンク色の領域)
- シードモデル評価フェーズ(緑色の領域)
- QD最適化ループ(青色の領域)
- クリーンアップフェーズ(オレンジ色の領域)
順に見ていきましょう。
1. 初期化フェーズ
- Hydraを使用して設定ファイルを読み込み、実行環境を設定します
- Workerクラスを初期化し、2種類の言語モデルをロード
- vLLM: bfloat16形式で高速推論用のモデル
- HuggingFace: パラメータ操作が可能なモデル
- QD(Quality Diversity)に関連するコンポーネントを初期化
- CMA-ES(共分散行列適応進化戦略)ソルバーを各タスクごとに初期化
2. シードモデル評価フェーズ
- 初期モデル群の性能を評価し、アーカイブの初期状態を構築
- 各タスクの評価指標に基づいてモデルの性能を測定
- 評価結果に基づいてアーカイブマップを構築(優秀なモデルを保存)
3. QD最適化ループ
指定された世代数だけ以下の処理を繰り返します。
- アーカイブから性能の良い親モデルペアを選択
- CMA-ESから最適化パラメータを取得
- 親モデルのパラメータを組み合わせて新しいモデルを生成(交配)
- 生成されたモデルにランダムな変異を加える(突然変異)
- 新しいモデルの性能を全タスクで評価
- 既存のアーカイブと比較して性能が向上している場合は保存
- 評価結果に基づいてCMA-ESのパラメータを更新
もう少し詳しく見て見ましょう。
QD最適化ループの詳細図
CycleQDには、以下の特徴があります。
- Quality Diversity (QD)フレームワークを活用
- タスクパフォーメトリクスを品質指標と行動特性として交互に使用
- モデルマージングによるクロスオーバーを採用
- SVDベースの変異を導入
使用ライブラリ
ライブラリを機能カテゴリごとに整理して表にまとめます。
カテゴリー | ライブラリ名 | 主な用途 |
---|---|---|
基本ライブラリ | os | ファイルシステム操作、環境変数管理 |
time | 時間計測、遅延処理 | |
re | 正規表現処理 | |
typing | 型ヒントの定義 | |
collections | deque、defaultdictなどのデータ構造 | |
数値計算・科学計算 | numpy | 数値計算、行列操作 |
torch | ディープラーニングフレームワーク | |
cma | CMA-ES進化戦略の実装 | |
機械学習関連 | vllm | 大規模言語モデルの高速推論 |
transformers | Hugging Faceのモデル操作 | |
wandb | 実験管理、ログ記録 | |
設定管理 | hydra | 構成管理、実験設定 |
omegaconf | 設定ファイルの管理(DictConfig, OmegaConf) | |
分散処理 | celery | 分散タスク処理 |
コメントつきコード
import os
import cma
import hydra
import logging
import wandb
import torch
import time
import re
import numpy as np
from typing import List, Tuple, Dict, Optional
from omegaconf import DictConfig, OmegaConf
from celery import Celery
from collections import deque
from vllm import LLM
from transformers import AutoModelForCausalLM
from collections import defaultdict
from tasks.base import BaseTask
from utils.celery_utils import setup_celery
from utils.helpers import (
load_hf_params_to_vllm, save_archive_map,
delete_outdated_models, plot_elite_map,
get_largest_gen_file, load_archive_map
)
from datatypes import (
ArchiveData, ModelEvalResult, MergeResult, QDInfo, TaskMetric
)
def load_task_configs(cfg: DictConfig) -> List[DictConfig]:
# Hydraのベースディレクトリを取得
hydra_base_dir = hydra.utils.get_original_cwd()
task_configs = {}
# 各タスクの設定ファイルを読み込む
for task_name in cfg.tasks:
task_config_path = os.path.join(
hydra_base_dir, "configs", "task", f"{task_name}.yaml")
task_config = OmegaConf.load(task_config_path)
task_configs[task_name] = task_config
return task_configs
class Worker(object):
def __init__(self, cfg: DictConfig):
# 初期化メソッド
self.cfg = cfg
self.logger = logging.getLogger("Worker")
# タスクの設定を読み込んでインスタンス化
self.tasks = [hydra.utils.instantiate(x) for x in load_task_configs(cfg).values()]
# LLM(大規模言語モデル)の初期化
self.llm = LLM(
cfg.base_model_path,
max_model_len=4096,
gpu_memory_utilization=0.8, # GPUメモリ使用率を80%に制限
dtype="bfloat16", # bfloat16形式で計算
enforce_eager=True,
)
# モデルのパラメータを凍結(勾配計算を無効化)
m = self.llm.llm_engine.driver_worker.model_runner.model
for _, param in m.named_parameters():
param.requires_grad = False
# QD(Quality Diversity)関連のコンポーネントを初期化
self.qd_sampler = hydra.utils.instantiate(cfg.qd.sampling) # サンプリング戦略
self.crossover = hydra.utils.instantiate(cfg.qd.crossover) # 交差戦略
self.mutator = hydra.utils.instantiate(cfg.qd.mutation) # 突然変異戦略
# Hugging Faceのモデルを読み込む
self.hf_model = AutoModelForCausalLM.from_pretrained(
cfg.base_model_path, torch_dtype=torch.bfloat16)
self.base_params = self.hf_model.state_dict()
# 各タスクに対してCMA-ESソルバーを初期化
self.use_cma = cfg.qd.use_cma
self.cma_popsize = cfg.cma_es.popsize
if self.use_cma:
self.cma_solvers = {}
self.cma_input_grid_sizes = {}
for task in self.tasks:
# 入力情報を取得
input_size, grid_sizes = self._get_input_info(task, self.tasks)
self.cma_input_grid_sizes[task.task_name] = grid_sizes
self.logger.info(
f"Init CMA for {task.task_name}: input_size={input_size}")
# 初期パラメータを設定
init_params = np.zeros(
input_size * (self.crossover.num_merge_params +
self.mutator.num_mutation_params)
)
# CMA-ESソルバーを初期化
self.cma_solvers[task.task_name] = cma.CMAEvolutionStrategy(
x0=init_params,
sigma0=cfg.cma_es.sigma,
inopts={
'popsize': cfg.cma_es.popsize,
'seed': cfg.cma_es.seed if cfg.cma_es.seed > 0 else 42,
'randn': np.random.randn,
},
)
# 各タスクの世代カウント、パラメータ、フィットネスを初期化
self.cma_gen_counts = {task.task_name: 0 for task in self.tasks}
self.cma_gen_params = {task.task_name: None for task in self.tasks}
self.cma_fitnesses = {task.task_name: [] for task in self.tasks}
def merge_models(
self,
q_name: str,
generation: int,
model_dir: str,
archive_map: Dict[str, Dict[Tuple[int], ArchiveData]]) -> MergeResult:
# 保存パスの設定
save_path=f"{model_dir}/gen_{generation}"
# シード値を更新
self.qd_sampler.update_seed(generation)
self.crossover.update_seed(generation)
self.mutator.update_seed(generation)
while True:
trial = 0
try:
# 親モデルをアーカイブからサンプリング
parent1, parent2 = self.qd_sampler.sample(archive_map[q_name])
# CMA-ESを使用する場合のパラメータ取得
if self.use_cma:
if self.cma_gen_counts[q_name] == 0:
self.cma_gen_params[q_name] = self.cma_solvers[q_name].ask()
input_data = (
np.array(parent1 + parent2) /
np.tile(self.cma_input_grid_sizes[q_name], 2)
)
ix = self.cma_gen_counts[q_name]
params = np.dot(
input_data,
self.cma_gen_params[q_name][ix].reshape((input_data.size, -1))
)
else:
params = None
# モデルをマージ
self.logger.info(f"Local Rank {os.environ['RANK']}. Merging models ...")
archive = archive_map[q_name]
child_param = self.crossover.merge(
self.base_params,
[archive[parent1].model_path, archive[parent2].model_path],
params[:self.crossover.num_merge_params] if self.use_cma else None,
)
self.logger.info(f"Local Rank {os.environ['RANK']}. Merge Success! Trial {trial}. parent 1: {archive[parent1].model_path}, parent 2: {archive[parent2].model_path}")
break
except:
self.logger.error(f"Local Rank {os.environ['RANK']}. Merge Failed: Trial {trial}. parent 1: {archive[parent1].model_path}, parent 2: {archive[parent2].model_path}")
trial += 1
# モデルを突然変異
self.logger.info(f"Local Rank {os.environ['RANK']}. Mutating models ...")
child_param = self.mutator.mutate(
child_param,
q_name,
params[self.crossover.num_merge_params:] if self.use_cma else None,
)
# モデルを評価
self.logger.info(f"Local Rank {os.environ['RANK']}. Evaluating models ...")
result = self._eval_model(child_param)
# 結果を解析
to_save = False
qd_info = {}
for k in archive_map:
q_val = result.task_metrics[k].quality
bc_ids = self._get_bc_ids(k, result.task_metrics)
qd_info[k] = QDInfo(task_name=k, quality=q_val, bc_ids=bc_ids)
if (
bc_ids not in archive_map[k] or
archive_map[k][bc_ids].quality < q_val
):
to_save = True
self.logger.info(f"Local Rank {os.environ['RANK']}. qd_info={qd_info}")
# アーカイブが更新された場合のみディスクに書き込む
if to_save:
self.hf_model.load_state_dict(child_param)
try:
self.hf_model.save_pretrained(save_path, safe_serialize=True)
except:
self.logger.error(f"Local Rank {os.environ['RANK']}. error save path {save_path}")
else:
save_path = None
# CMA-ESの更新
if self.use_cma:
self.cma_gen_counts[q_name] += 1
# フィットネスを計算(CMAは最小化するため逆にする)
fitness = (
np.sum(np.array(qd_info[q_name].bc_ids) /
self.cma_input_grid_sizes[q_name]) +
qd_info[q_name].quality
)
assert 0 <= fitness <= len(self.tasks)
self.cma_fitnesses[q_name].append(-fitness)
# 集団のフィットネスが収集された場合、CMAを更新
if self.cma_gen_counts[q_name] == self.cma_popsize:
assert len(self.cma_fitnesses[q_name]) == self.cma_popsize
self.cma_solvers[q_name].tell(
self.cma_gen_params[q_name], self.cma_fitnesses[q_name])
self.cma_gen_counts[q_name] = 0
self.cma_fitnesses[q_name] = []
self.logger.info(f"Local Rank {os.environ['RANK']}. Updated CMA for {q_name}.")
return MergeResult(
qd_info=qd_info,
save_path=save_path,
)
def _get_bc_ids(self,
q_name: str,
task_metrics: Dict[str, TaskMetric]) -> Tuple[int]:
# 行動特徴(Behavior Characteristic)IDを取得
bc_ids = ()
for k in task_metrics:
if k != q_name:
bc_ids += task_metrics[k].bc_ids
return bc_ids
def _get_input_info(self,
task: BaseTask,
tasks: List[BaseTask]) -> Tuple[int, List]:
# 入力情報を取得
target_task_name = task.task_name
input_size = 0
input_grid_sizes = []
for t in tasks:
if t.task_name != target_task_name:
input_size += t.bc_num_dims
input_grid_sizes.extend(t.bc_grid_sizes)
return input_size * 2, input_grid_sizes # 入力サイズとグリッドサイズを返す
def _eval_model(self, param: Dict, task_name: Optional[str] = None) -> ModelEvalResult:
# モデルを評価
load_hf_params_to_vllm(param, self.llm) # パラメータをvllmにロード
if task_name:
task_metrics = {
task.task_name: task.get_q_and_bc(self.llm)
for task in self.tasks if task.task_name == task_name
}
else:
task_metrics = {
task.task_name: task.get_q_and_bc(self.llm) for task in self.tasks
}
self.logger.info(f"Local Rank {os.environ['RANK']}. task_metrics={task_metrics}") # ログにタスクメトリクスを出力
return ModelEvalResult(
model_path=None,
task_metrics=task_metrics,
) # モデル評価結果を返す
def eval_model(self, model_path: str, task_name: Optional[str] = None) -> MergeResult:
# モデルをロードして評価
model_param = AutoModelForCausalLM.from_pretrained(
model_path, torch_dtype=torch.bfloat16).state_dict() # モデルのパラメータをロード
self.hf_model.load_state_dict(model_param) # モデルのパラメータを設定
result = self._eval_model(model_param, task_name) # モデルを評価
if task_name:
qd_info = {
task_name: QDInfo(
task_name=task_name,
quality=result.task_metrics[task_name].quality, # 評価結果のqualityを取得
bc_ids=self._get_bc_ids(task_name, result.task_metrics), # 評価結果のbc_idsを取得
)
}
else:
qd_info = {
task.task_name: QDInfo(
task_name=task.task_name,
quality=result.task_metrics[task.task_name].quality, # 評価結果のqualityを取得
bc_ids=self._get_bc_ids(task.task_name, result.task_metrics), # 評価結果のbc_idsを取得
) for task in self.tasks
}
return MergeResult(save_path=model_path, qd_info=qd_info) # 評価結果をMergeResultとして返す
def run_qd(celery: Celery, cfg: DictConfig) -> None:
if cfg.celery.mode in ["main", "solo"] and not cfg.qd.restart_dir:
# 出力ディレクトリの設定
output_dir = hydra.core.hydra_config.HydraConfig.get().runtime.output_dir
model_dir = os.path.join(output_dir, "models")
merged_model_dir = os.path.join(output_dir, "merged_models")
archive_dir = os.path.join(output_dir, "archives")
image_dir = os.path.join(output_dir, "images")
# 必要なディレクトリを作成
os.makedirs(model_dir, exist_ok=True)
os.makedirs(merged_model_dir, exist_ok=True)
os.makedirs(archive_dir, exist_ok=True)
os.makedirs(image_dir, exist_ok=True)
logger = logging.getLogger("QD")
logger.info(f"Run QD for {cfg.qd.num_generation} generations ...")
logger.info(f"seed models: {cfg.seed_model_path}")
call_fn = celery.tasks["call"]
all_tasks = cfg.tasks
q_idx = 0
q_name = all_tasks[q_idx]
archive_map: Dict[str, Dict[Tuple[int], ArchiveData]] = {
x: dict() for x in all_tasks
}
task_configs = load_task_configs(cfg)
if not cfg.qd.restart_dir:
# シードモデルでarchive_mapを初期化
promises = [
call_fn.delay("eval_model", model_path=model_path)
for i, model_path in enumerate(cfg.seed_model_path)
]
init_q = deque()
init_q.extend(promises) # promisesをinit_qに追加
init_results = []
while init_q:
promise = init_q.popleft() # キューからpromiseを取り出す
if promise.ready(): # promiseが準備できているか確認
init_results.append(promise.get(timeout=cfg.celery.timeout)) # 結果を取得してinit_resultsに追加
else:
init_q.append(promise) # 準備できていない場合は再度キューに追加
for result in init_results:
for target_q_name in all_tasks:
q_val = result.qd_info[target_q_name].quality # qualityの値を取得
bc_ids = result.qd_info[target_q_name].bc_ids # bc_idsを取得
if (
bc_ids not in archive_map[target_q_name] or # bc_idsがarchive_mapに存在しない場合
archive_map[target_q_name][bc_ids].quality < q_val # 既存のqualityよりも高い場合
):
archive_map[target_q_name][bc_ids] = ArchiveData(
model_path=result.save_path, # モデルのパスを保存
quality=q_val, # qualityの値を保存
sampling_freq=1, # サンプリング頻度を1に設定
validation_quality=None, # validation_qualityをNoneに設定
)
archive_map_path = f"{archive_dir}/gen0_archive_map.json" # アーカイブマップのパスを設定
save_archive_map(archive_map, archive_map_path) # アーカイブマップを保存
plot_elite_map(archive_map_path, task_configs, f"{image_dir}/gen0_elite_map_train.png", "train") # エリートマップをプロットして保存
gen = 1
else:
# 再開する場合の設定
output_dir = cfg.qd.restart_dir
model_dir = os.path.join(output_dir, "models")
archive_dir = os.path.join(output_dir, "archives")
image_dir = os.path.join(output_dir, "images")
merged_model_dir = os.path.join(output_dir, "merged_models")
largest_gen_file, largest_gen = get_largest_gen_file(archive_dir)
archive_map = load_archive_map(f"{archive_dir}/{largest_gen_file}", ArchiveData)
archive_map_path = f"{archive_dir}/gen0_archive_map.json"
plot_elite_map(archive_map_path, task_configs, f"{image_dir}/gen0_elite_map_train.png", "train")
gen = largest_gen + 1
q = deque()
for i in range(cfg.celery.num_workers - cfg.num_of_evaluation_workers):
q_idx = (q_idx + 1) % len(all_tasks) # q_idxを更新して次のタスクを選択
q_name = all_tasks[q_idx] # 現在のタスク名を取得
logger.info(f"QD switched to optimize {q_name}") # ログにタスク名を出力
q.append(
call_fn.delay(
"merge_models",
q_name=q_name,
generation=i + gen,
model_dir=model_dir,
archive_map=archive_map,
)
) # 非同期でmerge_modelsタスクをキューに追加
# QD最適化ループ
prev_log_time = time.time() # 前回のログ時間を記録
while gen < cfg.qd.num_generation + 1:
promise = q.popleft() # キューからpromiseを取り出す
if promise.ready(): # promiseが準備できているか確認
eval_gen = gen - 1 # 現在の世代を設定
logger.info(f"Generation {eval_gen} ...") # ログに世代を出力
archive_map_path = f"{archive_dir}/gen{eval_gen}_archive_map.json" # アーカイブマップのパスを設定
archive_train_path = f"{image_dir}/gen{eval_gen}_elite_map_train.png" # エリートマップのパスを設定
save_archive_map(archive_map, archive_map_path) # アーカイブマップを保存
plot_elite_map(archive_map_path, task_configs, archive_train_path, "train") # エリートマップをプロットして保存
log_archive_map = {}
for dataset, entries in archive_map.items():
log_archive_map[dataset] = {str(coordinates): data.quality for coordinates, data in entries.items()} # アーカイブマップをログ用に変換
current_time = time.time() # 現在の時間を取得
time_interval_minutes = (current_time - prev_log_time) # 前回のログ時間からの経過時間を計算
prev_log_time = current_time # 前回のログ時間を更新
wandb.log(
{
f"archive_map_image_train/archive_map_image": wandb.Image(archive_train_path), # エリートマップの画像をログ
f"all_elite_map/all_elite_map": log_archive_map, # 全エリートマップをログ
f"base_info/generation": eval_gen, # 世代情報をログ
f"base_info/gpu_num": cfg.celery.num_workers, # GPUの数をログ
f"base_info/log_interval_seconds": time_interval_minutes # ログ間隔をログ
},
step=eval_gen,
commit=False,
)
result = promise.get(timeout=cfg.celery.timeout) # promiseの結果を取得
archive_updated = False # アーカイブが更新されたかどうかのフラグ
if result.save_path is not None: # 結果に保存パスが含まれている場合
for k in archive_map:
q_val = result.qd_info[k].quality # qualityの値を取得
bc_ids = result.qd_info[k].bc_ids # bc_idsを取得
if (
bc_ids not in archive_map[k] or # bc_idsがarchive_mapに存在しない場合
archive_map[k][bc_ids].quality < q_val # 既存のqualityよりも高い場合
):
archive_map[k][bc_ids] = ArchiveData(
quality=q_val, # qualityの値を保存
model_path=result.save_path, # モデルのパスを保存
sampling_freq=1, # サンプリング頻度を1に設定
validation_quality=None, # validation_qualityをNoneに設定
)
archive_updated = True # アーカイブが更新されたことを記録
wandb.log(
{
f"save_path_true/save_path_true": 1 if result.save_path is not None else 0, # 保存パスがあるかどうかをログ
f"archive_map_updated/archive_map_updated": int(archive_updated), # アーカイブが更新されたかどうかをログ
},
step=eval_gen,
commit=True,
)
# 新しいタスクをキューに追加
if gen % cfg.qd.flip_interval == 0:
q_idx = (q_idx + 1) % len(all_tasks)
q_name = all_tasks[q_idx]
logger.info(f"QD switched to optimize {q_name}")
new_promise = call_fn.delay(
"merge_models",
q_name=q_name,
generation=gen+cfg.celery.num_workers-cfg.num_of_evaluation_workers,
model_dir=model_dir,
archive_map=archive_map,
)
q.append(new_promise)
if gen % cfg.disk_cleaning_interval == 0:
deleted_models = delete_outdated_models(archive_map, model_dir, gen-cfg.celery.num_workers*2)
for model in deleted_models:
logger.info(f"Deleted model: {model}")
gen += 1
else:
q.append(promise)
# クリーンアップ
deleted_models = delete_outdated_models(
archive_map, model_dir, cfg.qd.num_generation)
for model in deleted_models:
logger.info(f"Deleted model: {model}")
@hydra.main(version_base=None, config_path="configs", config_name="config")
def main(cfg: DictConfig):
# 設定内容を表示
print(OmegaConf.to_yaml(cfg))
def get_worker_cls(cfg):
# Workerクラスを初期化する関数を返す
def init_func():
return Worker(cfg)
return init_func
# Celeryのセットアップ
celery = setup_celery(
name=cfg.celery.name,
mode=cfg.celery.mode,
worker_cls=get_worker_cls(cfg),
)
# wandbの初期化
wandb.init(
entity=cfg.wandb.entity,
project=cfg.wandb.project,
name=f"{cfg.wandb.name}",
config=OmegaConf.to_container(cfg, resolve=True, throw_on_missing=True),
)
# QDアルゴリズムの実行
run_qd(celery, cfg)
if __name__ == "__main__":
# メイン関数の実行
main()
技術用語まとめ
CycleQDの主要な用語を機能カテゴリーごとに整理します。
カテゴリー | 用語 | 説明 |
---|---|---|
フレームワーク関連 | Hydra | Facebookが開発した構成管理フレームワーク。階層的な設定管理が可能 |
vLLM | 大規模言語モデルの高速推論フレームワーク。PagedAttentionで最適化 | |
HuggingFace | 自然言語処理のライブラリ・プラットフォーム。モデル操作が容易 | |
WandB (Weights & Biases) | 機械学習実験の追跡・可視化プラットフォーム | |
モデル関連 | bfloat16 | 16ビット浮動小数点数形式。省メモリで機械学習に最適 |
パラメータ | モデルの重みと偏り。モデルの動作を決定する数値 | |
エリートモデル | アーカイブに保存される性能の高いモデル | |
QD関連 | Quality Diversity (QD) | 多様性と品質を両立する最適化アルゴリズム |
アーカイブ | 優秀なモデルを保存するデータベース | |
行動特性 | モデルの多様性を特徴づける指標 | |
多様性指標 | モデル群の多様さを数値化した指標 | |
進化戦略関連 | CMA-ES | 共分散行列適応進化戦略。効率的なパラメータ探索を実現 |
サンプリング戦略 | 親モデルの選択方法(トーナメント選択、ランキング選択など) | |
交配(crossover) | 親モデルのパラメータを組み合わせて新モデルを生成 | |
突然変異(mutation) | モデルのパラメータにランダムな変更を加える操作 | |
最適化関連 | フィットネス | モデルの適応度を示す評価値 |
探索戦略 | パラメータ空間を効率的に探索する方法 | |
フリップ間隔 | タスクを切り替える世代間隔 | |
実装関連 | Worker | モデルの生成と評価を実行する処理単位 |
層別混合 | モデルの層ごとにパラメータを組み合わせる方法 | |
パラメータ摂動 | パラメータに小さな変化を加える操作 |
おわりに
Sakana AIの新技術「CycleQD」公開されているコードを読んでみました。
コードの解説、シーケンス図作成には、Claudeを使用しました。