0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Corosync を使った仮想同期通信

Last updated at Posted at 2024-12-22

はじめに

Corosync は Pacemaker と組み合わせたクラスタの障害検知・リカバリーに多く用いられます。

Corosync は Totem と呼ばれる単一リングプロトコル上で仮想同期を実現します。

仮想同期は、複製されたプロセスを管理し、それらの間の通信を調整するためのモデルです。

このモデルでは、一貫性のあるメッセージ配送の保証します。つまり、送信したメッセージがグループ内のプロセスすべてに同じ順序で到達する(あるいはいずれにも配送されない)ことを保証します。

Corosync での仮想同期通信は CPG (Closed Process Group) ライブラリから利用することができます。

  • CPG グループへの参加・離脱
  • CPG グループメンバーの取得、メンバー構成変更通知の受信
  • CPG グループ内でのメッセージ送信・受信

一貫性のあるメッセージ配送が保証されるならば手軽に高信頼な分散アプリケーションを作ることができるのかもしれない、ということで、試しにテストプログラムを動かしてみることにしました。

corosync クラスタのテスト環境を作る

corosync は計算機一台では動かないので、Docker でクラスタを立ち上げるようにします。

具体的には、

  • ホスト側で corosync を一つ動かす
  • Docker 上で ubuntu ノード2台立ち上げ、それぞれで corosync を動かす

という形で 3 つの corosync からなるクラスタを動かします。

3 つとも Docker 上で動かしてホスト側はきれいなままに保つというのでもよかったのですが、なんとなく全部仮想で動かすだけだと本当に動かしたという感じがしない、という気分の問題でこうしました。

corosync クラスタの起動

docker-compose.yml から 192.168.20.0/24 仮想ネットワーク上に corosync2(192.168.20.12), corosync3(192.168.20.13) の2つのノードを立ち上げることにします。ホスト側には 192.168.20.1 のアドレスを割り当てます。

適宜テストプログラムを入れ替えて動かすのに便利ないように、それぞれのノードは /volume/./corosync2/, ./corosync3 のディレクトリをマウントすることとしました(名前の一貫性があるほうが覚えやすいので、ホスト側用に ./corosync1/ のディレクトリも作っておきました)。

というわけで書いた docker-compose.yml はこんな感じです:

docker-compose.yml
networks:
  corosync_net:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 192.168.20.0/24
          gateway: 192.168.20.1

services:
  corosync2:
    image: corosync2
    build: ./corosync_docker
    container_name: corosync2
    volumes:
      - ./corosync2:/volume
    networks:
      corosync_net:
        ipv4_address: 192.168.20.12

  corosync3:
    image: corosync3
    build: ./corosync_docker
    container_name: corosync3
    volumes:
      - ./corosync3:/volume
    networks:
      corosync_net:
        ipv4_address: 192.168.20.13

build: ./corosync_docker と指定しているので、 ./corosync_docker/Dockerfile も作っておきます:

FROM ubuntu:latest

RUN \
  apt update && \
  apt -y install net-tools corosync

COPY corosync.conf /etc/corosync/corosync.conf
COPY corosync-loop.sh /usr/local/bin/corosync-loop.sh

CMD ["/bin/bash", "/usr/local/bin/corosync-loop.sh"]

corosync.conf はこんな感じで corosync パッケージをインストールしたときにできる /etc/corosync/corosyc.conf ほぼそのまま、nodelist に
192.168.20.1, 192.168.20.12, 192.168.20.13
のアドレスを追加しただけです:

# Please read the corosync.conf.5 manual page
system {
	allow_knet_handle_fallback: yes
}

totem {
	version: 2
	cluster_name: corosync_test
	crypto_cipher: none
	crypto_hash: none
}

logging {
	fileline: off
	to_stderr: yes
	to_logfile: yes
	logfile: /var/log/corosync/corosync.log
	to_syslog: yes
	debug: off
	logger_subsys {
		subsys: QUORUM
		debug: off
	}
}

quorum {
	provider: corosync_votequorum
}

nodelist {
	node {
		nodeid: 1
		name: node1
		ring0_addr: 192.168.20.1
	}
	node {
		nodeid: 2
		name: node2
		ring0_addr: 192.168.20.12
	}
	node {
		nodeid: 3
		name: node3
		ring0_addr: 192.168.20.13
	}
}

ホスト側の /etc/corosync/corosync.conf も同じ内容にしておきます。

corosync-loop.sh は corosync サービスを立ち上げて待ちに入る、というだけです:

service corosync start
while [ 1 ]; do sleep 1; done

立ち上げてみます:

docker compose up -d

docker ps で corosync2, corosync3 コンテナが動作しているはずです。

その後、ホスト(192.168.20.1)側の corosync を起動します:

sudo systemctl start corosync

docker の corosync が起動していなければクラスタを構成できないので corosync 起動で待ち状態のままになることに注意が必要です。

無事に corosync クラスタが起動できると、corosync-cfgtool コマンドで以下のように nodeid 1, nodeid 2, nodeid 3 がつながっていることが確認できます。

$ sudo corosync-cfgtool -s
Local node ID 1, transport knet
LINK ID 0 udp
	addr	= 192.168.20.1
	status:
		nodeid:          1:	localhost
		nodeid:          2:	connected
		nodeid:          3:	connected

corosync クラスタの停止

一応、後片付けの手順を書いておくと

ホスト側の corosync の停止

sudo systemctl stop corosync
sudo systemctl status corosync

docker ノードの停止

docker compose stop

docker イメージ含めてすべて停止・破棄するスクリプトも準備しておきました。

#!/bin/bash

# stop corosync cluster
docker compose stop

# remove corosync cluster
for pid in $(docker ps -a |awk '/corosync[1-4]/{print $1;}'); do
    docker rm ${pid}
done

# remove corosync cluster images
for img in $(docker images |awk '/corosync[1-4]/{print $3;}'); do
    docker rmi ${img}
done

CPG 通信のテスト

テストプログラムの準備

corosync クラスタを起動できたところで、以下のようなテストプログラムを書いてみました。

// g++ cpg_test.cc -lcpg -o cpg_test

#include <corosync/cpg.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
#include <sstream>

namespace std {
// エラーコードを文字列に変換する
ostream& operator<<(ostream& os, cs_error_t cs) {
  switch (cs) {
  case CS_ERR_TRY_AGAIN: os << "Resource temporarily unavailable"; break;
  case CS_ERR_INVALID_PARAM: os << "Invalid argument"; break;
  case CS_ERR_ACCESS: os << "Permission denied"; break;
  case CS_ERR_LIBRARY: os << "The connection failed"; break;
  case CS_ERR_INTERRUPT: os << "System call interrupted by a signal"; break;
  case CS_ERR_NOT_SUPPORTED:
    os << "The requested protocol/functionality not supported"; break;
  case CS_ERR_MESSAGE_ERROR: os << "Incorrect auth message received"; break;
  case CS_ERR_NO_MEMORY:
    os << "Not enough memory to complete the requested task"; break;
  default: os << "UNKNOWN";
  }
  os << "(" << static_cast<int>(cs) << ")";
  return os;
}
// 理由コードを文字列に変換する
ostream& operator<<(ostream& os, cpg_reason_t reason) {
  switch (reason) {
  case CPG_REASON_UNDEFINED: os << "CPG_REASON_UNDEFINED"; break;
  case CPG_REASON_JOIN: os << "CPG_REASON_JOIN"; break;
  case CPG_REASON_LEAVE: os << "CPG_REASON_LEAVE"; break;
  case CPG_REASON_NODEDOWN: os << "CPG_REASON_NODEDOWN"; break;
  case CPG_REASON_NODEUP: os << "CPG_REASON_NODEUP"; break;
  case CPG_REASON_PROCDOWN: os << "CPG_REASON_PROCDOWN"; break;
  default: os << "UNKNOWN";
  }
  os << "(" << static_cast<int>(reason) << ")";
  return os;
}

ostream& operator<<(ostream& os, const cpg_name* name) {
  os << std::string(name->value, name->length);
  return os;
}

}  // namespace std



// エラーメッセージを表示
static void
report_cs_error(const char* title, cs_error_t cs) {
  std::cerr << title << ": " << cs << std::endl;
}

// DELIVER コールバック
static void
cpg_deliver_cb_(
    cpg_handle_t handle,
    const struct cpg_name *group_name,
    uint32_t nodeid,
    uint32_t pid,
    void *msg,
    size_t msg_len) {
  std::cout << __FUNCTION__ << ":" << std::endl;
  std::cout << "    group_name[" << group_name << "]," << std::endl;
  std::cout << "    nodeid_" << nodeid << "," << std::endl;
  std::cout << "    pid[" << pid << "]," << std::endl;
  std::cout << "    msg[" <<
    std::string(static_cast<char*>(msg), msg_len) << "]" << std::endl;
}

static std::string
cpg_address_list_to_str_(
    const struct cpg_address *addr_list,
    size_t addr_list_entries) {
  std::stringstream ss;
  for (size_t i = 0; i < addr_list_entries; ++i) {
    ss << "\n        nodeid_" << addr_list[i].nodeid << " {"
      "pid[" << addr_list[i].pid << "],"
      "reason[" << static_cast<cpg_reason_t>(addr_list[i].reason) << "]"
      "}";
  }
  return ss.str();
}

// CONFCHG コールバック
static void
cpg_confchg_cb_(
    cpg_handle_t handle,
    const struct cpg_name *group_name,
    const struct cpg_address *member_list,
    size_t member_list_entries,
    const struct cpg_address *left_list,
    size_t left_list_entries,
    const struct cpg_address *joined_list,
    size_t joined_list_entries) {
  std::cout << __FUNCTION__ << ":" << std::endl;
  std::cout << "    group_name[" << group_name << "]," << std::endl;
  std::cout << "    member_list[" << cpg_address_list_to_str_(
      member_list,
      member_list_entries) << "]," << std::endl;
  std::cout << "    left_list[" << cpg_address_list_to_str_(
      left_list,
      left_list_entries) << "]," << std::endl;
  std::cout << "    joined_list[" << cpg_address_list_to_str_(
      joined_list,
      joined_list_entries) << "]" << std::endl;
}

// TOTEM_CONFCHG コールバック
static void
cpg_totem_confchg_cb_(
    cpg_handle_t handle,
    struct cpg_ring_id ring_id,
    uint32_t member_list_entries,
    const uint32_t *member_list) {
  std::cout << __FUNCTION__ << ":" << std::endl;
  std::cout << "    ring_id["
    "nodeid_" << ring_id.nodeid <<
    " (seq:" << ring_id.seq << ")]," << std::endl;
  std::cout << "    member_list[";
  for (uint32_t i = 0; i < member_list_entries; ++i) {
    if (i != 0) {
      std::cout << ",";
    }
    std::cout << member_list[i];
  }
  std::cout << "]" << std::endl;
}

static cpg_model_v1_data_t CPG_MODEL_DATA_ {
  .cpg_deliver_fn = cpg_deliver_cb_,
  .cpg_confchg_fn = cpg_confchg_cb_,
  .cpg_totem_confchg_fn = cpg_totem_confchg_cb_,
  .flags = CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
};

int main(int argc, char* argv[]) {
  cpg_handle_t handle;

  // 1. 初期化
  const cs_error_t cs_init = cpg_model_initialize(
      &handle,
      CPG_MODEL_V1,
      reinterpret_cast<cpg_model_data_t*>(&CPG_MODEL_DATA_),
      NULL);
  if (cs_init != CS_OK) {
    report_cs_error("cpg_initialize", cs_init);
    exit(1);
  }

  // 2. 自ノード ID の取得と表示
  uint32_t local_nodeid;
  const cs_error_t cs_lg = cpg_local_get(handle, &local_nodeid);
  if (cs_lg != CS_OK) {
    report_cs_error("cpg_local_get", cs_lg);
    exit(1);
  }
  std::cout << "local_nodeid = " << local_nodeid << std::endl;

  // 3. グループへの参加
  std::cout << "... cpg_join" << std::endl;
  struct cpg_name group_name;
  group_name.length = strlen("test");
  strncpy(group_name.value, "test", sizeof(group_name.value));
  const cs_error_t cs_join = cpg_join(handle, &group_name);
  if (cs_join != CS_OK) {
    report_cs_error("cpg_join", cs_join);
    exit(1);
  }

  // 4. グループに属するメンバーの取得と表示
  std::cout << "... cpg_membership_get" << std::endl;
  cpg_address member_list[8];
  int member_list_len = 8;
  const cs_error_t cs_mg = cpg_membership_get(
      handle,
      &group_name,
      member_list,
      &member_list_len);
  if (cs_mg != CS_OK) {
    report_cs_error("cpg_membership_get", cs_mg);
    exit(1);
  }
  for (int i = 0; i < member_list_len; ++i) {
    std::cout << "    member[" << i << "] {" << std::endl;
    std::cout << "        nodeid_" << member_list[i].nodeid << "," << std::endl;
    std::cout << "        pid[" << member_list[i].pid << "]," << std::endl;
    std::cout << "        reason[" <<
      static_cast<cpg_reason_t>(member_list[i].reason) << "]}" << std::endl;
  }

  // 5. イベントループ用に FD を取得
  std::cout << "... cpg_fd_get" << std::endl;
  int cpg_fd;
  const cs_error_t cs_fg = cpg_fd_get(handle, &cpg_fd);
  if (cs_fg != CS_OK) {
    report_cs_error("cpg_fd_get", cs_fg);
    exit(1);
  }
  std::cout << "    cpg_fd = " << cpg_fd << std::endl;

  // 6. イベントループ
  //   標準入力および CPG からの通知を監視
  std::cout << "... start epoll loop" << std::endl;
  epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = cpg_fd;
  int epollfd = epoll_create1(0);
  if (epollfd == -1) {
    perror("epoll_create1");
    exit(1);
  }
  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, cpg_fd, &ev) == -1) {
    perror("epoll_ctl(cpg)");
    exit(1);
  }
  ev.data.fd = STDIN_FILENO;
  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) == -1) {
    perror("epoll_ctl(stdin)");
    exit(1);
  }
  for (;;) {
#define MAX_EVENTS 10
    epoll_event events[MAX_EVENTS];
    const int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
      perror("epoll_wait");
      exit(1);
    }
    for (int n = 0; n < nfds; ++n) {
      if (events[n].data.fd == STDIN_FILENO) {
        // 標準入力からの入力をグループに送信
        std::string line;
        std::getline(std::cin, line);
        char ibuf[256];
        strncpy(ibuf, line.c_str(), sizeof(ibuf));
        const size_t ibuf_len =
          (line.size() < sizeof(ibuf)) ? line.size() : sizeof(ibuf);
        iovec iov;
        iov.iov_base = ibuf;
        iov.iov_len = ibuf_len;
        std::cout << "... cpg_mcast_joined: '" <<
          std::string(ibuf, ibuf_len) << "'" << std::endl;
        const cs_error_t cs_mj = cpg_mcast_joined(
            handle, CPG_TYPE_AGREED,
            &iov,
            1);
        if (cs_mj != CS_OK) {
          report_cs_error("cpg_mcast_joined", cs_mj);
          exit(1);
        }
      }
      if (events[n].data.fd == cpg_fd) {
        // CPG からの通知をディスパッチ
        const cs_error_t cs_d = cpg_dispatch(handle, CS_DISPATCH_ALL);
        if (cs_d != CS_OK) {
          report_cs_error("cpg_dispatch", cs_d);
          exit(1);
        }
      }
    }
  }
}

これをホスト上でコンパイルして、corosync2/, corosync3/ ディレクトリ上に配置して各クラスタノード上で実行できるようにします。

実行

まず、ホスト上でテストプログラムを起動してみます。

$ sudo ./cpg_test
local_nodeid = 1
... cpg_join
... cpg_membership_get
... cpg_fd_get
    cpg_fd = 3
... start epoll loop
cpg_confchg_cb_:
    group_name[test],
    member_list[
        nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}],
    left_list[],
    joined_list[
        nodeid_1 {pid[1299723],reason[CPG_REASON_JOIN(1)]}]
cpg_totem_confchg_cb_:
    ring_id[nodeid_1 (seq:106)],
    member_list[1,2,3]

cpg_join でグループに参加、まだ一人目なので cpg_membership_get でなにも取得できないものの、イベントループでは cpg_confchg コールバックと cpg_totem_confchg で自身がグループに参加したときのイベントが通知されています。

では、ここで corosync2 ノード上でテストプログラムを起動してみます。

$ docker exec -it corosync2 bash
root@62128d3f6e33:/# /volume/cpg_test
local_nodeid = 2
... cpg_join
... cpg_membership_get
    member[0] {
        nodeid_1,
        pid[1299723],
        reason[CPG_REASON_UNDEFINED(0)]}
... cpg_fd_get
    cpg_fd = 3
... start epoll loop
cpg_confchg_cb_:
    group_name[test],
    member_list[
        nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
        nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}],
    left_list[],
    joined_list[
        nodeid_2 {pid[148452],reason[CPG_REASON_JOIN(1)]}]
cpg_totem_confchg_cb_:
    ring_id[nodeid_1 (seq:106)],
    member_list[1,2,3]

今度は cpg_membership_get で nodeid_1 が見えました。また、cpg_confchg_cb_ では nodeid_1 と nodeid_2 がメンバーとして存在していること(member_list)、あらたに参加したのが nodeid_2 であること(joined_list)として見えています。

また、ホスト側で起動していたテストプログラム側にも cpg_confchg 通知が来ています:

cpg_confchg_cb_:
    group_name[test],
    member_list[
        nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
        nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}],
    left_list[],
    joined_list[
        nodeid_2 {pid[148452],reason[CPG_REASON_JOIN(1)]}]

さらに corosync3 上でもテストプログラムを起動すると:

$ docker exec -it corosync3 bash
root@46762d9ceb63:/# /volume/cpg_test
local_nodeid = 3
... cpg_join
... cpg_membership_get
    member[0] {
        nodeid_1,
        pid[1299723],
        reason[CPG_REASON_UNDEFINED(0)]}
    member[1] {
        nodeid_2,
        pid[148452],
        reason[CPG_REASON_UNDEFINED(0)]}
... cpg_fd_get
    cpg_fd = 3
... start epoll loop
cpg_confchg_cb_:
    group_name[test],
    member_list[
        nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
        nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}
        nodeid_3 {pid[148570],reason[CPG_REASON_UNDEFINED(0)]}],
    left_list[],
    joined_list[
        nodeid_3 {pid[148570],reason[CPG_REASON_JOIN(1)]}]
cpg_totem_confchg_cb_:
    ring_id[nodeid_1 (seq:106)],
    member_list[1,2,3]

ホスト(nodeid_1), corosync2(nodeid_2) 側にも

cpg_confchg_cb_:
    group_name[test],
    member_list[
        nodeid_1 {pid[1299723],reason[CPG_REASON_UNDEFINED(0)]}
        nodeid_2 {pid[148452],reason[CPG_REASON_UNDEFINED(0)]}
        nodeid_3 {pid[148570],reason[CPG_REASON_UNDEFINED(0)]}],
    left_list[],
    joined_list[
        nodeid_3 {pid[148570],reason[CPG_REASON_JOIN(1)]}]

で通知が来ます。

では、いよいよメッセージを送ってみましょう。

ホスト(nodeid_1)上で Hello, I am nodeid_1 ! を投入します:

Hello, I am nodeid_1 !
... cpg_mcast_joined: 'Hello, I am nodeid_1 !'

すると、メッセージを送信した node_1 を含め、すべてのテストプログラム上で

cpg_deliver_cb_:
    group_name[test],
    nodeid_1,
    pid[1299723],
    msg[Hello, I am nodeid_1 !]

が表示されます。

同様に nodeid_2 上で Hello, I am nodeid_2 ! を投入すると

cpg_deliver_cb_:
    group_name[test],
    nodeid_2,
    pid[148452],
    msg[Hello, I am nodeid_2 !]

がすべてのテストプログラム上で表示されます。

nodeid_3 から Hello, I am nodeid_3 ! を投入したときも同様です。

cpg_deliver_cb_:
    group_name[test],
    nodeid_3,
    pid[148570],
    msg[Hello, I am nodeid_3 !]

欠点

実際に動かしてみると、Corosync を使った通信には制約があることがわかりました。

上記のテストプログラムをホスト上で動かすとき sudo を付けたのですが、Corosync と通信できるユーザが限られます。root あるいは corosync.conf の uidgid に設定したユーザでないと Corosync と通信できません。

これは、IPC に共有メモリファイルを用いているため、ファイル所有者で通信できる相手が決まっているためです。

考えようによってはセキュリティ面で有利、とも言えますが、Corosync を一般アプリケーションのミドルウェアとして使うには不便です。多分、Corosync の上にもう一段通信用サーバをかませて、一般アプリケーションは通信用サーバとのみ通信する、というのが正しい使い方なのでしょう。

例えば、Apache Qpid ブローカクラスタは Corosync の CPG を通信に使っていました。

いました、というのは Apache Qpid はかなり前に CPG を使ったクラスタ構成をやめてしまったからです。

仮想同期は、全クライアントからのメッセージを単一のストリームとして全体に配信します。
黙っていても順序性保証を保証してくれる、というのは好ましい性質なのですが、マルチコアで各メッセージを並列に処理して性能を上げたい、というときには逆に阻害要因となります。

また、全クライアントがメッセージを同じ順番で処理すれば、全クライアントで同じ状態が保たれるはず、という仮定がそもそも成り立たない状況もあります。

例えば、タイマーでメッセージを監視して一定時間内に到達しなければタイムアウト処理する、という場合には全クライアントで歩調を合わせることができません。タイムアウト以外にも、メッセージから決定論的に動作を予測できないものはすべて一貫性を壊してしまいます。

そんなわけで、仮想同期を使ってもやっぱりクラスタアプリケーションを作るのもテストするのも大変なので Active-Active 構成のサポートは放棄して単純な Active-Standby だけサポートすることにしました、なので CPG も不要です、ということになってしまったようです。

まとめ

  • Corosync の CPG 通信を使うとなにか分散アプリケーションが簡単に作れるのではないかな、と思いたってテストプログラムを作ってみました。
  • Corosync クラスタを動かすのは割と簡単でした。
  • CPG でメッセージを送受信するのもそう難しくはなさそうです。
  • とはいえ、CPG が使えればそれだけでなにも考えずに書ける、というほど分散アプリケーションは簡単なものではないっぽいです。
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?