2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Sakana AIの新技術「CycleQD」のコードを読む

Posted at

はじめに

12月3日に、Sakana AIより以下の記事が投稿されました。

Sakana AIはLLMエージェントの集団を生み出す新技術「CycleQD」を開発しました。CycleQDは多様性に着目した進化的計算とモデルマージに基づいており、知識やスキルを忘却せず蓄積していく「生涯学習」の実現への第一歩です。

今回は、公開されているコードをいっしょに読み解いていきましょう。

前回の記事

メイン処理

main.pyはわずか500行弱のコードです。

処理の流れを見ていきましょう。

処理概要

CycleQDのメイン処理フローは、大きく4つに分かれています。

  1. 初期化フェーズ(ピンク色の領域)
  2. シードモデル評価フェーズ(緑色の領域)
  3. QD最適化ループ(青色の領域)
  4. クリーンアップフェーズ(オレンジ色の領域)

順に見ていきましょう。

1. 初期化フェーズ

image.png

  • Hydraを使用して設定ファイルを読み込み、実行環境を設定します
  • Workerクラスを初期化し、2種類の言語モデルをロード
    • vLLM: bfloat16形式で高速推論用のモデル
    • HuggingFace: パラメータ操作が可能なモデル
  • QD(Quality Diversity)に関連するコンポーネントを初期化
  • CMA-ES(共分散行列適応進化戦略)ソルバーを各タスクごとに初期化

2. シードモデル評価フェーズ

image.png

  • 初期モデル群の性能を評価し、アーカイブの初期状態を構築
  • 各タスクの評価指標に基づいてモデルの性能を測定
  • 評価結果に基づいてアーカイブマップを構築(優秀なモデルを保存)

3. QD最適化ループ

指定された世代数だけ以下の処理を繰り返します。

  1. アーカイブから性能の良い親モデルペアを選択
  2. CMA-ESから最適化パラメータを取得
  3. 親モデルのパラメータを組み合わせて新しいモデルを生成(交配)
  4. 生成されたモデルにランダムな変異を加える(突然変異)
  5. 新しいモデルの性能を全タスクで評価
  6. 既存のアーカイブと比較して性能が向上している場合は保存
  7. 評価結果に基づいてCMA-ESのパラメータを更新

もう少し詳しく見て見ましょう。

QD最適化ループの詳細図

CycleQDには、以下の特徴があります。

  • Quality Diversity (QD)フレームワークを活用
  • タスクパフォーメトリクスを品質指標と行動特性として交互に使用
  • モデルマージングによるクロスオーバーを採用
  • SVDベースの変異を導入

使用ライブラリ

ライブラリを機能カテゴリごとに整理して表にまとめます。

カテゴリー ライブラリ名 主な用途
基本ライブラリ os ファイルシステム操作、環境変数管理
time 時間計測、遅延処理
re 正規表現処理
typing 型ヒントの定義
collections deque、defaultdictなどのデータ構造
数値計算・科学計算 numpy 数値計算、行列操作
torch ディープラーニングフレームワーク
cma CMA-ES進化戦略の実装
機械学習関連 vllm 大規模言語モデルの高速推論
transformers Hugging Faceのモデル操作
wandb 実験管理、ログ記録
設定管理 hydra 構成管理、実験設定
omegaconf 設定ファイルの管理(DictConfig, OmegaConf)
分散処理 celery 分散タスク処理

コメントつきコード


import os
import cma
import hydra
import logging
import wandb
import torch
import time
import re
import numpy as np
from typing import List, Tuple, Dict, Optional
from omegaconf import DictConfig, OmegaConf
from celery import Celery
from collections import deque
from vllm import LLM
from transformers import AutoModelForCausalLM
from collections import defaultdict

from tasks.base import BaseTask
from utils.celery_utils import setup_celery
from utils.helpers import (
    load_hf_params_to_vllm, save_archive_map,
    delete_outdated_models, plot_elite_map,
    get_largest_gen_file, load_archive_map
)
from datatypes import (
    ArchiveData, ModelEvalResult, MergeResult, QDInfo, TaskMetric
)


def load_task_configs(cfg: DictConfig) -> List[DictConfig]:
    # Hydraのベースディレクトリを取得
    hydra_base_dir = hydra.utils.get_original_cwd()
    task_configs = {}
    # 各タスクの設定ファイルを読み込む
    for task_name in cfg.tasks:
        task_config_path = os.path.join(
            hydra_base_dir, "configs", "task", f"{task_name}.yaml")
        task_config = OmegaConf.load(task_config_path)
        task_configs[task_name] = task_config
    return task_configs

class Worker(object):

    def __init__(self, cfg: DictConfig):
        # 初期化メソッド
        self.cfg = cfg
        self.logger = logging.getLogger("Worker")
        # タスクの設定を読み込んでインスタンス化
        self.tasks = [hydra.utils.instantiate(x) for x in load_task_configs(cfg).values()]

        # LLM(大規模言語モデル)の初期化
        self.llm = LLM(
            cfg.base_model_path,
            max_model_len=4096,
            gpu_memory_utilization=0.8,  # GPUメモリ使用率を80%に制限
            dtype="bfloat16",  # bfloat16形式で計算
            enforce_eager=True,
        )
        # モデルのパラメータを凍結(勾配計算を無効化)
        m = self.llm.llm_engine.driver_worker.model_runner.model
        for _, param in m.named_parameters():
            param.requires_grad = False

        # QD(Quality Diversity)関連のコンポーネントを初期化
        self.qd_sampler = hydra.utils.instantiate(cfg.qd.sampling)  # サンプリング戦略
        self.crossover = hydra.utils.instantiate(cfg.qd.crossover)  # 交差戦略
        self.mutator = hydra.utils.instantiate(cfg.qd.mutation)  # 突然変異戦略
        # Hugging Faceのモデルを読み込む
        self.hf_model = AutoModelForCausalLM.from_pretrained(
            cfg.base_model_path, torch_dtype=torch.bfloat16)
        self.base_params = self.hf_model.state_dict()

        # 各タスクに対してCMA-ESソルバーを初期化
        self.use_cma = cfg.qd.use_cma
        self.cma_popsize = cfg.cma_es.popsize
        if self.use_cma:
            self.cma_solvers = {}
            self.cma_input_grid_sizes = {}
            for task in self.tasks:
                # 入力情報を取得
                input_size, grid_sizes = self._get_input_info(task, self.tasks)
                self.cma_input_grid_sizes[task.task_name] = grid_sizes
                self.logger.info(
                    f"Init CMA for {task.task_name}: input_size={input_size}")
                # 初期パラメータを設定
                init_params = np.zeros(
                    input_size * (self.crossover.num_merge_params +
                                  self.mutator.num_mutation_params)
                )
                # CMA-ESソルバーを初期化
                self.cma_solvers[task.task_name] = cma.CMAEvolutionStrategy(
                    x0=init_params,
                    sigma0=cfg.cma_es.sigma,
                    inopts={
                        'popsize': cfg.cma_es.popsize,
                        'seed': cfg.cma_es.seed if cfg.cma_es.seed > 0 else 42,
                        'randn': np.random.randn,
                    },
                )
            # 各タスクの世代カウント、パラメータ、フィットネスを初期化
            self.cma_gen_counts = {task.task_name: 0 for task in self.tasks}
            self.cma_gen_params = {task.task_name: None for task in self.tasks}
            self.cma_fitnesses = {task.task_name: [] for task in self.tasks}


    def merge_models(
            self,
            q_name: str,
            generation: int,
            model_dir: str,
            archive_map: Dict[str, Dict[Tuple[int], ArchiveData]]) -> MergeResult:
    
            # 保存パスの設定
            save_path=f"{model_dir}/gen_{generation}"
            # シード値を更新
            self.qd_sampler.update_seed(generation)
            self.crossover.update_seed(generation)
            self.mutator.update_seed(generation)
    
            while True:
                trial = 0
                try:
                    # 親モデルをアーカイブからサンプリング
                    parent1, parent2 = self.qd_sampler.sample(archive_map[q_name])
    
                    # CMA-ESを使用する場合のパラメータ取得
                    if self.use_cma:
                        if self.cma_gen_counts[q_name] == 0:
                            self.cma_gen_params[q_name] = self.cma_solvers[q_name].ask()
                        input_data = (
                            np.array(parent1 + parent2) /
                            np.tile(self.cma_input_grid_sizes[q_name], 2)
                        )
                        ix = self.cma_gen_counts[q_name]
                        params = np.dot(
                            input_data,
                            self.cma_gen_params[q_name][ix].reshape((input_data.size, -1))
                        )
                    else:
                        params = None
    
                    # モデルをマージ
                    self.logger.info(f"Local Rank {os.environ['RANK']}. Merging models ...")
                    archive = archive_map[q_name]
                    child_param = self.crossover.merge(
                        self.base_params,
                        [archive[parent1].model_path, archive[parent2].model_path],
                        params[:self.crossover.num_merge_params] if self.use_cma else None,
                    )
                    self.logger.info(f"Local Rank {os.environ['RANK']}. Merge Success! Trial {trial}. parent 1: {archive[parent1].model_path}, parent 2: {archive[parent2].model_path}")
                    break
                except:
                    self.logger.error(f"Local Rank {os.environ['RANK']}. Merge Failed: Trial {trial}. parent 1: {archive[parent1].model_path}, parent 2: {archive[parent2].model_path}")
                    trial += 1
    
            # モデルを突然変異
            self.logger.info(f"Local Rank {os.environ['RANK']}. Mutating models ...")
            child_param = self.mutator.mutate(
                child_param,
                q_name,
                params[self.crossover.num_merge_params:] if self.use_cma else None,
            )
            # モデルを評価
            self.logger.info(f"Local Rank {os.environ['RANK']}. Evaluating models ...")
            result = self._eval_model(child_param)
    
            # 結果を解析
            to_save = False
            qd_info = {}
            for k in archive_map:
                q_val = result.task_metrics[k].quality
                bc_ids = self._get_bc_ids(k, result.task_metrics)
                qd_info[k] = QDInfo(task_name=k, quality=q_val, bc_ids=bc_ids)
                if (
                    bc_ids not in archive_map[k] or
                    archive_map[k][bc_ids].quality < q_val
                ):
                    to_save = True
            self.logger.info(f"Local Rank {os.environ['RANK']}. qd_info={qd_info}")
    
            # アーカイブが更新された場合のみディスクに書き込む
            if to_save:
                self.hf_model.load_state_dict(child_param)
                try:
                    self.hf_model.save_pretrained(save_path, safe_serialize=True)
                except:
                    self.logger.error(f"Local Rank {os.environ['RANK']}. error save path {save_path}")
            else:
                save_path = None
    
            # CMA-ESの更新
            if self.use_cma:
                self.cma_gen_counts[q_name] += 1
                # フィットネスを計算(CMAは最小化するため逆にする)
                fitness = (
                    np.sum(np.array(qd_info[q_name].bc_ids) /
                           self.cma_input_grid_sizes[q_name]) +
                    qd_info[q_name].quality
                )
                assert 0 <= fitness <= len(self.tasks)
                self.cma_fitnesses[q_name].append(-fitness)
                # 集団のフィットネスが収集された場合、CMAを更新
                if self.cma_gen_counts[q_name] == self.cma_popsize:
                    assert len(self.cma_fitnesses[q_name]) == self.cma_popsize
                    self.cma_solvers[q_name].tell(
                        self.cma_gen_params[q_name], self.cma_fitnesses[q_name])
                    self.cma_gen_counts[q_name] = 0
                    self.cma_fitnesses[q_name] = []
                    self.logger.info(f"Local Rank {os.environ['RANK']}. Updated CMA for {q_name}.")
    
            return MergeResult(
                qd_info=qd_info,
                save_path=save_path,
            )

    def _get_bc_ids(self,
                    q_name: str,
                    task_metrics: Dict[str, TaskMetric]) -> Tuple[int]:
        # 行動特徴(Behavior Characteristic)IDを取得
        bc_ids = ()
        for k in task_metrics:
            if k != q_name:
                bc_ids += task_metrics[k].bc_ids
        return bc_ids
    
    def _get_input_info(self,
                        task: BaseTask,
                        tasks: List[BaseTask]) -> Tuple[int, List]:
        # 入力情報を取得
        target_task_name = task.task_name
        input_size = 0
        input_grid_sizes = []
        for t in tasks:
            if t.task_name != target_task_name:
                input_size += t.bc_num_dims
                input_grid_sizes.extend(t.bc_grid_sizes)
        return input_size * 2, input_grid_sizes  # 入力サイズとグリッドサイズを返す
    
    def _eval_model(self, param: Dict, task_name: Optional[str] = None) -> ModelEvalResult:
        # モデルを評価
        load_hf_params_to_vllm(param, self.llm)  # パラメータをvllmにロード
        if task_name:
            task_metrics = {
                task.task_name: task.get_q_and_bc(self.llm) 
                for task in self.tasks if task.task_name == task_name
            }
        else:
            task_metrics = {
                task.task_name: task.get_q_and_bc(self.llm) for task in self.tasks
            }
        self.logger.info(f"Local Rank {os.environ['RANK']}. task_metrics={task_metrics}")  # ログにタスクメトリクスを出力
        return ModelEvalResult(
            model_path=None,
            task_metrics=task_metrics,
        )  # モデル評価結果を返す
    
    def eval_model(self, model_path: str, task_name: Optional[str] = None) -> MergeResult:
        # モデルをロードして評価
        model_param = AutoModelForCausalLM.from_pretrained(
            model_path, torch_dtype=torch.bfloat16).state_dict()  # モデルのパラメータをロード
        self.hf_model.load_state_dict(model_param)  # モデルのパラメータを設定
        result = self._eval_model(model_param, task_name)  # モデルを評価
    
        if task_name:
            qd_info = {
                task_name: QDInfo(
                    task_name=task_name,
                    quality=result.task_metrics[task_name].quality,  # 評価結果のqualityを取得
                    bc_ids=self._get_bc_ids(task_name, result.task_metrics),  # 評価結果のbc_idsを取得
                )
            }
        else:
            qd_info = {
                task.task_name: QDInfo(
                    task_name=task.task_name,
                    quality=result.task_metrics[task.task_name].quality,  # 評価結果のqualityを取得
                    bc_ids=self._get_bc_ids(task.task_name, result.task_metrics),  # 評価結果のbc_idsを取得
                ) for task in self.tasks
            }
        return MergeResult(save_path=model_path, qd_info=qd_info)  # 評価結果をMergeResultとして返す
    
def run_qd(celery: Celery, cfg: DictConfig) -> None:

    if cfg.celery.mode in ["main", "solo"] and not cfg.qd.restart_dir:
        # 出力ディレクトリの設定
        output_dir = hydra.core.hydra_config.HydraConfig.get().runtime.output_dir
        model_dir = os.path.join(output_dir, "models")
        merged_model_dir = os.path.join(output_dir, "merged_models")
        archive_dir = os.path.join(output_dir, "archives")
        image_dir = os.path.join(output_dir, "images")
        # 必要なディレクトリを作成
        os.makedirs(model_dir, exist_ok=True)
        os.makedirs(merged_model_dir, exist_ok=True)
        os.makedirs(archive_dir, exist_ok=True)
        os.makedirs(image_dir, exist_ok=True)

    logger = logging.getLogger("QD")
    logger.info(f"Run QD for {cfg.qd.num_generation} generations ...")
    logger.info(f"seed models: {cfg.seed_model_path}")

    call_fn = celery.tasks["call"]
    all_tasks = cfg.tasks
    q_idx = 0
    q_name = all_tasks[q_idx]
    archive_map: Dict[str, Dict[Tuple[int], ArchiveData]] = {
        x: dict() for x in all_tasks
    }
    task_configs = load_task_configs(cfg)

    if not cfg.qd.restart_dir:
        # シードモデルでarchive_mapを初期化
        promises = [
            call_fn.delay("eval_model", model_path=model_path)
            for i, model_path in enumerate(cfg.seed_model_path)
        ]

        init_q = deque()
        init_q.extend(promises)  # promisesをinit_qに追加
        init_results = []
        while init_q:
            promise = init_q.popleft()  # キューからpromiseを取り出す
            if promise.ready():  # promiseが準備できているか確認
                init_results.append(promise.get(timeout=cfg.celery.timeout))  # 結果を取得してinit_resultsに追加
            else:
                init_q.append(promise)  # 準備できていない場合は再度キューに追加
        
        for result in init_results:
            for target_q_name in all_tasks:
                q_val = result.qd_info[target_q_name].quality  # qualityの値を取得
                bc_ids = result.qd_info[target_q_name].bc_ids  # bc_idsを取得
                if (
                    bc_ids not in archive_map[target_q_name] or  # bc_idsがarchive_mapに存在しない場合
                    archive_map[target_q_name][bc_ids].quality < q_val  # 既存のqualityよりも高い場合
                ):
                    archive_map[target_q_name][bc_ids] = ArchiveData(
                        model_path=result.save_path,  # モデルのパスを保存
                        quality=q_val,  # qualityの値を保存
                        sampling_freq=1,  # サンプリング頻度を1に設定
                        validation_quality=None,  # validation_qualityをNoneに設定
                    )
        archive_map_path = f"{archive_dir}/gen0_archive_map.json"  # アーカイブマップのパスを設定
        save_archive_map(archive_map, archive_map_path)  # アーカイブマップを保存
        plot_elite_map(archive_map_path, task_configs, f"{image_dir}/gen0_elite_map_train.png", "train")  # エリートマップをプロットして保存
        gen = 1
    else:
        # 再開する場合の設定
        output_dir = cfg.qd.restart_dir
        model_dir = os.path.join(output_dir, "models")
        archive_dir = os.path.join(output_dir, "archives")
        image_dir = os.path.join(output_dir, "images")
        merged_model_dir = os.path.join(output_dir, "merged_models")
        largest_gen_file, largest_gen = get_largest_gen_file(archive_dir)
        archive_map = load_archive_map(f"{archive_dir}/{largest_gen_file}", ArchiveData)
        archive_map_path = f"{archive_dir}/gen0_archive_map.json"
        plot_elite_map(archive_map_path, task_configs, f"{image_dir}/gen0_elite_map_train.png", "train")
        gen = largest_gen + 1

    q = deque()
    for i in range(cfg.celery.num_workers - cfg.num_of_evaluation_workers):
        q_idx = (q_idx + 1) % len(all_tasks)  # q_idxを更新して次のタスクを選択
        q_name = all_tasks[q_idx]  # 現在のタスク名を取得
        logger.info(f"QD switched to optimize {q_name}")  # ログにタスク名を出力
        q.append(
            call_fn.delay(
                "merge_models",
                q_name=q_name,
                generation=i + gen,
                model_dir=model_dir,
                archive_map=archive_map,
            )
        )  # 非同期でmerge_modelsタスクをキューに追加

    # QD最適化ループ
    prev_log_time = time.time()  # 前回のログ時間を記録
    while gen < cfg.qd.num_generation + 1:
        promise = q.popleft()  # キューからpromiseを取り出す
        if promise.ready():  # promiseが準備できているか確認
            eval_gen = gen - 1  # 現在の世代を設定
            logger.info(f"Generation {eval_gen} ...")  # ログに世代を出力
            archive_map_path = f"{archive_dir}/gen{eval_gen}_archive_map.json"  # アーカイブマップのパスを設定
    
            archive_train_path = f"{image_dir}/gen{eval_gen}_elite_map_train.png"  # エリートマップのパスを設定
            save_archive_map(archive_map, archive_map_path)  # アーカイブマップを保存
            plot_elite_map(archive_map_path, task_configs, archive_train_path, "train")  # エリートマップをプロットして保存
    
            log_archive_map = {}
            for dataset, entries in archive_map.items():
                log_archive_map[dataset] = {str(coordinates): data.quality for coordinates, data in entries.items()}  # アーカイブマップをログ用に変換
    
            current_time = time.time()  # 現在の時間を取得
            time_interval_minutes = (current_time - prev_log_time)  # 前回のログ時間からの経過時間を計算
            prev_log_time = current_time  # 前回のログ時間を更新
    
            wandb.log(
                {
                    f"archive_map_image_train/archive_map_image": wandb.Image(archive_train_path),  # エリートマップの画像をログ
                    f"all_elite_map/all_elite_map": log_archive_map,  # 全エリートマップをログ
                    f"base_info/generation": eval_gen,  # 世代情報をログ
                    f"base_info/gpu_num": cfg.celery.num_workers,  # GPUの数をログ
                    f"base_info/log_interval_seconds": time_interval_minutes  # ログ間隔をログ
                },
                step=eval_gen,
                commit=False,
            )
    
            result = promise.get(timeout=cfg.celery.timeout)  # promiseの結果を取得
            archive_updated = False  # アーカイブが更新されたかどうかのフラグ
            if result.save_path is not None:  # 結果に保存パスが含まれている場合
                for k in archive_map:
                    q_val = result.qd_info[k].quality  # qualityの値を取得
                    bc_ids = result.qd_info[k].bc_ids  # bc_idsを取得
                    if (
                        bc_ids not in archive_map[k] or  # bc_idsがarchive_mapに存在しない場合
                        archive_map[k][bc_ids].quality < q_val  # 既存のqualityよりも高い場合
                    ):
                        archive_map[k][bc_ids] = ArchiveData(
                            quality=q_val,  # qualityの値を保存
                            model_path=result.save_path,  # モデルのパスを保存
                            sampling_freq=1,  # サンプリング頻度を1に設定
                            validation_quality=None,  # validation_qualityをNoneに設定
                        )
                        archive_updated = True  # アーカイブが更新されたことを記録
    
            wandb.log(
                {
                    f"save_path_true/save_path_true": 1 if result.save_path is not None else 0,  # 保存パスがあるかどうかをログ
                    f"archive_map_updated/archive_map_updated": int(archive_updated),  # アーカイブが更新されたかどうかをログ
                },
                step=eval_gen,
                commit=True,
            )

            # 新しいタスクをキューに追加
            if gen % cfg.qd.flip_interval == 0:
                q_idx =  (q_idx + 1) % len(all_tasks)
                q_name = all_tasks[q_idx]
                logger.info(f"QD switched to optimize {q_name}")

            new_promise = call_fn.delay(
                "merge_models",
                q_name=q_name,
                generation=gen+cfg.celery.num_workers-cfg.num_of_evaluation_workers,
                model_dir=model_dir,
                archive_map=archive_map,
            )
            q.append(new_promise)

            if gen % cfg.disk_cleaning_interval == 0:
                deleted_models = delete_outdated_models(archive_map, model_dir, gen-cfg.celery.num_workers*2)
                for model in deleted_models:
                    logger.info(f"Deleted model: {model}")

            gen += 1
        else:
            q.append(promise)

    # クリーンアップ
    deleted_models = delete_outdated_models(
        archive_map, model_dir, cfg.qd.num_generation)
    for model in deleted_models:
        logger.info(f"Deleted model: {model}")


@hydra.main(version_base=None, config_path="configs", config_name="config")
def main(cfg: DictConfig):
    # 設定内容を表示
    print(OmegaConf.to_yaml(cfg))

    def get_worker_cls(cfg):
        # Workerクラスを初期化する関数を返す
        def init_func():
            return Worker(cfg)
        return init_func

    # Celeryのセットアップ
    celery = setup_celery(
        name=cfg.celery.name,
        mode=cfg.celery.mode,
        worker_cls=get_worker_cls(cfg),
    )

    # wandbの初期化
    wandb.init(
        entity=cfg.wandb.entity,
        project=cfg.wandb.project,
        name=f"{cfg.wandb.name}",
        config=OmegaConf.to_container(cfg, resolve=True, throw_on_missing=True),
    )

    # QDアルゴリズムの実行
    run_qd(celery, cfg)


if __name__ == "__main__":
    # メイン関数の実行
    main()

技術用語まとめ

CycleQDの主要な用語を機能カテゴリーごとに整理します。

カテゴリー 用語 説明
フレームワーク関連 Hydra Facebookが開発した構成管理フレームワーク。階層的な設定管理が可能
vLLM 大規模言語モデルの高速推論フレームワーク。PagedAttentionで最適化
HuggingFace 自然言語処理のライブラリ・プラットフォーム。モデル操作が容易
WandB (Weights & Biases) 機械学習実験の追跡・可視化プラットフォーム
モデル関連 bfloat16 16ビット浮動小数点数形式。省メモリで機械学習に最適
パラメータ モデルの重みと偏り。モデルの動作を決定する数値
エリートモデル アーカイブに保存される性能の高いモデル
QD関連 Quality Diversity (QD) 多様性と品質を両立する最適化アルゴリズム
アーカイブ 優秀なモデルを保存するデータベース
行動特性 モデルの多様性を特徴づける指標
多様性指標 モデル群の多様さを数値化した指標
進化戦略関連 CMA-ES 共分散行列適応進化戦略。効率的なパラメータ探索を実現
サンプリング戦略 親モデルの選択方法(トーナメント選択、ランキング選択など)
交配(crossover) 親モデルのパラメータを組み合わせて新モデルを生成
突然変異(mutation) モデルのパラメータにランダムな変更を加える操作
最適化関連 フィットネス モデルの適応度を示す評価値
探索戦略 パラメータ空間を効率的に探索する方法
フリップ間隔 タスクを切り替える世代間隔
実装関連 Worker モデルの生成と評価を実行する処理単位
層別混合 モデルの層ごとにパラメータを組み合わせる方法
パラメータ摂動 パラメータに小さな変化を加える操作

おわりに

Sakana AIの新技術「CycleQD」公開されているコードを読んでみました。
コードの解説、シーケンス図作成には、Claudeを使用しました。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?