はじめに
Corosync は Pacemaker と組み合わせたクラスタの障害検知・リカバリーに多く用いられます。
Corosync は Totem と呼ばれる単一リングプロトコル上で仮想同期を実現します。
仮想同期は、複製されたプロセスを管理し、それらの間の通信を調整するためのモデルです。
このモデルでは、一貫性のあるメッセージ配送の保証します。つまり、送信したメッセージがグループ内のプロセスすべてに同じ順序で到達する(あるいはいずれにも配送されない)ことを保証します。
Corosync での仮想同期通信は CPG (Closed Process Group) ライブラリから利用することができます。
- CPG グループへの参加・離脱
- CPG グループメンバーの取得、メンバー構成変更通知の受信
- CPG グループ内でのメッセージ送信・受信
一貫性のあるメッセージ配送が保証されるならば手軽に高信頼な分散アプリケーションを作ることができるのかもしれない、ということで、試しにテストプログラムを動かしてみることにしました。
corosync クラスタのテスト環境を作る
corosync は計算機一台では動かないので、Docker でクラスタを立ち上げるようにします。
具体的には、
- ホスト側で corosync を一つ動かす
- Docker 上で ubuntu ノード2台立ち上げ、それぞれで corosync を動かす
という形で 3 つの corosync からなるクラスタを動かします。
3 つとも Docker 上で動かしてホスト側はきれいなままに保つというのでもよかったのですが、なんとなく全部仮想で動かすだけだと本当に動かしたという感じがしない、という気分の問題でこうしました。
corosync クラスタの起動
docker-compose.yml
から 192.168.20.0/24 仮想ネットワーク上に corosync2(192.168.20.12), corosync3(192.168.20.13) の2つのノードを立ち上げることにします。ホスト側には 192.168.20.1 のアドレスを割り当てます。
適宜テストプログラムを入れ替えて動かすのに便利ないように、それぞれのノードは /volume/
に ./corosync2/
, ./corosync3
のディレクトリをマウントすることとしました(名前の一貫性があるほうが覚えやすいので、ホスト側用に ./corosync1/
のディレクトリも作っておきました)。
というわけで書いた docker-compose.yml はこんな感じです:
networks:
corosync_net:
driver: bridge
ipam:
driver: default
config:
- subnet: 192.168.20.0/24
gateway: 192.168.20.1
services:
corosync2:
image: corosync2
build: ./corosync_docker
container_name: corosync2
volumes:
- ./corosync2:/volume
networks:
corosync_net:
ipv4_address: 192.168.20.12
corosync3:
image: corosync3
build: ./corosync_docker
container_name: corosync3
volumes:
- ./corosync3:/volume
networks:
corosync_net:
ipv4_address: 192.168.20.13
build: ./corosync_docker
と指定しているので、 ./corosync_docker/Dockerfile
も作っておきます:
FROM ubuntu:latest
RUN \
apt update && \
apt -y install net-tools corosync
COPY corosync.conf /etc/corosync/corosync.conf
COPY corosync-loop.sh /usr/local/bin/corosync-loop.sh
CMD ["/bin/bash", "/usr/local/bin/corosync-loop.sh"]
corosync.conf
はこんな感じで corosync パッケージをインストールしたときにできる /etc/corosync/corosyc.conf
ほぼそのまま、nodelist に
192.168.20.1, 192.168.20.12, 192.168.20.13
のアドレスを追加しただけです:
# Please read the corosync.conf.5 manual page
system {
allow_knet_handle_fallback: yes
}
totem {
version: 2
cluster_name: corosync_test
crypto_cipher: none
crypto_hash: none
}
logging {
fileline: off
to_stderr: yes
to_logfile: yes
logfile: /var/log/corosync/corosync.log
to_syslog: yes
debug: off
logger_subsys {
subsys: QUORUM
debug: off
}
}
quorum {
provider: corosync_votequorum
}
nodelist {
node {
nodeid: 1
name: node1
ring0_addr: 192.168.20.1
}
node {
nodeid: 2
name: node2
ring0_addr: 192.168.20.12
}
node {
nodeid: 3
name: node3
ring0_addr: 192.168.20.13
}
}
ホスト側の /etc/corosync/corosync.conf
も同じ内容にしておきます。
corosync-loop.sh
は corosync サービスを立ち上げて待ちに入る、というだけです:
service corosync start
while [ 1 ]; do sleep 1; done
立ち上げてみます:
docker compose up -d
docker ps
で corosync2, corosync3 コンテナが動作しているはずです。
その後、ホスト(192.168.20.1)側の corosync を起動します:
sudo systemctl start corosync
docker の corosync が起動していなければクラスタを構成できないので corosync 起動で待ち状態のままになることに注意が必要です。
無事に corosync クラスタが起動できると、corosync-cfgtool
コマンドで以下のように nodeid 1, nodeid 2, nodeid 3 がつながっていることが確認できます。
$ sudo corosync-cfgtool -s
Local node ID 1, transport knet
LINK ID 0 udp
addr = 192.168.20.1
status:
nodeid: 1: localhost
nodeid: 2: connected
nodeid: 3: connected
corosync クラスタの停止
一応、後片付けの手順を書いておくと
ホスト側の corosync の停止
sudo systemctl stop corosync
sudo systemctl status corosync
docker ノードの停止
docker compose stop
docker イメージ含めてすべて停止・破棄するスクリプトも準備しておきました。
#!/bin/bash
# stop corosync cluster
docker compose stop
# remove corosync cluster
for pid in $(docker ps -a |awk '/corosync[1-4]/{print $1;}'); do
docker rm ${pid}
done
# remove corosync cluster images
for img in $(docker images |awk '/corosync[1-4]/{print $3;}'); do
docker rmi ${img}
done
CPG 通信のテスト
テストプログラムの準備
corosync クラスタを起動できたところで、以下のようなテストプログラムを書いてみました。
// g++ cpg_test.cc -lcpg -o cpg_test
#include <corosync/cpg.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
#include <sstream>
namespace std {
// エラーコードを文字列に変換する
ostream& operator<<(ostream& os, cs_error_t cs) {
switch (cs) {
case CS_ERR_TRY_AGAIN: os << "Resource temporarily unavailable"; break;
case CS_ERR_INVALID_PARAM: os << "Invalid argument"; break;
case CS_ERR_ACCESS: os << "Permission denied"; break;
case CS_ERR_LIBRARY: os << "The connection failed"; break;
case CS_ERR_INTERRUPT: os << "System call interrupted by a signal"; break;
case CS_ERR_NOT_SUPPORTED:
os << "The requested protocol/functionality not supported"; break;
case CS_ERR_MESSAGE_ERROR: os << "Incorrect auth message received"; break;
case CS_ERR_NO_MEMORY:
os << "Not enough memory to complete the requested task"; break;
default: os << "UNKNOWN";
}
os << "(" << static_cast<int>(cs) << ")";
return os;
}
// 理由コードを文字列に変換する
ostream& operator<<(ostream& os, cpg_reason_t reason) {
switch (reason) {
case CPG_REASON_UNDEFINED: os << "CPG_REASON_UNDEFINED"; break;
case CPG_REASON_JOIN: os << "CPG_REASON_JOIN"; break;
case CPG_REASON_LEAVE: os << "CPG_REASON_LEAVE"; break;
case CPG_REASON_NODEDOWN: os << "CPG_REASON_NODEDOWN"; break;
case CPG_REASON_NODEUP: os << "CPG_REASON_NODEUP"; break;
case CPG_REASON_PROCDOWN: os << "CPG_REASON_PROCDOWN"; break;
default: os << "UNKNOWN";
}
os << "(" << static_cast<int>(reason) << ")";
return os;
}
ostream& operator<<(ostream& os, const cpg_name* name) {
os << std::string(name->value, name->length);
return os;
}
} // namespace std
// エラーメッセージを表示
static void
report_cs_error(const char* title, cs_error_t cs) {
std::cerr << title << ": " << cs << std::endl;
}
// DELIVER コールバック
static void
cpg_deliver_cb_(
cpg_handle_t handle,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len) {
std::cout << __FUNCTION__ << ":" << std::endl;
std::cout << " group_name[" << group_name << "]," << std::endl;
std::cout << " nodeid_" << nodeid << "," << std::endl;
std::cout << " pid[" << pid << "]," << std::endl;
std::cout << " msg[" <<
std::string(static_cast<char*>(msg), msg_len) << "]" << std::endl;
}
static std::string
cpg_address_list_to_str_(
const struct cpg_address *addr_list,
size_t addr_list_entries) {
std::stringstream ss;
for (size_t i = 0; i < addr_list_entries; ++i) {
ss << "\n nodeid_" << addr_list[i].nodeid << " {"
"pid[" << addr_list[i].pid << "],"
"reason[" << static_cast<cpg_reason_t>(addr_list[i].reason) << "]"
"}";
}
return ss.str();
}
// CONFCHG コールバック
static void
cpg_confchg_cb_(
cpg_handle_t handle,
const struct cpg_name *group_name,
const struct cpg_address *member_list,
size_t member_list_entries,
const struct cpg_address *left_list,
size_t left_list_entries,
const struct cpg_address *joined_list,
size_t joined_list_entries) {
std::cout << __FUNCTION__ << ":" << std::endl;
std::cout << " group_name[" << group_name << "]," << std::endl;
std::cout << " member_list[" << cpg_address_list_to_str_(
member_list,
member_list_entries) << "]," << std::endl;
std::cout << " left_list[" << cpg_address_list_to_str_(
left_list,
left_list_entries) << "]," << std::endl;
std::cout << " joined_list[" << cpg_address_list_to_str_(
joined_list,
joined_list_entries) << "]" << std::endl;
}
// TOTEM_CONFCHG コールバック
static void
cpg_totem_confchg_cb_(
cpg_handle_t handle,
struct cpg_ring_id ring_id,
uint32_t member_list_entries,
const uint32_t *member_list) {
std::cout << __FUNCTION__ << ":" << std::endl;
std::cout << " ring_id["
"nodeid_" << ring_id.nodeid <<
" (seq:" << ring_id.seq << ")]," << std::endl;
std::cout << " member_list[";
for (uint32_t i = 0; i < member_list_entries; ++i) {
if (i != 0) {
std::cout << ",";
}
std::cout << member_list[i];
}
std::cout << "]" << std::endl;
}
static cpg_model_v1_data_t CPG_MODEL_DATA_ {
.cpg_deliver_fn = cpg_deliver_cb_,
.cpg_confchg_fn = cpg_confchg_cb_,
.cpg_totem_confchg_fn = cpg_totem_confchg_cb_,
.flags = CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
};
int main(int argc, char* argv[]) {
cpg_handle_t handle;
// 1. 初期化
const cs_error_t cs_init = cpg_model_initialize(
&handle,
CPG_MODEL_V1,
reinterpret_cast<cpg_model_data_t*>(&CPG_MODEL_DATA_),
NULL);
if (cs_init != CS_OK) {
report_cs_error("cpg_initialize", cs_init);
exit(1);
}
// 2. 自ノード ID の取得と表示
uint32_t local_nodeid;
const cs_error_t cs_lg = cpg_local_get(handle, &local_nodeid);
if (cs_lg != CS_OK) {
report_cs_error("cpg_local_get", cs_lg);
exit(1);
}
std::cout << "local_nodeid = " << local_nodeid << std::endl;
// 3. グループへの参加
std::cout << "... cpg_join" << std::endl;
struct cpg_name group_name;
group_name.length = strlen("test");
strncpy(group_name.value, "test", sizeof(group_name.value));
const cs_error_t cs_join = cpg_join(handle, &group_name);
if (cs_join != CS_OK) {
report_cs_error("cpg_join", cs_join);
exit(1);
}
// 4. グループに属するメンバーの取得と表示
std::cout << "... cpg_membership_get" << std::endl;
cpg_address member_list[8];
int member_list_len = 8;
const cs_error_t cs_mg = cpg_membership_get(
handle,
&group_name,
member_list,
&member_list_len);
if (cs_mg != CS_OK) {
report_cs_error("cpg_membership_get", cs_mg);
exit(1);
}
for (int i = 0; i < member_list_len; ++i) {
std::cout << " member[" << i << "] {" << std::endl;
std::cout << " nodeid_" << member_list[i].nodeid << "," << std::endl;
std::cout << " pid[" << member_list[i].pid << "]," << std::endl;
std::cout << " reason[" <<
static_cast<cpg_reason_t>(member_list[i].reason) << "]}" << std::endl;
}
// 5. イベントループ用に FD を取得
std::cout << "... cpg_fd_get" << std::endl;
int cpg_fd;
const cs_error_t cs_fg = cpg_fd_get(handle, &cpg_fd);
if (cs_fg != CS_OK) {
report_cs_error("cpg_fd_get", cs_fg);
exit(1);
}
std::cout << " cpg_fd = " << cpg_fd << std::endl;
// 6. イベントループ
// 標準入力および CPG からの通知を監視
std::cout << "... start epoll loop" << std::endl;
epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = cpg_fd;
int epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(1);
}
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, cpg_fd, &ev) == -1) {
perror("epoll_ctl(cpg)");
exit(1);
}
ev.data.fd = STDIN_FILENO;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) == -1) {
perror("epoll_ctl(stdin)");
exit(1);
}
for (;;) {
#define MAX_EVENTS 10
epoll_event events[MAX_EVENTS];
const int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(1);
}
for (int n = 0; n < nfds; ++n) {
if (events[n].data.fd == STDIN_FILENO) {
// 標準入力からの入力をグループに送信
std::string line;
std::getline(std::cin, line);
char ibuf[256];
strncpy(ibuf, line.c_str(), sizeof(ibuf));
const size_t ibuf_len =
(line.size() < sizeof(ibuf)) ? line.size() : sizeof(ibuf);
iovec iov;
iov.iov_base = ibuf;
iov.iov_len = ibuf_len;
std::cout << "... cpg_mcast_joined: '" <<
std::string(ibuf, ibuf_len) << "'" << std::endl;
const cs_error_t cs_mj = cpg_mcast_joined(
handle, CPG_TYPE_AGREED,
&iov,
1);
if (cs_mj != CS_OK) {
report_cs_error("cpg_mcast_joined", cs_mj);
exit(1);
}
}
if (events[n].data.fd == cpg_fd) {
// CPG からの通知をディスパッチ
const cs_error_t cs_d = cpg_dispatch(handle, CS_DISPATCH_ALL);
if (cs_d != CS_OK) {
report_cs_error("cpg_dispatch", cs_d);
exit(1);
}
}
}
}
}
これをホスト上でコンパイルして、corosync2/
, corosync3/
ディレクトリ上に配置して各クラスタノード上で実行できるようにします。
実行
まず、ホスト上でテストプログラムを起動してみます。
$ sudo ./cpg_test
local_nodeid = 1
... cpg_join
... cpg_membership_get
... cpg_fd_get
cpg_fd = 3
... start epoll loop
cpg_confchg_cb_:
group_name[test],
member_list[
nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}],
left_list[],
joined_list[
nodeid_1 {pid[1299723],reason[CPG_REASON_JOIN(1)]}]
cpg_totem_confchg_cb_:
ring_id[nodeid_1 (seq:106)],
member_list[1,2,3]
cpg_join
でグループに参加、まだ一人目なので cpg_membership_get でなにも取得できないものの、イベントループでは cpg_confchg コールバックと cpg_totem_confchg で自身がグループに参加したときのイベントが通知されています。
では、ここで corosync2 ノード上でテストプログラムを起動してみます。
$ docker exec -it corosync2 bash
root@62128d3f6e33:/# /volume/cpg_test
local_nodeid = 2
... cpg_join
... cpg_membership_get
member[0] {
nodeid_1,
pid[1299723],
reason[CPG_REASON_UNDEFINED(0)]}
... cpg_fd_get
cpg_fd = 3
... start epoll loop
cpg_confchg_cb_:
group_name[test],
member_list[
nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}],
left_list[],
joined_list[
nodeid_2 {pid[148452],reason[CPG_REASON_JOIN(1)]}]
cpg_totem_confchg_cb_:
ring_id[nodeid_1 (seq:106)],
member_list[1,2,3]
今度は cpg_membership_get
で nodeid_1 が見えました。また、cpg_confchg_cb_
では nodeid_1 と nodeid_2 がメンバーとして存在していること(member_list)、あらたに参加したのが nodeid_2 であること(joined_list)として見えています。
また、ホスト側で起動していたテストプログラム側にも cpg_confchg 通知が来ています:
cpg_confchg_cb_:
group_name[test],
member_list[
nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}],
left_list[],
joined_list[
nodeid_2 {pid[148452],reason[CPG_REASON_JOIN(1)]}]
さらに corosync3 上でもテストプログラムを起動すると:
$ docker exec -it corosync3 bash
root@46762d9ceb63:/# /volume/cpg_test
local_nodeid = 3
... cpg_join
... cpg_membership_get
member[0] {
nodeid_1,
pid[1299723],
reason[CPG_REASON_UNDEFINED(0)]}
member[1] {
nodeid_2,
pid[148452],
reason[CPG_REASON_UNDEFINED(0)]}
... cpg_fd_get
cpg_fd = 3
... start epoll loop
cpg_confchg_cb_:
group_name[test],
member_list[
nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}
nodeid_3 {pid[148570],reason[CPG_REASON_UNDEFINED(0)]}],
left_list[],
joined_list[
nodeid_3 {pid[148570],reason[CPG_REASON_JOIN(1)]}]
cpg_totem_confchg_cb_:
ring_id[nodeid_1 (seq:106)],
member_list[1,2,3]
ホスト(nodeid_1), corosync2(nodeid_2) 側にも
cpg_confchg_cb_:
group_name[test],
member_list[
nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}
nodeid_3 {pid[148570],reason[CPG_REASON_UNDEFINED(0)]}],
left_list[],
joined_list[
nodeid_3 {pid[148570],reason[CPG_REASON_JOIN(1)]}]
で通知が来ます。
では、いよいよメッセージを送ってみましょう。
ホスト(nodeid_1)上で Hello, I am nodeid_1 !
を投入します:
Hello, I am nodeid_1 !
... cpg_mcast_joined: 'Hello, I am nodeid_1 !'
すると、メッセージを送信した node_1 を含め、すべてのテストプログラム上で
cpg_deliver_cb_:
group_name[test],
nodeid_1,
pid[1299723],
msg[Hello, I am nodeid_1 !]
が表示されます。
同様に nodeid_2 上で Hello, I am nodeid_2 !
を投入すると
cpg_deliver_cb_:
group_name[test],
nodeid_2,
pid[148452],
msg[Hello, I am nodeid_2 !]
がすべてのテストプログラム上で表示されます。
nodeid_3 から Hello, I am nodeid_3 !
を投入したときも同様です。
cpg_deliver_cb_:
group_name[test],
nodeid_3,
pid[148570],
msg[Hello, I am nodeid_3 !]
欠点
実際に動かしてみると、Corosync を使った通信には制約があることがわかりました。
上記のテストプログラムをホスト上で動かすとき sudo
を付けたのですが、Corosync と通信できるユーザが限られます。root あるいは corosync.conf の uidgid に設定したユーザでないと Corosync と通信できません。
これは、IPC に共有メモリファイルを用いているため、ファイル所有者で通信できる相手が決まっているためです。
考えようによってはセキュリティ面で有利、とも言えますが、Corosync を一般アプリケーションのミドルウェアとして使うには不便です。多分、Corosync の上にもう一段通信用サーバをかませて、一般アプリケーションは通信用サーバとのみ通信する、というのが正しい使い方なのでしょう。
例えば、Apache Qpid ブローカクラスタは Corosync の CPG を通信に使っていました。
いました、というのは Apache Qpid はかなり前に CPG を使ったクラスタ構成をやめてしまったからです。
仮想同期は、全クライアントからのメッセージを単一のストリームとして全体に配信します。
黙っていても順序性保証を保証してくれる、というのは好ましい性質なのですが、マルチコアで各メッセージを並列に処理して性能を上げたい、というときには逆に阻害要因となります。
また、全クライアントがメッセージを同じ順番で処理すれば、全クライアントで同じ状態が保たれるはず、という仮定がそもそも成り立たない状況もあります。
例えば、タイマーでメッセージを監視して一定時間内に到達しなければタイムアウト処理する、という場合には全クライアントで歩調を合わせることができません。タイムアウト以外にも、メッセージから決定論的に動作を予測できないものはすべて一貫性を壊してしまいます。
そんなわけで、仮想同期を使ってもやっぱりクラスタアプリケーションを作るのもテストするのも大変なので Active-Active 構成のサポートは放棄して単純な Active-Standby だけサポートすることにしました、なので CPG も不要です、ということになってしまったようです。
まとめ
- Corosync の CPG 通信を使うとなにか分散アプリケーションが簡単に作れるのではないかな、と思いたってテストプログラムを作ってみました。
- Corosync クラスタを動かすのは割と簡単でした。
- CPG でメッセージを送受信するのもそう難しくはなさそうです。
- とはいえ、CPG が使えればそれだけでなにも考えずに書ける、というほど分散アプリケーションは簡単なものではないっぽいです。