スパコンのように数百〜数万並列規模のMPI環境上で、タスクを分散実行するライブラリ caravan-lib を紹介する。
タスク自体はシングルプロセスで動作し、数秒から数時間くらいの粒度のものを想定している。いわゆるバカパラみたいなことを気軽に実行するために使うものと理解していただきたい。
マスターワーカー型のタスク分散処理を手軽に定義でき、なおかつC++で書かれたヘッダオンリーのライブラリなのでインクルードするだけですぐに利用することができる。また、大規模な並列環境までスケールするようになっている。
サンプルコードのビルドと実行
まずは基本的な使い方が示されているサンプルプログラムを紹介する。この手順通りにやっていけば自分の手でビルドすることができるだろう。
まずはライブラリを手元に持ってくる。ヘッダオンリーライブラリなので、インクルードするだけで利用できるが、外部ライブラリとして nlohmann::json にも依存しているため少し注意が必要である。
git管理しているプロジェクトに導入したい場合、一番簡単な利用方法はこのライブラリをgitのsubmoduleとしてリポジトリに追加するのが良いだろう。
git submodule add https://github.com/yohm/caravan-lib.git
git submodule update --init --recursive
依存しているライブラリも含めてcloneしてくることができる。
例として、次のようなサンプルプログラムをビルドして実行してみることにする。
#include <iostream>
#include <mpi.h>
#include "caravan.hpp"
using json = nlohmann::json;
int main(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
std::function<void(caravan::Queue&)> on_init = [](caravan::Queue& q) {
// pre-process: create json_object that contains parameters of Tasks
for (int i = 0; i < 3; i++) {
json input = { {"message","hello"}, {"param", i} };
uint64_t task_id = q.Push(input);
std::cerr << "task: " << task_id << " has been created: " << input << std::endl;
}
};
std::function<json(const json& input)> do_task = [](const json& input) {
// do some job
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
std::cerr << "doing tasks: " << input << " at rank " << rank << std::endl;
sleep(1);
json output;
output["result"] = input["message"].get<std::string>() + " world";
return output;
};
std::function<void(int64_t, const json&, const json&, caravan::Queue&)> on_result_receive = [](int64_t task_id, const json& input, const json& output, caravan::Queue& q) {
std::cerr << "task: " << task_id << " has finished, input: " << input << ", output: " << output << std::endl;
if (input["message"].get<std::string>() == "hello") {
json input_2 = { {"message", "bye"}, {"param", input["param"].get<int>()}};
q.Push(input_2);
}
};
caravan::Start(on_init, on_result_receive, do_task, MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}
このファイルをビルドするには次のようにする。
(ただし、当然ながらMPIを事前にインストールしておく必要がある。macの場合は、brew install open-mpi
でインストールできる。)
$ mpic++ -std=c++11 hello_world.cpp
実行するには次のようにする。ただし、MPIのプロセス数は3以上じゃないと適切に動作しないので注意。
$ mpiexec -n 8 ./a.out
task: 0 has been created: {"message":"hello","param":0}
task: 1 has been created: {"message":"hello","param":1}
task: 2 has been created: {"message":"hello","param":2}
doing tasks: {"message":"hello","param":0} at rank 2
doing tasks: {"message":"hello","param":2} at rank 4
doing tasks: {"message":"hello","param":1} at rank 3
task: 0 has finished, input: {"message":"hello","param":0}, output: {"result":"hello world"}
task: 1 has finished, input: {"message":"hello","param":1}, output: {"result":"hello world"}
task: 2 has finished, input: {"message":"hello","param":2}, output: {"result":"hello world"}
doing tasks: {"message":"bye","param":0} at rank 2
doing tasks: {"message":"bye","param":2} at rank 4
doing tasks: {"message":"bye","param":1} at rank 3
task: 3 has finished, input: {"message":"bye","param":0}, output: {"result":"bye world"}
task: 4 has finished, input: {"message":"bye","param":1}, output: {"result":"bye world"}
task: 5 has finished, input: {"message":"bye","param":2}, output: {"result":"bye world"}
サンプルコードの解説
ここからサンプルコードで何を行っているかを解説していく。
コードを見て貰えばわかる通り、以下の3つのstd::function
を定義しcaravan::Start
を呼ぶとタスクのスケジューリングと実行が行われるという仕組みになっている。
-
on_init
: 最初に呼ばれる初期化処理のコード -
do_task
: タスクの実体。 -
on_result_receive
: タスクが完了した時の処理の定義。- タスクの結果に応じて、ここでキューにタスクを追加することができる。これによって依存関係のあるタスクを定義することが可能。
on_init
関数
このfunctionの中では、タスクの入力データを定義してキューイングする処理を行なっている。
std::function<void(caravan::Queue&)> on_init = [](caravan::Queue& q) {
// pre-process: create json_object that contains parameters of Tasks
for (int i = 0; i < 3; i++) {
json input = { {"message","hello"}, {"param", i} };
uint64_t task_id = q.Push(input);
std::cerr << "task: " << task_id << " has been created: " << input << std::endl;
}
};
ここでは3つの入力データを定義し(ここではinput
という変数名で定義されている)、キューに追加(q.Push(input)
)している。
入力はnlohmann::json
オブジェクトとして定義するので、さまざまな型のデータを定義できる。ここでは以下のような入力データをキューイングしている。
nlohmann::jsonライブラリの使い方の概要については、"C++のjsonライブラリ決定版 nlohmnn-json"の記事を参考にしていただきたい。
{ "message": "hello", "param", 0}
q.Push
を行うと、返り値としてそのタスクのIDが返る。
この処理はランク0のプロセスで実行される。
do_task
関数
ワーカープロセスで行われる処理を定義する。on_init
でキューイングされた入力データを引数として受け取る。
std::function<json(const json& input)> do_task = [](const json& input) {
// do some job
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
std::cerr << "doing tasks: " << input << " at rank " << rank << std::endl;
sleep(1);
json output;
output["result"] = input["message"].get<std::string>() + " world";
return output;
};
このサンプルでは、キューから受け取った入力を表示してから1秒スリープするというタスクを定義している。
さらに以下のようなJSONデータを出力として返している。
{ "result": "hello world" }
この処理はワーカープロセス(ランク0以外)で並列に実行される。
ちなみにタスクID順に実行されることは保証されず、空いているワーカーがあれば順次タスクが実行される。
on_result_receive
関数
この関数では、各タスクの出力データをどのように処理するかを定義する。
do_task
の出力のデータはマスタープロセス(ランク0のプロセス)に転送され、この関数が呼ばれる事になる。
std::function<void(int64_t, const json&, const json&, caravan::Queue&)> on_result_receive = [](int64_t task_id, const json& input, const json& output, caravan::Queue& q) {
std::cerr << "task: " << task_id << " has finished, input: " << input << ", output: " << output << std::endl;
if (input["message"].get<std::string>() == "hello") {
json input_2 = { {"message", "bye"}, {"param", input["param"].get<int>()}};
q.Push(input_2);
}
};
このサンプルでは、まずタスクID、入力、出力データを標準エラー出力にプリントしている。
さらにこの関数の中では追加でタスクを定義することもできる。それをやっているのが後半の部分で、追加で {"message": "bye", "param": 0}
という入力データをキューに追加している。追加された入力データも順次ワーカープロセスに転送されてタスクが実行される。
このように、依存関係のあるタスクを定義したり、結果に応じてタスクを追加したりすることができる。
キューの中の全てのタスクが実行されて空になり、またそれぞれのタスクの出力に対してon_result_receive
が実行されたときに caravan::Start
関数が終了となり処理を返す。
注意点
上記の関数を定義するときに変数をキャプチャすることもできる。
しかし、マスタープロセスとワーカープロセスではメモリ空間が異なるため、注意が必要である。例えば、on_init
関数内であるキャプチャした変数に変更を加えたとしても、do_task
内でその変数の変更を参照することはできない。これらの処理は別のプロセスで行われるためである。
マスターワーカー間の通信はすべて、Queueを経由して入出力されるJSONオブジェクトを通して行わなくてはならない。
実装について検討した点
実装について以下のようなことを検討して実装した。
- マスターワーカー型の実装
- タスクのスケジューリング時にワーカー間でタスクを受け渡しする分散型の方式も検討したが、想定しているユースケースでのタスクの粒度が大きく実行時間のばらつきも大きいため中央集権型のデザインにした。
- マスタープロセスでタスクの定義をしてキューイングをしたら、タスクを実行していないワーカーがマスターに問い合わせをして入力データを取りに行くというのが基本的な動きになっている。
- 入出力の処理がマスターでのみ行われるので利用者にも挙動がわかりやすいと思う
- JSON型での入出力データの定義
- できるだけ汎用的に使えるように、入出力にnlohmann::jsonライブラリを用いた。JSONで表現できるデータ型ならば入出力として利用できる。
- このライブラリはSTLとの親和性がとても高く、さまざまなSTLコンテナをシームレスにJSONオブジェクトに変換できる。また、ユーザー定義型も簡単にJSONにシリアライズするためのマクロが用意されている。
- 特に可変長のデータを通信する実装はMPIで直に書くと面倒になりがちだが、そのような面倒な処理を利用者自身で書く必要はない。
- 実装としてはJSONオブジェクトをMessagePackにバイナリ化してMPIで通信している。
- 依存関係のあるタスクや、実行結果に応じたタスクの追加ができるようにした。
- タスクの結果をマスターが受け取った際に、その結果に応じて別のタスクを追加できるようにした。
- これを用いると、例えばパラメータの最適化などの処理が実現できる。
- スケーラブルな実装
- マスターワーカー型の処理ではマスターとワーカーの間に1対1の通信が必要だが、ワーカーの数が数千、数万と増えてきたときに単純にやると破綻してしまう。
- 一つのプロセスと1対1通信するプロセスが数百を超えてしまうと、ハングしてプログラムが停止してしまうことが往々にして起きる。
- そこでマスターとワーカーという2分類ではなく、マスター・バッファー・ワーカーというように階層を増やすようにした。マスターはバッファーのプロセスとやりとりし、バッファーはワーカーとやりとりする。一つのバッファーが担当するワーカーの数を数百くらいに制限することにより、合計で10万くらいのプロセスまで問題なくスケールする。
- バッファーのプロセスは通信に従事しタスクは実行しないので若干効率が落ちるが、効率低下は1%以内なので気にしないという方針
- (実はこれが3プロセス以上じゃないとサンプルが動かない理由。マスター、バッファー、ワーカーで少なくとも一プロセス必要なので3プロセスが最小構成になる。一般にnプロセスで実行したときには、最大(n-2)ワーカープロセスでタスクを処理する)
- マスターワーカー型の処理ではマスターとワーカーの間に1対1の通信が必要だが、ワーカーの数が数千、数万と増えてきたときに単純にやると破綻してしまう。