pagmoの利用方法
pagmoはC++で書かれた最適化計算のライブラリの一つ。
遺伝的アルゴリズムなどさまざまなアルゴリズムが提供されていて、統一的なインターフェースから利用できる。2022年現在でもちゃんと開発が継続しているようである。以下のような特徴がある。
- 評価関数の評価の並列実行も可能
- 進化計算など評価関数の計算を多数回行う時に並列に計算することができる
- 多目的最適化、拘束条件の有無、確率的・決定的、離散最適化・連続最適化など問題に合わせて色々な手法が実装されている
- C++で多目的最適化が実装されている唯一(?)のライブラリ
この記事ではpagmoの基本的な使い方を解説する。
インストール
boostとIntel TBBライブラリに依存しているが、パッケージ管理ソフトで簡単に導入することができる。macの場合、homebrewを使って以下のコマンドで簡単に導入することができる。
$ brew install pagmo
最小限のプログラム
参考: https://esa.github.io/pagmo2/quickstart.html
まずは以下のコードをコンパイルし、実行するところまでをやってみる。
内容については後ほど解説するが、まずは手元で動かせるようにするまでの手順を示す。
#include <iostream>
#include <pagmo/algorithms/sade.hpp>
#include <pagmo/problem.hpp>
using namespace pagmo;
int main() {
struct my_problem {
// Implementation of the objective function.
vector_double fitness(const vector_double &dv) const
{
double f = dv[0] * dv[0] + dv[1] * dv[1] + dv[2] * dv[2];
return {f};
}
// Implementation of the box bounds.
std::pair<vector_double, vector_double> get_bounds() const
{
return {{-1.0, -1.0, -1.0}, {1.0, 1.0, 1.0}};
}
};
problem prob{my_problem{}};
// 2 - Instantiate a pagmo algorithm (self-adaptive differential evolution, 100 generations).
algorithm algo{sade{100}};
algo.set_seed(1234);
// 3 - Instantiate a population of size 50
population pop{prob, 50, 123456};
// 4 - Let population evolve
pop = algo.evolve(pop);
std::cout << pop;
return 0;
}
ビルドはcmakeを使って行うのが簡単である。以下のようにfind_package(Pagmo REQUIRED)
でインストールされたPagmoを探し、target_link_libraries
でpagmoのライブラリとリンクすることを指定すれば使えるようになる。
cmake_minimum_required(VERSION 3.16)
project(sample_project)
# Look for an installation of pagmo in the system.
find_package(Pagmo REQUIRED)
add_executable(getting_started getting_started.cpp)
target_link_libraries(getting_started Pagmo::pagmo)
以上のファイルを用意したのち、以下のコマンドでビルドできる。
mkdir build
cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
make
cmakeを使わない場合は、以下のようなコンパイルコマンドで直接必要なライブラリをリンクすれば良い。
g++ -O2 -DNDEBUG -std=c++17 getting_started.cpp -pthread -lpagmo -lboost_serialization -ltbb
これを実行してみると、以下のような出力が得られる。
Problem name: main::my_problem
C++ class name: main::my_problem
Global dimension: 3
Integer dimension: 0
Fitness dimension: 1
Number of objectives: 1
Equality constraints dimension: 0
Inequality constraints dimension: 0
Lower bounds: [-1, -1, -1]
Upper bounds: [1, 1, 1]
Has batch fitness evaluation: false
Has gradient: false
User implemented gradient sparsity: false
Has hessians: false
User implemented hessians sparsity: false
Fitness evaluations: 2600
Thread safety: basic
Population size: 50
List of individuals:
#0:
ID: 6974227922669909857
Decision vector: [0.000225865, -0.000333656, 0.000215769]
Fitness vector: [2.08898e-07]
#1:
ID: 16533536718309661815
Decision vector: [-0.000136895, -7.55962e-05, 0.000217031]
Fitness vector: [7.15573e-08]
...
#49:
ID: 12762337260798831408
Decision vector: [-6.08054e-05, 4.81642e-06, 4.01976e-05]
Fitness vector: [5.33634e-09]
Champion decision vector: [-1.69489e-05, -5.31734e-06, -1.97038e-05]
Champion fitness: [7.03778e-10]
1: 最適化問題の定義
最適化ライブラリーを利用するためには、解決すべき最適化問題を表現することがまずは必要である。
このセクションでは、pagmoでどのように最適化問題を定義するかを解説する。
先ほどのサンプルコードでは、以下のような3変数の目的関数を最小化する問題を考えている。
f(x_1, x_2, x_3) = x_1^2 + x_2^2 + x_3^2
ただし、定義域は
-1 \leq x_{1,2,3} \leq 1
としている。
この目的関数は(自明ではあるが)$(0,0,0)$において最小値$0$を取る。この最小値とその時の$(x_1,x_2,x_3)$を求めるのが、ここで解きたい問題である。
ここで用語として目的関数の入力$\mathbf{x}$を"decision vector"と呼び、出力を"fitness"と呼ぶ。
この問題をPagmoで定義するには、構造体(またはクラス)を定義する。
ここではmy_problem
という構造体を定義した。
struct my_problem {
// Implementation of the objective function.
vector_double fitness(const vector_double &dv) const
{
double f = dv[0] * dv[0] + dv[1] * dv[1] + dv[2] * dv[2];
return {f};
}
// Implementation of the box bounds.
std::pair<vector_double, vector_double> get_bounds() const
{
return {{-1.0, -1.0, -1.0}, {1.0, 1.0, 1.0}};
}
};
ここで、問題を定義した構造体は、ある特定のクラスを継承している必要はなく、以下の二つの関数をメンバー関数を最低限定義してあれば問題を定義できる。
(Pagmoでは継承ではなくテンプレートを利用した型消去(type erasure)を積極的に用いている)
-
vector_double fitness(const vector_double &dv) const
- decision vector(入力)を受け取って、目的関数の出力を返す
- 目的関数は複数の値を返すケースもあるので、doubleのvectorを返す。ここで
pagmo::vector_double
はstd::vector<double>
のエイリアス。
-
std::pair<vector_double, vector_double> get_bounds() const
- decision vectorの領域を返す。下界(lower bound)と上界(upper bound)のペアを返す。
この構造体を用いて、pagmo::problem
クラスのインスタンスを以下のように作る。
problem prob{my_problem{}};
ちなみに、この{...}
というのは単にコンストラクタに渡す引数を記述する"uniform initialization"という構文。(...)
で書くのと同じく単なる変数の初期化だが、(...)
を使うと構文上、関数定義と解釈されてしまうためuniform initializationを利用している。
https://cpprefjp.github.io/lang/cpp11/uniform_initialization.html
2: 最適化アルゴリズムの指定
続いて、最適化アルゴリズムのクラスであるalgorithm
のインスタンスを構築する。
ここでは、pagmoで定義されている既存のアルゴリズムの一つである"self-adaptive differential evolution"(SADE) で最適化する。
SADEは差分進化アルゴリズムの一つで、詳細はこのページを参照のこと。
#include <pagmo/algorithms/sade.hpp>
...
algorithm algo{sade{100}}; // 100 generations
algo.set_seed(1234); // set random number seed
まず、必要なヘッダをインクルードして、algorithm
のコンストラクタの引数にsade
インスタンスを渡して、構築する。
sade{100}
の引数の100は世代数を指定している。他にもパラメータを引数として渡すことができる。
アルゴリズムの構築後は、そのままでも利用できるが、ここでは結果の再現性を補償するために乱数の種を指定している。
3: populationの初期化
続いて、populationクラスのインスタンスを作る。populationクラスは、複数の決定ベクトルを持つクラス。
pagmoで定義されている最適化アルゴリズムは、このpopulationを更新していくことにより最適化を行う。
populationは、problemについての情報と個体(決定ベクトルとそのfitness)の情報を持つ。
population pop{prob, 50, 123456};
4: populationの進化
populationを更新していく。
pop = algo.evolve(pop);
この最小限のコードに加えて、最適化問題に拘束条件を加えたり、多目的最適化を実施したりといったより複雑な最適化を行うこともできる。
しかし、コードのメインの部分は以上の4つのプロセス(「問題の定義」「アルゴリズムの指定」「集団の初期化」「集団の進化」)からなる。
多目的最適化
これまでは目的関数が一つのスカラー値を返す単目的最適化を行なってきた。
次は、目的関数が二つ以上ありそのパレート最適面を求める「多目的最適化問題」を解いてみることにする。
ここでは例として、以下のような入力・出力の次元がともに2である多目的最適化問題を解くこととする。
\begin{cases}
f_1(x_1, x_2) = x_1^2 + x_2^2 \\
f_2(x_1, x_2) = (1-x_1)^2 + (1-x_2)^2
\end{cases}
ただし、定義域は $0 \leq x_{1,2} \leq 1$とする。
定義から明らかなように$f_1$, $f_2$を最小にする点はそれぞれ$(0,0)$, $(1,1)$であり、目的関数のどちらかを最小化しようとすると他方は最小化できない。
この問題をNon dominated sorting genetic algorithm (NSGA2)というアルゴリズムで解く。
https://esa.github.io/pagmo2/docs/cpp/algorithms/nsga2.html
コードは以下のようになる。
#include <iostream>
#include <pagmo/algorithm.hpp>
#include <pagmo/algorithms/nsga2.hpp>
#include <pagmo/population.hpp>
#include <pagmo/problem.hpp>
#include <pagmo/problems/dtlz.hpp>
using namespace pagmo;
int main() {
// 1 - Define a problem
struct my_problem {
pagmo::vector_double fitness(const pagmo::vector_double &x) const {
return {x[0] * x[0] + x[1] * x[1], (1 - x[0]) * (1 - x[0]) + (1 - x[1]) * (1 - x[1])};
}
size_t get_nobj() const {
return 2;
}
std::pair<pagmo::vector_double, pagmo::vector_double> get_bounds() const {
return {{0, 0},
{1, 1}};
}
};
problem prob{my_problem{}};
// 2 - Instantiate a pagmo algorithm
algorithm algo{nsga2(1000)};
algo.set_seed(1234);
// 3 - Instantiate a population
population pop{prob, 32, 123456};
// 4 - Evolve the population
pop = algo.evolve(pop);
// 5 - Output the population
std::cerr << "The population: \n" << pop;
for (const auto &f: pop.get_f()) {
std::cout << f[0] << ' ' << f[1] << std::endl;
}
}
これを実行すると、populationの$(f_1,f_2)$の値のリストが得られる。$\mathbf{f}$のリストをプロットすると以下のようになる。
$f_1$を最小化する解($f_1=0,f_2=2$)と$f_2$を最小化する解($f_1=2,f_2=0$)と、その中間が得られる。
また、$\mathbf{x}$をプロットすると以下のようになる。
コードの解説
単目的最適化と多目的最適化のコードで異なるのは問題の定義の部分となる。
struct my_problem {
pagmo::vector_double fitness(const pagmo::vector_double &x) const {
return {x[0] * x[0] + x[1] * x[1], (1 - x[0]) * (1 - x[0]) + (1 - x[1]) * (1 - x[1])};
}
size_t get_nobj() const {
return 2;
}
...
};
まずfitness
が長さ2のdoubleのvectorを返すようになっている。目的関数が複数あるので、複数の値を返す必要がある。
また、目的関数の出力次元を返す関数get_nobj
を定義する必要がある。
以上のように問題を定義すれば単目的最適化と同じように実行することができる。
ただし、最適化アルゴリズムは多目的最適化をサポートしているものでなくてはいけない。
pagmoで実装されている最適化アルゴリズムの一覧は以下のページにある。
https://esa.github.io/pagmo2/overview.html#list-of-algorithms
目的関数のバッチ評価
続いて、fitness関数の評価に実行時間がかかる場合を考えよう。先ほどの多目的最適化のコードを以下のように変更する。
#include <iostream>
#include <chrono>
#include <thread>
#include <pagmo/algorithm.hpp>
#include <pagmo/algorithms/nsga2.hpp>
#include <pagmo/population.hpp>
#include <pagmo/problem.hpp>
using namespace pagmo;
int main() {
struct my_problem {
pagmo::vector_double fitness(const pagmo::vector_double &x) const {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return {x[0] * x[0] + x[1] * x[1], (1 - x[0]) * (1 - x[0]) + (1 - x[1]) * (1 - x[1])};
}
size_t get_nobj() const {
return 2;
}
std::pair<pagmo::vector_double, pagmo::vector_double> get_bounds() const {
return {{0, 0},
{1, 1}};
}
};
problem prob{my_problem{}};
algorithm algo{nsga2(10)};
algo.set_seed(1234);
population pop{prob, 32};
auto start = std::chrono::high_resolution_clock::now();
pop = algo.evolve(pop);
auto end = std::chrono::high_resolution_clock::now();
std::cout << "Elapsed time: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms\n";
}
fitness関数を呼び出すために10msスリープするコードを入れてある。
これを10世代、個体数32で手元のマシンで実行すると計算が終わるまで3.6secを要した。
fitnessは各個体に対して計算しているのだが、これらは同じ世代のpopulationの含まれる個体については並列に評価できるはずである。そこでこれをOpenMPを使って並列化してみる。
コードの変更点は以下の通り。
//...
#include <omp.h>
#include <pagmo/batch_evaluators/member_bfe.hpp>
struct my_problem {
// ...省略
// batch_fitness を追加
vector_double batch_fitness(const vector_double& dvs) const {
size_t input_dim = 2, output_dim = get_nobj();
size_t num_dvs = dvs.size() / input_dim;
vector_double fitnesses(num_dvs*output_dim);
std::cerr << "batch_fitness called: " << omp_get_max_threads() << ' ' << num_dvs << std::endl;
#pragma omp parallel for
for (size_t i = 0; i < num_dvs; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
size_t idx = i*input_dim;
double f1 = dvs[idx] * dvs[idx] + dvs[idx+1] * dvs[idx+1];
double f2 = (1 - dvs[idx]) * (1 - dvs[idx]) + (1 - dvs[idx+1]) * (1 - dvs[idx+1]);
fitnesses[i*output_dim] = f1;
fitnesses[i*output_dim+1] = f2;
}
return fitnesses;
}
};
problem prob{my_problem{}};
nsga2 my_nsga2{10};
my_nsga2.set_bfe(bfe{member_bfe{}}); // batch function evaluatorを追加
algorithm algo{my_nsga2};
algo.set_seed(1234);
population pop{prob, member_bfe{}, 32, 123456ull}; // populationのコンストラクタにもbatch function evaluatorを追加
// 以後は同じ
-
my_problem
クラスにbatch_fitness
関数を定義 -
nsga2
クラスにmember_bfe
というbatch function evaluatorをセット -
population
のコンストラクタにもmember_bfe
を指定
この3つの変更点を適用すれば、populationの初期化時および進化時にfitness
関数の代わりにbatch_fitness
関数が呼ばれるようになる。
batch_fitness
関数は、decision vectorの配列が引数として渡される。入力次元を$n$とすると、$[0,n)$が最初のdecision vector、$[n,2n)$に2番目のdecision vectorが入っている。これらに対する出力をやはりvector_double
で返す。出力次元を$m$とすると$[0,m)$に最初の入力に対する出力を格納し、$[m,2m)$に二つ目の出力を格納し、というように最後まで結果を出力する。
これらのバッチ処理をこの関数の中で行い、各世代ごとに一度だけbatch_fitness
関数が呼ばれる。この関数をOpenMPを使ったスレッド並列化したり、MPIを用いた複数プロセス並列にするなりして高速化することで、評価関数の計算に時間を要する最適化計算を高速化することができる。
実際に上のコードを32 threadのOpenMP並列で実行すると、0.12秒で完了する。
注意点として、すべてのアルゴリズムがbatch function evaluatorを受け付けられるわけではない。
bfeをサポートしているかどうかは、それぞれのアルゴリズムの実装を確認し、bfeを受け付けるメソッドがあるかどうかを確認するしかないようだ。
アルゴリズムの実装のソースコードはこちらから確認できる。
https://github.com/esa/pagmo2/tree/1fb2448cda6851f70076a947c84584a68d0b5187/src/algorithms
参考: https://github.com/esa/pagmo2/blob/1fb2448cda6851f70076a947c84584a68d0b5187/tests/nsga2.cpp