1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

コンピュータ将棋ソフトとの対局サーバーを立てよう<その15>

Last updated at Posted at 2017-03-16

前回の記事 : http://qiita.com/muzudho1/items/f73f562dabd3cfd655c4

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:55:09 109 34_cs (P)___ enqueue begin [a1]
14:55:09 109 34_cs channel.open [0] begin
14:55:09 110 34_cs connection.open begin
14:55:09 116 34_cs connection.open end
14:55:09 118 34_cs channel.open [0] end
14:55:09 124 34_cs (P)___ enqueue end
14:55:09 125 34_cs connection.close [0] begin
14:55:09 125 34_cs channel.close [0] begin
14:55:09 130 34_cs channel.close [0] end
14:55:09 136 34_cs connection.close [0] end
Enqueue? > a2
14:55:11 181 34_cs (P)___ enqueue begin [a2]
14:55:11 181 34_cs channel.open [0] begin
14:55:11 181 34_cs connection.open begin
14:55:11 186 34_cs connection.open end
14:55:11 187 34_cs channel.open [0] end
14:55:11 187 34_cs (P)___ enqueue end
14:55:11 187 34_cs connection.close [0] begin
14:55:11 187 34_cs channel.close [0] begin
14:55:11 189 34_cs channel.close [0] end
14:55:11 190 34_cs connection.close [0] end
Enqueue? > 14:55:11 202 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a44
14:55:17 412 34_cs (P)___ enqueue begin [4]
14:55:17 412 34_cs channel.open [0] begin
14:55:17 412 34_cs connection.open begin
14:55:17 423 34_cs connection.open end
14:55:17 425 34_cs channel.open [0] end
14:55:17 425 34_cs (P)___ enqueue end
14:55:17 425 34_cs connection.close [0] begin
14:55:17 425 34_cs channel.close [0] begin
14:55:17 427 34_cs channel.close [0] end
14:55:17 428 34_cs connection.close [0] end
Enqueue? > 14:55:17 439 34_cs <-------- [interrupt!] Dequeue(^q^) 4daze

3秒程度待っても 1回目の打鍵に応答し損ねるが、7秒待つと 1回目の打鍵に応答してくることがあるのは前回の記事までだった。
結局 確定的ではない。

そして問題は

05:48:13'015  35a2_cpp[0] connection.open begin
05:48:13'015 35a2_cpp[0] connection.open end
05:48:13'015 35a2_cpp[0] channel.open end
05:48:13'015 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:48:13'015 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:48:13'015 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:48:13'015 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:55:09'123 35a2_cpp ___(C) CNS.received begin [a1]
05:55:09'124 35a2_cpp rotationBuffer.putMessage begin [5] before lock [a1]
05:55:09'124 35a2_cpp rotationBuffer.putMessage begin [5] [a1]
05:55:09'124 35a2_cpp rotationBuffer.putMessage end next=[6]
05:55:09'124 35a2_cpp ___(C) CNS.received.ack begin [6] [a1]
05:55:09'124 35a2_cpp ___(C) CNS.received.ack end [6]
05:55:09'124 35a2_cpp ___(C) CNS.received end [a1]
05:55:09'141 35a2_cpp rotationBuffer.removeMessage begin [5] [a1]
05:55:09'141 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[6]
05:55:09'141 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a1daze] ****important****
05:55:09'141 35a2_cpp[0] connection.close begin
05:55:09'141 35a2_cpp[0] channel.close begin
05:55:09'141 35a2_cpp[0] channel.close end
05:55:09'141 35a2_cpp[0] connection.close end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ ev_run end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ begin
05:55:09'142 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:55:09'142 35a2_cpp[0] channel.open begin
05:55:09'142  35a2_cpp[0] connection.open begin
05:55:09'142 35a2_cpp[0] connection.open end
05:55:09'142 35a2_cpp[0] channel.open end
05:55:09'142 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:55:09'142 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:55:09'142 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:55:11'189 35a2_cpp ___(C) CNS.received begin [a2]
05:55:11'189 35a2_cpp rotationBuffer.putMessage begin [6] before lock [a2]
05:55:11'189 35a2_cpp rotationBuffer.putMessage begin [6] [a2]
05:55:11'189 35a2_cpp rotationBuffer.putMessage end next=[7]
05:55:11'189 35a2_cpp ___(C) CNS.received.ack begin [7] [a2]
05:55:11'189 35a2_cpp ___(C) CNS.received.ack end [7]
05:55:11'189 35a2_cpp ___(C) CNS.received end [a2]
05:55:11'197 35a2_cpp rotationBuffer.removeMessage begin [6] [a2]
05:55:11'197 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[7]
05:55:11'197 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a2daze] ****important****
05:55:11'197 35a2_cpp[0] connection.close begin
05:55:11'197 35a2_cpp[0] channel.close begin
05:55:11'197 35a2_cpp[0] channel.close end
05:55:11'197 35a2_cpp[0] connection.close end
05:55:11'198 35a2_cpp (P)___ WKP.bindQueue.publish.commited [] **********important**********
05:55:11'203 35a2_cpp (P)___ WKP.SM_ ev_run end
05:55:11'203 35a2_cpp (P)___ WKP.SM_ end
05:55:11'203 35a2_cpp (P)___ WKP.SM_ begin
05:55:11'203 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:55:11'203 35a2_cpp[0] channel.open begin
05:55:11'203  35a2_cpp[0] connection.open begin
05:55:11'203 35a2_cpp[0] connection.open end

C++ 側で、publish に対応する commited がないペアがあることだった。
ロールバックすらしていない。
何が起こっているのか?

				getChannel(ENQUEUE_INDEX)->startTransaction();
				if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
					getChannel(ENQUEUE_INDEX)->commitTransaction()
						.onSuccess([&message]() {
						// パブリッシュ成功
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.bindQueue.publish.commited [" << message << "] **********important**********" << std::endl;
					})
						.onError([&message](const char *errMsg) {
						std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
					});
				}
				else {
					std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuer message=[" << message << "]" << std::endl;
					getChannel(ENQUEUE_INDEX)->rollbackTransaction();
				}

				// 接続を切って ev_run ループから抜けよう。かつ、このハンドラーから抜けよう。パブリッシュのコミットに必要
				closeConnection(ENQUEUE_INDEX);
				break;

エラーメッセージは出力されていない。
じゃあ、全部 標準出力に出すか。

			if ("" != message)
			{
				// 末尾に daze を付ける。
				message += "daze";
				std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

				getChannel(ENQUEUE_INDEX)->startTransaction();
				if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;

					getChannel(ENQUEUE_INDEX)->commitTransaction()
						.onSuccess([&message]() {
						// パブリッシュ成功
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
					})
						.onError([&message](const char *errMsg) {
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
						std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
					});

					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
				} else {
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
					std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
					getChannel(ENQUEUE_INDEX)->rollbackTransaction();
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
				}

				// 接続を切って ev_run ループから抜けよう。かつ、このハンドラーから抜けよう。パブリッシュのコミットに必要
				std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
				closeConnection(ENQUEUE_INDEX);
				std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
				break;
			}

ひとまず これだけ吐く。

エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
15:12:02 345 34_cs (P)___ enqueue begin [a1]
15:12:02 345 34_cs channel.open [0] begin
15:12:02 346 34_cs connection.open begin
15:12:02 356 34_cs connection.open end
15:12:02 358 34_cs channel.open [0] end
15:12:02 364 34_cs (P)___ enqueue end
15:12:02 365 34_cs connection.close [0] begin
15:12:02 365 34_cs channel.close [0] begin
15:12:02 374 34_cs channel.close [0] end
15:12:02 391 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
15:12:02 392 34_cs connection.close [0] end
Enqueue? > a2
15:12:03 775 34_cs (P)___ enqueue begin [a2]
15:12:03 775 34_cs channel.open [0] begin
15:12:03 775 34_cs connection.open begin
15:12:03 782 34_cs connection.open end
15:12:03 784 34_cs channel.open [0] end
15:12:03 784 34_cs (P)___ enqueue end
15:12:03 784 34_cs connection.close [0] begin
15:12:03 784 34_cs channel.close [0] begin
15:12:03 786 34_cs channel.close [0] end
15:12:03 786 34_cs connection.close [0] end
Enqueue? > 15:12:03 792 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a3
15:12:06 120 34_cs (P)___ enqueue begin [a3]
15:12:06 122 34_cs channel.open [0] begin
15:12:06 123 34_cs connection.open begin
15:12:06 143 34_cs connection.open end
15:12:06 150 34_cs channel.open [0] end
15:12:06 151 34_cs (P)___ enqueue end
15:12:06 151 34_cs connection.close [0] begin
15:12:06 151 34_cs channel.close [0] begin
15:12:06 152 34_cs channel.close [0] end
15:12:06 153 34_cs connection.close [0] end
Enqueue? > a4
15:12:09 127 34_cs (P)___ enqueue begin [a4]
15:12:09 128 34_cs channel.open [0] begin
15:12:09 128 34_cs connection.open begin
15:12:09 140 34_cs connection.open end
15:12:09 142 34_cs channel.open [0] end
15:12:09 143 34_cs (P)___ enqueue end
15:12:09 143 34_cs connection.close [0] begin
15:12:09 143 34_cs channel.close [0] begin
15:12:09 143 34_cs channel.close [0] end
15:12:09 144 34_cs connection.close [0] end
Enqueue? > a5
15:12:10 727 34_cs (P)___ enqueue begin [a5]
15:12:10 727 34_cs channel.open [0] begin
15:12:10 727 34_cs connection.open begin
15:12:10 738 34_cs connection.open end
15:12:10 740 34_cs channel.open [0] end
15:12:10 740 34_cs (P)___ enqueue end
15:12:10 740 34_cs connection.close [0] begin
15:12:10 740 34_cs channel.close [0] begin
15:12:10 741 34_cs channel.close [0] end
15:12:10 742 34_cs connection.close [0] end

不具合の起こり方が変わった。
なんで 標準出力を書き足す と挙動が変わるのか……。

06:10:31'468 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:10:31'468 35a2_cpp main dump begin
06:10:31'468 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
06:10:31'468 35a2_cpp main dump end
06:10:31'468 35a2_cpp ___(C) ___ begin
06:10:31'468 35a2_cpp ___(C) DQ_ begin
06:10:31'468 35a2_cpp[1] channel.open begin
06:10:31'468 35a2_cpp[1] connection.open begin
06:10:31'469 35a2_cpp (P)___ WKP begin
06:10:31'469 35a2_cpp (P)___ WKP.SM_ begin
06:10:31'469 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:10:31'469 35a2_cpp[0] channel.open begin
06:10:31'469 35a2_cpp[0] connection.open begin
06:10:31'469 35a2_cpp[0] connection.open end
06:10:31'470 35a2_cpp[0] channel.open end
06:10:31'470 35a2_cpp[1] connection.open end
06:10:31'470 35a2_cpp[1] channel.open end
06:10:31'471 35a2_cpp ___(C) DQ_ end
06:10:31'471 35a2_cpp ___(C) CNS begin
06:10:31'471 35a2_cpp ___(C) CNS end
06:10:31'471 35a2_cpp ___(C) ___ ev_run begin
06:10:31'471 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:10:31'471 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:10:31'471 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:10:31'471 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:12:02'367 35a2_cpp ___(C) CNS.received begin [a1]
06:12:02'367 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
06:12:02'367 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
06:12:02'367 35a2_cpp rotationBuffer.putMessage end next=[1]
06:12:02'367 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
06:12:02'367 35a2_cpp ___(C) CNS.received.ack end [1]
06:12:02'367 35a2_cpp ___(C) CNS.received end [a1]
06:12:02'375 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
06:12:02'375 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
06:12:02'375 35a2_cpp[0] connection.close begin
06:12:02'375 35a2_cpp[0] channel.close begin
06:12:02'375 35a2_cpp[0] channel.close end
06:12:02'375 35a2_cpp[0] connection.close end
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
06:12:02'377 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [] **********important**********
06:12:02'388 35a2_cpp (P)___ WKP.SM_ ev_run end
06:12:02'388 35a2_cpp (P)___ WKP.SM_ end
06:12:02'388 35a2_cpp (P)___ WKP.SM_ begin
06:12:02'388 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:12:02'388 35a2_cpp[0] channel.open begin
06:12:02'388 35a2_cpp[0] connection.open begin
06:12:02'388 35a2_cpp[0] connection.open end
06:12:02'388 35a2_cpp[0] channel.open end
06:12:02'388 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:12:02'388 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:12:02'388 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:12:02'388 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:12:03'785 35a2_cpp ___(C) CNS.received begin [a2]
06:12:03'785 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
06:12:03'785 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
06:12:03'785 35a2_cpp rotationBuffer.putMessage end next=[2]
06:12:03'785 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
06:12:03'785 35a2_cpp ___(C) CNS.received.ack end [2]
06:12:03'785 35a2_cpp ___(C) CNS.received end [a2]
06:12:03'791 35a2_cpp rotationBuffer.removeMessage begin [1] [a2]
06:12:03'791 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[2]
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
06:12:03'791 35a2_cpp[0] connection.close begin
06:12:03'791 35a2_cpp[0] channel.close begin
06:12:03'791 35a2_cpp[0] channel.close end
06:12:03'791 35a2_cpp[0] connection.close end
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end

接続を切ることが コミットの条件の1つのようにも見えるが、ログも 接続を切ったところで止まっていてワロスwww

接続を切るのと、コミットが完了するのと、どっちが先か?

		bool infinityLoop = true;
		while (infinityLoop)
		{
			// 回転式バッファーからメッセージを切り取る。
			std::string message = rotationBuffer::removeMessage();

			if ("" != message)
			{
				// 末尾に daze を付ける。
				message += "daze";
				std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

				getChannel(ENQUEUE_INDEX)->startTransaction();
				if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;

					getChannel(ENQUEUE_INDEX)->commitTransaction()
						.onSuccess([&infinityLoop,&message]() {
						// パブリッシュ成功
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

						// 接続を切って ev_run ループから抜けよう。かつ、このハンドラーから抜けよう。パブリッシュのコミットに必要
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess closeConnection begin" << std::endl;
						closeConnection(ENQUEUE_INDEX);
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess closeConnection end" << std::endl;
						infinityLoop = false;
					})
						.onError([&message](const char *errMsg) {
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
						std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
					});

					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
				} else {
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
					std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
					getChannel(ENQUEUE_INDEX)->rollbackTransaction();
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
				}

				//// 接続を切って ev_run ループから抜けよう。かつ、このハンドラーから抜けよう。パブリッシュのコミットに必要
				//std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
				//closeConnection(ENQUEUE_INDEX);
				//std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
				//break;
			}
			std::this_thread::sleep_for(std::chrono::milliseconds(20));
		}

書き換えた。

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
15:23:09 612 34_cs (P)___ enqueue begin [a1]
15:23:09 613 34_cs channel.open [0] begin
15:23:09 613 34_cs connection.open begin
15:23:09 620 34_cs connection.open end
15:23:09 623 34_cs channel.open [0] end
15:23:09 627 34_cs (P)___ enqueue end
15:23:09 628 34_cs connection.close [0] begin
15:23:09 628 34_cs channel.close [0] begin
15:23:09 634 34_cs channel.close [0] end
15:23:09 643 34_cs connection.close [0] end
Enqueue? > a2
15:23:10 627 34_cs (P)___ enqueue begin [a2]
15:23:10 627 34_cs channel.open [0] begin
15:23:10 627 34_cs connection.open begin
15:23:10 632 34_cs connection.open end
15:23:10 634 34_cs channel.open [0] end
15:23:10 634 34_cs (P)___ enqueue end
15:23:10 634 34_cs connection.close [0] begin
15:23:10 634 34_cs channel.close [0] begin
15:23:10 635 34_cs channel.close [0] end
15:23:10 636 34_cs connection.close [0] end
Enqueue? > a3
15:23:11 355 34_cs (P)___ enqueue begin [a3]
15:23:11 355 34_cs channel.open [0] begin
15:23:11 355 34_cs connection.open begin
15:23:11 365 34_cs connection.open end
15:23:11 374 34_cs channel.open [0] end
15:23:11 375 34_cs (P)___ enqueue end
15:23:11 375 34_cs connection.close [0] begin
15:23:11 375 34_cs channel.close [0] begin
15:23:11 376 34_cs channel.close [0] end
15:23:11 377 34_cs connection.close [0] end
Enqueue? > a4
15:23:12 107 34_cs (P)___ enqueue begin [a4]
15:23:12 107 34_cs channel.open [0] begin
15:23:12 107 34_cs connection.open begin
15:23:12 114 34_cs connection.open end
15:23:12 115 34_cs channel.open [0] end
15:23:12 115 34_cs (P)___ enqueue end
15:23:12 115 34_cs connection.close [0] begin
15:23:12 116 34_cs channel.close [0] begin
15:23:12 118 34_cs channel.close [0] end
15:23:12 118 34_cs connection.close [0] end
Enqueue? > a54
15:23:12 939 34_cs (P)___ enqueue begin [a54]
15:23:12 939 34_cs channel.open [0] begin
15:23:12 939 34_cs connection.open begin
15:23:12 945 34_cs connection.open end
15:23:12 950 34_cs channel.open [0] end
15:23:12 950 34_cs (P)___ enqueue end
15:23:12 951 34_cs connection.close [0] begin
15:23:12 951 34_cs channel.close [0] begin
15:23:12 952 34_cs channel.close [0] end
15:23:12 953 34_cs connection.close [0] end
Enqueue? > a6
15:23:14 083 34_cs (P)___ enqueue begin [a6]
15:23:14 083 34_cs channel.open [0] begin
15:23:14 083 34_cs connection.open begin
15:23:14 093 34_cs connection.open end
15:23:14 095 34_cs channel.open [0] end
15:23:14 095 34_cs (P)___ enqueue end
15:23:14 095 34_cs connection.close [0] begin
15:23:14 095 34_cs channel.close [0] begin
15:23:14 099 34_cs channel.close [0] end
15:23:14 099 34_cs connection.close [0] end
Enqueue? > a6
15:23:14 979 34_cs (P)___ enqueue begin [a6]
15:23:14 979 34_cs channel.open [0] begin
15:23:14 979 34_cs connection.open begin
15:23:14 983 34_cs connection.open end
15:23:14 985 34_cs channel.open [0] end
15:23:14 985 34_cs (P)___ enqueue end
15:23:14 985 34_cs connection.close [0] begin
15:23:14 986 34_cs channel.close [0] begin
15:23:14 989 34_cs channel.close [0] end
15:23:14 989 34_cs connection.close [0] end
Enqueue? > a8
15:23:16 059 34_cs (P)___ enqueue begin [a8]
15:23:16 059 34_cs channel.open [0] begin
15:23:16 059 34_cs connection.open begin
15:23:16 074 34_cs connection.open end
15:23:16 076 34_cs channel.open [0] end
15:23:16 076 34_cs (P)___ enqueue end
15:23:16 076 34_cs connection.close [0] begin
15:23:16 076 34_cs channel.close [0] begin
15:23:16 077 34_cs channel.close [0] end
15:23:16 078 34_cs connection.close [0] end

まったく応答がなくなった。

06:22:19'409 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:22:19'409 35a2_cpp main dump begin
06:22:19'409 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
06:22:19'409 35a2_cpp main dump end
06:22:19'410 35a2_cpp ___(C) ___ begin
06:22:19'410 35a2_cpp (P)___ WKP begin
06:22:19'410 35a2_cpp ___(C) DQ_ begin
06:22:19'410 35a2_cpp (P)___ WKP.SM_ begin
06:22:19'410 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:22:19'410 35a2_cpp[0] channel.open begin
06:22:19'410 35a2_cpp[1] channel.open begin
06:22:19'410 35a2_cpp[1] connection.open begin
06:22:19'410 35a2_cpp[0] connection.open begin
06:22:19'412 35a2_cpp[1] connection.open end
06:22:19'412 35a2_cpp[0] connection.open end
06:22:19'412 35a2_cpp[1] channel.open end
06:22:19'412 35a2_cpp[0] channel.open end
06:22:19'412 35a2_cpp ___(C) DQ_ end
06:22:19'412 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:22:19'412 35a2_cpp ___(C) CNS begin
06:22:19'412 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:22:19'412 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:22:19'412 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:22:19'412 35a2_cpp ___(C) CNS end
06:22:19'412 35a2_cpp ___(C) ___ ev_run begin
06:22:19'427 35a2_cpp ___(C) CNS.received begin [a3]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a3]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [0] [a3]
06:22:19'427 35a2_cpp rotationBuffer.putMessage end next=[1]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack begin [1] [a3]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack end [1]
06:22:19'427 35a2_cpp ___(C) CNS.received end [a3]
06:22:19'427 35a2_cpp ___(C) CNS.received begin [a4]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a4]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [1] [a4]
06:22:19'427 35a2_cpp rotationBuffer.putMessage end next=[2]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack begin [2] [a4]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack end [2]
06:22:19'427 35a2_cpp ___(C) CNS.received end [a4]
06:22:19'428 35a2_cpp ___(C) CNS.received begin [a5]
06:22:19'428 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a5]
06:22:19'428 35a2_cpp rotationBuffer.putMessage begin [2] [a5]
06:22:19'428 35a2_cpp rotationBuffer.putMessage end next=[3]
06:22:19'428 35a2_cpp ___(C) CNS.received.ack begin [3] [a5]
06:22:19'428 35a2_cpp ___(C) CNS.received.ack end [3]
06:22:19'428 35a2_cpp ___(C) CNS.received end [a5]
06:22:19'446 35a2_cpp rotationBuffer.removeMessage begin [0] [a3]
06:22:19'446 35a2_cpp rotationBuffer.removeMessage end return=[a3] next=[1]
06:22:19'446 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a3daze] ****important****
06:22:19'446 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:22:19'446 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:22:19'467 35a2_cpp rotationBuffer.removeMessage begin [1] [a4]
06:22:19'467 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[2]
06:22:19'467 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
06:22:19'467 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:22:19'467 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:22:19'487 35a2_cpp rotationBuffer.removeMessage begin [2] [a5]
06:22:19'487 35a2_cpp rotationBuffer.removeMessage end return=[a5] next=[3]
06:22:19'487 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a5daze] ****important****
06:22:19'487 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:22:19'487 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:09'629 35a2_cpp ___(C) CNS.received begin [a1]
06:23:09'629 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a1]
06:23:09'629 35a2_cpp rotationBuffer.putMessage begin [3] [a1]
06:23:09'629 35a2_cpp rotationBuffer.putMessage end next=[4]
06:23:09'629 35a2_cpp ___(C) CNS.received.ack begin [4] [a1]
06:23:09'629 35a2_cpp ___(C) CNS.received.ack end [4]
06:23:09'629 35a2_cpp ___(C) CNS.received end [a1]
06:23:09'636 35a2_cpp rotationBuffer.removeMessage begin [3] [a1]
06:23:09'636 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[4]
06:23:09'636 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:23:09'636 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:09'636 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:10'635 35a2_cpp ___(C) CNS.received begin [a2]
06:23:10'635 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a2]
06:23:10'635 35a2_cpp rotationBuffer.putMessage begin [4] [a2]
06:23:10'635 35a2_cpp rotationBuffer.putMessage end next=[5]
06:23:10'635 35a2_cpp ___(C) CNS.received.ack begin [5] [a2]
06:23:10'635 35a2_cpp ___(C) CNS.received.ack end [5]
06:23:10'635 35a2_cpp ___(C) CNS.received end [a2]
06:23:10'646 35a2_cpp rotationBuffer.removeMessage begin [4] [a2]
06:23:10'646 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[5]
06:23:10'646 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
06:23:10'646 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:10'646 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:11'376 35a2_cpp ___(C) CNS.received begin [a3]
06:23:11'376 35a2_cpp rotationBuffer.putMessage begin [5] before lock [a3]
06:23:11'376 35a2_cpp rotationBuffer.putMessage begin [5] [a3]
06:23:11'376 35a2_cpp rotationBuffer.putMessage end next=[6]
06:23:11'376 35a2_cpp ___(C) CNS.received.ack begin [6] [a3]
06:23:11'376 35a2_cpp ___(C) CNS.received.ack end [6]
06:23:11'377 35a2_cpp ___(C) CNS.received end [a3]
06:23:11'393 35a2_cpp rotationBuffer.removeMessage begin [5] [a3]
06:23:11'393 35a2_cpp rotationBuffer.removeMessage end return=[a3] next=[6]
06:23:11'393 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a3daze] ****important****
06:23:11'393 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:11'393 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:12'117 35a2_cpp ___(C) CNS.received begin [a4]
06:23:12'117 35a2_cpp rotationBuffer.putMessage begin [6] before lock [a4]
06:23:12'117 35a2_cpp rotationBuffer.putMessage begin [6] [a4]
06:23:12'117 35a2_cpp rotationBuffer.putMessage end next=[7]
06:23:12'117 35a2_cpp ___(C) CNS.received.ack begin [7] [a4]
06:23:12'117 35a2_cpp ___(C) CNS.received.ack end [7]
06:23:12'117 35a2_cpp ___(C) CNS.received end [a4]
06:23:12'119 35a2_cpp rotationBuffer.removeMessage begin [6] [a4]
06:23:12'119 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[7]
06:23:12'119 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
06:23:12'119 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:12'119 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:12'952 35a2_cpp ___(C) CNS.received begin [a54]
06:23:12'952 35a2_cpp rotationBuffer.putMessage begin [7] before lock [a54]
06:23:12'952 35a2_cpp rotationBuffer.putMessage begin [7] [a54]
06:23:12'952 35a2_cpp rotationBuffer.putMessage end next=[8]
06:23:12'952 35a2_cpp ___(C) CNS.received.ack begin [8] [a54]
06:23:12'952 35a2_cpp ___(C) CNS.received.ack end [8]
06:23:12'952 35a2_cpp ___(C) CNS.received end [a54]
06:23:12'967 35a2_cpp rotationBuffer.removeMessage begin [7] [a54]
06:23:12'967 35a2_cpp rotationBuffer.removeMessage end return=[a54] next=[8]
06:23:12'967 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a54daze] ****important****
06:23:12'967 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:12'967 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:14'098 35a2_cpp ___(C) CNS.received begin [a6]
06:23:14'098 35a2_cpp rotationBuffer.putMessage begin [8] before lock [a6]
06:23:14'098 35a2_cpp rotationBuffer.putMessage begin [8] [a6]
06:23:14'098 35a2_cpp rotationBuffer.putMessage end next=[9]
06:23:14'098 35a2_cpp ___(C) CNS.received.ack begin [9] [a6]
06:23:14'098 35a2_cpp ___(C) CNS.received.ack end [9]
06:23:14'098 35a2_cpp ___(C) CNS.received end [a6]
06:23:14'105 35a2_cpp rotationBuffer.removeMessage begin [8] [a6]
06:23:14'105 35a2_cpp rotationBuffer.removeMessage end return=[a6] next=[9]
06:23:14'105 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a6daze] ****important****
06:23:14'105 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:14'105 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:14'986 35a2_cpp ___(C) CNS.received begin [a6]
06:23:14'986 35a2_cpp rotationBuffer.putMessage begin [9] before lock [a6]
06:23:14'986 35a2_cpp rotationBuffer.putMessage begin [9] [a6]
06:23:14'986 35a2_cpp rotationBuffer.putMessage end next=[10]
06:23:14'986 35a2_cpp ___(C) CNS.received.ack begin [10] [a6]
06:23:14'986 35a2_cpp ___(C) CNS.received.ack end [10]
06:23:14'986 35a2_cpp ___(C) CNS.received end [a6]
06:23:14'993 35a2_cpp rotationBuffer.removeMessage begin [9] [a6]
06:23:14'993 35a2_cpp rotationBuffer.removeMessage end return=[a6] next=[10]
06:23:14'993 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a6daze] ****important****
06:23:14'993 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:14'993 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:16'077 35a2_cpp ___(C) CNS.received begin [a8]
06:23:16'077 35a2_cpp rotationBuffer.putMessage begin [10] before lock [a8]
06:23:16'077 35a2_cpp rotationBuffer.putMessage begin [10] [a8]
06:23:16'077 35a2_cpp rotationBuffer.putMessage end next=[11]
06:23:16'077 35a2_cpp ___(C) CNS.received.ack begin [11] [a8]
06:23:16'077 35a2_cpp ___(C) CNS.received.ack end [11]
06:23:16'077 35a2_cpp ___(C) CNS.received end [a8]
06:23:16'085 35a2_cpp rotationBuffer.removeMessage begin [10] [a8]
06:23:16'085 35a2_cpp rotationBuffer.removeMessage end return=[a8] next=[11]
06:23:16'085 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a8daze] ****important****
06:23:16'085 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:16'085 35a2_cpp (P)___ WKP.BQ_.OS_ if end

コミットもしなくなった。

		bool infinityLoop = true;
		bool enable = true;
		while (infinityLoop)
		{
			if (enable) {
				// 回転式バッファーからメッセージを切り取る。
				std::string message = rotationBuffer::removeMessage();

				if ("" != message)
				{
					// 末尾に daze を付ける。
					message += "daze";
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

					// トランザクション開始
					getChannel(ENQUEUE_INDEX)->startTransaction();
					// 以後、このループは空回りさせる
					enable = false;
					// パブリッシュ開始
					if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;

						// コミットを開始
						getChannel(ENQUEUE_INDEX)->commitTransaction()
							.onSuccess([&infinityLoop, &message]() {
							// コミット成功
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

							// 無限ループから出て、このハンドラから抜けよう。そのあと ev_run ループから抜ける
							infinityLoop = false;
						})
							.onError([&message](const char *errMsg) {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
							std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
						});

						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
					}
					else {
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
						std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
						getChannel(ENQUEUE_INDEX)->rollbackTransaction();
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
					}

					// 接続を切ったあと、トランザクションのコミットが実行されるようだ。
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
					closeConnection(ENQUEUE_INDEX);
					std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
					// break;
				}
			}
			std::this_thread::sleep_for(std::chrono::milliseconds(20));
		}

これでどうか?

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
15:39:30 672 34_cs (P)___ enqueue begin [a1]
15:39:30 672 34_cs channel.open [0] begin
15:39:30 672 34_cs connection.open begin
15:39:30 679 34_cs connection.open end
15:39:30 681 34_cs channel.open [0] end
15:39:30 685 34_cs (P)___ enqueue end
15:39:30 685 34_cs connection.close [0] begin
15:39:30 686 34_cs channel.close [0] begin
15:39:30 692 34_cs channel.close [0] end
15:39:30 700 34_cs connection.close [0] end
Enqueue? > a2
15:39:32 999 34_cs (P)___ enqueue begin [a2]
15:39:33 000 34_cs channel.open [0] begin
15:39:33 000 34_cs connection.open begin
15:39:33 011 34_cs connection.open end
15:39:33 021 34_cs channel.open [0] end
15:39:33 021 34_cs (P)___ enqueue end
15:39:33 021 34_cs connection.close [0] begin
15:39:33 021 34_cs channel.close [0] begin
15:39:33 024 34_cs channel.close [0] end
15:39:33 025 34_cs connection.close [0] end
Enqueue? > a3
15:39:34 015 34_cs (P)___ enqueue begin [a3]
15:39:34 016 34_cs channel.open [0] begin
15:39:34 016 34_cs connection.open begin
15:39:34 021 34_cs connection.open end
15:39:34 023 34_cs channel.open [0] end
15:39:34 023 34_cs (P)___ enqueue end
15:39:34 023 34_cs connection.close [0] begin
15:39:34 023 34_cs channel.close [0] begin
15:39:34 035 34_cs channel.close [0] end
15:39:34 036 34_cs connection.close [0] end
Enqueue? > a4
15:39:34 871 34_cs (P)___ enqueue begin [a4]
15:39:34 872 34_cs channel.open [0] begin
15:39:34 872 34_cs connection.open begin
15:39:34 880 34_cs connection.open end
15:39:34 883 34_cs channel.open [0] end
15:39:34 883 34_cs (P)___ enqueue end
15:39:34 883 34_cs connection.close [0] begin
15:39:34 883 34_cs channel.close [0] begin
15:39:34 887 34_cs channel.close [0] end
15:39:34 888 34_cs connection.close [0] end
Enqueue? > a5
15:39:36 199 34_cs (P)___ enqueue begin [a5]
15:39:36 199 34_cs channel.open [0] begin
15:39:36 199 34_cs connection.open begin
15:39:36 206 34_cs connection.open end
15:39:36 216 34_cs channel.open [0] end
15:39:36 216 34_cs (P)___ enqueue end
15:39:36 216 34_cs connection.close [0] begin
15:39:36 216 34_cs channel.close [0] begin
15:39:36 218 34_cs channel.close [0] end
15:39:36 219 34_cs connection.close [0] end

これでもダメ。

06:36:55'766 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:36:55'766 35a2_cpp main dump begin
06:36:55'766 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
06:36:55'766 35a2_cpp main dump end
06:36:55'767 35a2_cpp ___(C) ___ begin
06:36:55'767 35a2_cpp ___(C) DQ_ begin
06:36:55'767 35a2_cpp[1] channel.open begin
06:36:55'767 35a2_cpp[1] connection.open begin06:36:55'767
35a2_cpp (P)___ WKP begin
06:36:55'767 35a2_cpp (P)___ WKP.SM_ begin
06:36:55'767 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:36:55'767 35a2_cpp[0] channel.open begin
06:36:55'767 35a2_cpp[0] connection.open begin
06:36:55'769 35a2_cpp[0] connection.open end
06:36:55'769 35a2_cpp[1] connection.open end
06:36:55'769 35a2_cpp[0] channel.open end
06:36:55'769 35a2_cpp[1] channel.open end
06:36:55'770 35a2_cpp ___(C) DQ_ end
06:36:55'770 35a2_cpp ___(C) CNS begin
06:36:55'770 35a2_cpp ___(C) CNS end
06:36:55'770 35a2_cpp ___(C) ___ ev_run begin
06:36:55'770 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:36:55'770 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:36:55'770 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:36:55'770 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:39:30'688 35a2_cpp ___(C) CNS.received begin [a1]
06:39:30'688 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
06:39:30'688 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
06:39:30'688 35a2_cpp rotationBuffer.putMessage end next=[1]
06:39:30'688 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
06:39:30'688 35a2_cpp ___(C) CNS.received.ack end [1]
06:39:30'688 35a2_cpp ___(C) CNS.received end [a1]
06:39:30'694 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
06:39:30'694 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
06:39:30'695 35a2_cpp[0] connection.close begin
06:39:30'695 35a2_cpp[0] channel.close begin
06:39:30'695 35a2_cpp[0] channel.close end
06:39:30'695 35a2_cpp[0] connection.close end
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
06:39:33'023 35a2_cpp ___(C) CNS.received begin [a2]
06:39:33'023 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
06:39:33'023 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
06:39:33'023 35a2_cpp rotationBuffer.putMessage end next=[2]
06:39:33'023 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
06:39:33'024 35a2_cpp ___(C) CNS.received.ack end [2]
06:39:33'024 35a2_cpp ___(C) CNS.received end [a2]
06:39:34'034 35a2_cpp ___(C) CNS.received begin [a3]
06:39:34'034 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
06:39:34'034 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
06:39:34'034 35a2_cpp rotationBuffer.putMessage end next=[3]
06:39:34'034 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
06:39:34'034 35a2_cpp ___(C) CNS.received.ack end [3]
06:39:34'034 35a2_cpp ___(C) CNS.received end [a3]
06:39:34'887 35a2_cpp ___(C) CNS.received begin [a4]
06:39:34'887 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
06:39:34'887 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
06:39:34'887 35a2_cpp rotationBuffer.putMessage end next=[4]
06:39:34'887 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
06:39:34'887 35a2_cpp ___(C) CNS.received.ack end [4]
06:39:34'887 35a2_cpp ___(C) CNS.received end [a4]
06:39:36'218 35a2_cpp ___(C) CNS.received begin [a5]
06:39:36'218 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
06:39:36'218 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
06:39:36'218 35a2_cpp rotationBuffer.putMessage end next=[5]
06:39:36'218 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
06:39:36'219 35a2_cpp ___(C) CNS.received.ack end [5]
06:39:36'219 35a2_cpp ___(C) CNS.received end [a5]

先にハンドラを抜ければ?

	getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
		// ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
		.onSuccess([]() {
			std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin" << std::endl;

			bool infinityLoop = true;
			bool enable = true;
			while (infinityLoop)
			{
				if (enable) {
					// 回転式バッファーからメッセージを切り取る。
					std::string message = rotationBuffer::removeMessage();

					if ("" != message)
					{
						// 末尾に daze を付ける。
						message += "daze";
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

						// トランザクション開始
						getChannel(ENQUEUE_INDEX)->startTransaction();
						// 以後、このループは空回りさせる
						enable = false;
						// パブリッシュ開始
						if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;

							// コミットを開始
							getChannel(ENQUEUE_INDEX)->commitTransaction()
								.onSuccess([&message]() {
									// コミット成功
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

									// 接続を切ったあと、トランザクションのコミットが実行されるようだ。
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
									closeConnection(ENQUEUE_INDEX);
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
								})
									.onError([&message](const char *errMsg) {
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
									std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
								});

							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
						}
						else {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
							std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
							getChannel(ENQUEUE_INDEX)->rollbackTransaction();
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
						}

						// 無限ループから出て、このハンドラから抜けよう。そのあと ev_run が処理の続き(コミットの続き等)を行う
						infinityLoop = false;
					}
				}
				std::this_thread::sleep_for(std::chrono::milliseconds(20));
			}
			std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ end" << std::endl;

		}).onError([](const char* errMsg) {
			std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
		});

こうすれば、

06:49:39'404 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:49:39'405 35a2_cpp main dump begin
06:49:39'405 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
06:49:39'405 35a2_cpp main dump end
06:49:39'405 35a2_cpp ___(C) ___ begin
06:49:39'405 35a2_cpp ___(C) DQ_ begin
06:49:39'405 35a2_cpp[1] channel.open begin
06:49:39'405 35a2_cpp[1] connection.open begin
06:49:39'405 35a2_cpp (P)___ WKP begin
06:49:39'405 35a2_cpp (P)___ WKP.SM_ begin
06:49:39'405 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:49:39'405 35a2_cpp[0] channel.open begin
06:49:39'405 35a2_cpp[0] connection.open begin
06:49:39'408 35a2_cpp[0] connection.open end
06:49:39'408 35a2_cpp[1] connection.open end
06:49:39'409 35a2_cpp[1] channel.open end
06:49:39'409 35a2_cpp ___(C) DQ_ end
06:49:39'409 35a2_cpp ___(C) CNS begin
06:49:39'409 35a2_cpp ___(C) CNS end
06:49:39'409 35a2_cpp ___(C) ___ ev_run begin
06:49:39'409 35a2_cpp[0] channel.open end
06:49:39'410 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:49:39'410 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:49:39'410 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:49:39'410 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:49:39'434 35a2_cpp (P)___ WKP.BQ_.OS_ begin
06:50:19'729 35a2_cpp ___(C) CNS.received begin [a1]
06:50:19'729 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
06:50:19'730 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
06:50:19'730 35a2_cpp rotationBuffer.putMessage end next=[1]
06:50:19'730 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
06:50:19'730 35a2_cpp ___(C) CNS.received.ack end [1]
06:50:19'730 35a2_cpp ___(C) CNS.received end [a1]
06:50:19'730 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
06:50:19'730 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
06:50:19'731 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:50:19'731 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:50:19'731 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:50:19'751 35a2_cpp (P)___ WKP.BQ_.OS_ end
06:50:19'753 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [�^?A^@^@^@^@^@�^A^]��^?^@^@����������������PjC��^?^@^@@^C^]��^?^@^@0^B^]��^?^@^@p^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^PiC��^?^@$

文字化けがひどい。

どうも、ポインターの指しているところが おかしいんじゃないか?
ちょうど バグが再現するうちに 直してしまおう。

「ラムダ式(C++11)」(cpprefjp - C++日本語リファレンス)
https://cpprefjp.github.io/lang/cpp11/lambda_expressions.html

	getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
		// ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
		.onSuccess([]() {
			std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin" << std::endl;

			bool infinityLoop = true;
			bool enable = true;
			while (infinityLoop)
			{
				if (enable) {
					// 回転式バッファーからメッセージを切り取る。
					std::string message = rotationBuffer::removeMessage();

					if ("" != message)
					{
						// 末尾に daze を付ける。
						message += "daze";
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

						// トランザクション開始
						getChannel(ENQUEUE_INDEX)->startTransaction();
						// 以後、このループは空回りさせる
						enable = false;
						// パブリッシュ開始
						if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;

							// コミットを開始
							getChannel(ENQUEUE_INDEX)->commitTransaction()
								.onSuccess([=]() {
									// コミット成功
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

									// 接続を切ったあと、トランザクションのコミットが実行されるようだ。
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
									closeConnection(ENQUEUE_INDEX);
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
								})
									.onError([&message](const char *errMsg) {
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
									std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
								});

							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
						}
						else {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
							std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
							getChannel(ENQUEUE_INDEX)->rollbackTransaction();
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
						}

						// 無限ループから出て、このハンドラから抜けよう。そのあと ev_run が処理の続き(コミットの続き等)を行う
						infinityLoop = false;
					}
				}
				std::this_thread::sleep_for(std::chrono::milliseconds(20));
			}
			std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ end" << std::endl;

		}).onError([](const char* errMsg) {
			std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
		});
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
16:00:33 780 34_cs ___(C) start begin
16:00:33 787 34_cs consumer.create [1] begin
16:00:33 787 34_cs consumer.create [1] end
16:00:33 788 34_cs channel.open [1] begin
16:00:33 789 34_cs connection.open begin
16:00:33 958 34_cs connection.open end
16:00:33 963 34_cs channel.open [1] end
16:00:33 966 34_cs <-------- [interrupt!] Registered(^q^)
16:00:33 966 34_cs ___(C) start end
16:00:33 966 34_cs (P)___ waitRegistered begin
16:00:33 966 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:00:34 815 34_cs (P)___ enqueue begin [a1]
16:00:34 815 34_cs channel.open [0] begin
16:00:34 816 34_cs connection.open begin
16:00:34 825 34_cs connection.open end
16:00:34 828 34_cs channel.open [0] end
16:00:34 833 34_cs (P)___ enqueue end
16:00:34 834 34_cs connection.close [0] begin
16:00:34 835 34_cs channel.close [0] begin
16:00:34 839 34_cs channel.close [0] end
16:00:34 845 34_cs connection.close [0] end
Enqueue? > 16:00:34 881 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
16:00:35 814 34_cs (P)___ enqueue begin [a2]
16:00:35 815 34_cs channel.open [0] begin
16:00:35 815 34_cs connection.open begin
16:00:35 818 34_cs connection.open end
16:00:35 821 34_cs channel.open [0] end
16:00:35 822 34_cs (P)___ enqueue end
16:00:35 822 34_cs connection.close [0] begin
16:00:35 822 34_cs channel.close [0] begin
16:00:35 823 34_cs channel.close [0] end
16:00:35 824 34_cs connection.close [0] end
Enqueue? > 16:00:35 853 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a3
16:00:36 599 34_cs (P)___ enqueue begin [a3]
16:00:36 599 34_cs channel.open [0] begin
16:00:36 599 34_cs connection.open begin
16:00:36 604 34_cs connection.open end
16:00:36 605 34_cs channel.open [0] end
16:00:36 606 34_cs (P)___ enqueue end
16:00:36 606 34_cs connection.close [0] begin
16:00:36 606 34_cs channel.close [0] begin
16:00:36 607 34_cs channel.close [0] end
16:00:36 608 34_cs connection.close [0] end
Enqueue? > 16:00:36 631 34_cs <-------- [interrupt!] Dequeue(^q^) a3daze
a4
16:00:37 430 34_cs (P)___ enqueue begin [a4]
16:00:37 430 34_cs channel.open [0] begin
16:00:37 431 34_cs connection.open begin
16:00:37 436 34_cs connection.open end
16:00:37 442 34_cs channel.open [0] end
16:00:37 442 34_cs (P)___ enqueue end
16:00:37 442 34_cs connection.close [0] begin
16:00:37 442 34_cs channel.close [0] begin
16:00:37 443 34_cs channel.close [0] end
16:00:37 444 34_cs connection.close [0] end
Enqueue? > 16:00:37 467 34_cs <-------- [interrupt!] Dequeue(^q^) a4daze
a5
16:00:38 263 34_cs (P)___ enqueue begin [a5]
16:00:38 263 34_cs channel.open [0] begin
16:00:38 264 34_cs connection.open begin
16:00:38 269 34_cs connection.open end
16:00:38 270 34_cs channel.open [0] end
16:00:38 270 34_cs (P)___ enqueue end
16:00:38 270 34_cs connection.close [0] begin
16:00:38 270 34_cs channel.close [0] begin
16:00:38 271 34_cs channel.close [0] end
16:00:38 272 34_cs connection.close [0] end
Enqueue? > 16:00:38 312 34_cs <-------- [interrupt!] Dequeue(^q^) a5daze

いい感じだな

06:59:59'184 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:59:59'184 35a2_cpp main dump begin
06:59:59'184 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
06:59:59'184 35a2_cpp main dump end
06:59:59'184 35a2_cpp ___(C) ___ begin
06:59:59'185 35a2_cpp ___(C) DQ_ begin
06:59:59'184 35a2_cpp (P)___ WKP begin
06:59:59'185 35a2_cpp[1] channel.open begin
06:59:59'185 35a2_cpp[1] connection.open begin
06:59:59'185 35a2_cpp (P)___ WKP.SM_ begin
06:59:59'185 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:59:59'185 35a2_cpp[0] channel.open begin
06:59:59'185 35a2_cpp[0] connection.open begin
06:59:59'186 35a2_cpp[0] connection.open end
06:59:59'186 35a2_cpp[1] connection.open end
06:59:59'186 35a2_cpp[0] channel.open end
06:59:59'186 35a2_cpp[1] channel.open end
06:59:59'187 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:59:59'187 35a2_cpp ___(C) DQ_ end
06:59:59'187 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:59:59'187 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:59:59'187 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:59:59'187 35a2_cpp ___(C) CNS begin
06:59:59'187 35a2_cpp ___(C) CNS end
06:59:59'187 35a2_cpp ___(C) ___ ev_run begin
06:59:59'201 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:34'835 35a2_cpp ___(C) CNS.received begin [a1]
07:00:34'835 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
07:00:34'835 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
07:00:34'835 35a2_cpp rotationBuffer.putMessage end next=[1]
07:00:34'835 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
07:00:34'836 35a2_cpp ___(C) CNS.received.ack end [1]
07:00:34'836 35a2_cpp ___(C) CNS.received end [a1]
07:00:34'851 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
07:00:34'851 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
07:00:34'851 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
07:00:34'851 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:34'851 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:34'871 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:34'873 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a1daze] **********important**********
07:00:34'873 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:34'873 35a2_cpp[0] connection.close begin
07:00:34'873 35a2_cpp[0] channel.close begin
07:00:34'873 35a2_cpp[0] channel.close end
07:00:34'873 35a2_cpp[0] connection.close end
07:00:34'873 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ begin
07:00:34'875 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:34'875 35a2_cpp[0] channel.open begin
07:00:34'875 35a2_cpp[0] connection.open begin
07:00:34'875 35a2_cpp[0] connection.open end
07:00:34'875 35a2_cpp[0] channel.open end
07:00:34'875 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:34'875 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:34'875 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:34'882 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:35'826 35a2_cpp ___(C) CNS.received begin [a2]
07:00:35'826 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
07:00:35'826 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
07:00:35'826 35a2_cpp rotationBuffer.putMessage end next=[2]
07:00:35'826 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
07:00:35'827 35a2_cpp ___(C) CNS.received.ack end [2]
07:00:35'827 35a2_cpp ___(C) CNS.received end [a2]
07:00:35'830 35a2_cpp rotationBuffer.removeMessage begin [1] [a2]
07:00:35'830 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[2]
07:00:35'830 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
07:00:35'830 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:35'830 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:35'851 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:35'853 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a2daze] **********important**********
07:00:35'853 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:35'853 35a2_cpp[0] connection.close begin
07:00:35'853 35a2_cpp[0] channel.close begin
07:00:35'854 35a2_cpp[0] channel.close end
07:00:35'854 35a2_cpp[0] connection.close end
07:00:35'854 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:35'855 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:35'856 35a2_cpp (P)___ WKP.SM_ end
07:00:35'856 35a2_cpp (P)___ WKP.SM_ begin
07:00:35'856 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:35'856 35a2_cpp[0] channel.open begin
07:00:35'856 35a2_cpp[0] connection.open begin
07:00:35'856 35a2_cpp[0] connection.open end
07:00:35'856 35a2_cpp[0] channel.open end
07:00:35'856 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:35'856 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:35'856 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:35'856 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:35'862 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:36'607 35a2_cpp ___(C) CNS.received begin [a3]
07:00:36'607 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
07:00:36'607 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
07:00:36'607 35a2_cpp rotationBuffer.putMessage end next=[3]
07:00:36'607 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
07:00:36'607 35a2_cpp ___(C) CNS.received.ack end [3]
07:00:36'607 35a2_cpp ___(C) CNS.received end [a3]
07:00:36'608 35a2_cpp rotationBuffer.removeMessage begin [2] [a3]
07:00:36'608 35a2_cpp rotationBuffer.removeMessage end return=[a3] next=[3]
07:00:36'608 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a3daze] ****important****
07:00:36'608 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:36'608 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:36'628 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:36'631 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a3daze] **********important**********
07:00:36'631 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:36'631 35a2_cpp[0] connection.close begin
07:00:36'631 35a2_cpp[0] channel.close begin
07:00:36'631 35a2_cpp[0] channel.close end
07:00:36'631 35a2_cpp[0] connection.close end
07:00:36'631 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ begin
07:00:36'632 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:36'632 35a2_cpp[0] channel.open begin
07:00:36'632 35a2_cpp[0] connection.open begin
07:00:36'632 35a2_cpp[0] connection.open end
07:00:36'632 35a2_cpp[0] channel.open end
07:00:36'632 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:36'632 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:36'632 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:36'637 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:37'443 35a2_cpp ___(C) CNS.received begin [a4]
07:00:37'443 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
07:00:37'443 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
07:00:37'443 35a2_cpp rotationBuffer.putMessage end next=[4]
07:00:37'443 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
07:00:37'443 35a2_cpp ___(C) CNS.received.ack end [4]
07:00:37'443 35a2_cpp ___(C) CNS.received end [a4]
07:00:37'445 35a2_cpp rotationBuffer.removeMessage begin [3] [a4]
07:00:37'445 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[4]
07:00:37'445 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
07:00:37'445 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:37'445 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:37'466 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:37'467 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a4daze] **********important**********
07:00:37'467 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:37'467 35a2_cpp[0] connection.close begin
07:00:37'467 35a2_cpp[0] channel.close begin
07:00:37'467 35a2_cpp[0] channel.close end
07:00:37'467 35a2_cpp[0] connection.close end
07:00:37'467 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:37'468 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:37'468 35a2_cpp (P)___ WKP.SM_ end
07:00:37'468 35a2_cpp (P)___ WKP.SM_ begin
07:00:37'468 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:37'468 35a2_cpp[0] channel.open begin
07:00:37'468 35a2_cpp[0] connection.open begin
07:00:37'469 35a2_cpp[0] connection.open end
07:00:37'469 35a2_cpp[0] channel.open end
07:00:37'469 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:37'469 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:37'469 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:37'469 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:37'483 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:38'271 35a2_cpp ___(C) CNS.received begin [a5]
07:00:38'271 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
07:00:38'271 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
07:00:38'271 35a2_cpp rotationBuffer.putMessage end next=[5]
07:00:38'271 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
07:00:38'271 35a2_cpp ___(C) CNS.received.ack end [5]
07:00:38'271 35a2_cpp ___(C) CNS.received end [a5]
07:00:38'290 35a2_cpp rotationBuffer.removeMessage begin [4] [a5]
07:00:38'290 35a2_cpp rotationBuffer.removeMessage end return=[a5] next=[5]
07:00:38'290 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a5daze] ****important****
07:00:38'290 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:38'290 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:38'310 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:38'312 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a5daze] **********important**********
07:00:38'312 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:38'312 35a2_cpp[0] connection.close begin
07:00:38'312 35a2_cpp[0] channel.close begin
07:00:38'312 35a2_cpp[0] channel.close end
07:00:38'312 35a2_cpp[0] connection.close end
07:00:38'313 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ begin
07:00:38'314 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:38'314 35a2_cpp[0] channel.open begin
07:00:38'314 35a2_cpp[0] connection.open begin
07:00:38'314 35a2_cpp[0] connection.open end
07:00:38'314 35a2_cpp[0] channel.open end
07:00:38'314 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:38'314 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:38'314 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:38'319 35a2_cpp (P)___ WKP.BQ_.OS_ begin

接続を切ることより、ハンドラを抜けることの方が 肝心なんじゃないか?
あと、ラムダのキャプチャをコピーにした。

接続を切るのは 本当に必要なのか?

試してみよう。

デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
16:19:38 949 34_cs ___(C) start begin
16:19:38 955 34_cs consumer.create [1] begin
16:19:38 956 34_cs consumer.create [1] end
16:19:38 957 34_cs channel.open [1] begin
16:19:38 958 34_cs connection.open begin
16:19:39 165 34_cs connection.open end
16:19:39 170 34_cs channel.open [1] end
16:19:39 172 34_cs <-------- [interrupt!] Registered(^q^)
16:19:39 172 34_cs ___(C) start end
16:19:39 172 34_cs (P)___ waitRegistered begin
16:19:39 172 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:19:39 924 34_cs (P)___ enqueue begin [a1]
16:19:39 924 34_cs channel.open [0] begin
16:19:39 925 34_cs connection.open begin
16:19:39 932 34_cs connection.open end
16:19:39 934 34_cs channel.open [0] end
16:19:39 937 34_cs (P)___ enqueue end
16:19:39 938 34_cs connection.close [0] begin
16:19:39 938 34_cs channel.close [0] begin
16:19:39 945 34_cs channel.close [0] end
16:19:39 951 34_cs connection.close [0] end
Enqueue? > 16:19:39 978 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
16:19:40 818 34_cs (P)___ enqueue begin [a2]
16:19:40 819 34_cs channel.open [0] begin
16:19:40 819 34_cs connection.open begin
16:19:40 831 34_cs connection.open end
16:19:40 833 34_cs channel.open [0] end
16:19:40 833 34_cs (P)___ enqueue end
16:19:40 833 34_cs connection.close [0] begin
16:19:40 833 34_cs channel.close [0] begin
16:19:40 835 34_cs channel.close [0] end
16:19:40 835 34_cs connection.close [0] end
Enqueue? > a3
16:19:41 755 34_cs (P)___ enqueue begin [a3]
16:19:41 755 34_cs channel.open [0] begin
16:19:41 755 34_cs connection.open begin
16:19:41 762 34_cs connection.open end
16:19:41 764 34_cs channel.open [0] end
16:19:41 764 34_cs (P)___ enqueue end
16:19:41 764 34_cs connection.close [0] begin
16:19:41 764 34_cs channel.close [0] begin
16:19:41 766 34_cs channel.close [0] end
16:19:41 767 34_cs connection.close [0] end
Enqueue? > a4
16:19:42 835 34_cs (P)___ enqueue begin [a4]
16:19:42 835 34_cs channel.open [0] begin
16:19:42 835 34_cs connection.open begin
16:19:42 851 34_cs connection.open end
16:19:42 853 34_cs channel.open [0] end
16:19:42 854 34_cs (P)___ enqueue end
16:19:42 854 34_cs connection.close [0] begin
16:19:42 854 34_cs channel.close [0] begin
16:19:42 856 34_cs channel.close [0] end
16:19:42 857 34_cs connection.close [0] end
Enqueue? > a5
16:19:43 795 34_cs (P)___ enqueue begin [a5]
16:19:43 795 34_cs channel.open [0] begin
16:19:43 795 34_cs connection.open begin
16:19:43 820 34_cs connection.open end
16:19:43 822 34_cs channel.open [0] end
16:19:43 823 34_cs (P)___ enqueue end
16:19:43 823 34_cs connection.close [0] begin
16:19:43 823 34_cs channel.close [0] begin
16:19:43 825 34_cs channel.close [0] end
16:19:43 826 34_cs connection.close [0] end

接続を切るのは必要みたいだな、理屈は分からないが……。

07:19:11'188 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
07:19:11'189 35a2_cpp main dump begin
07:19:11'189 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
07:19:11'189 35a2_cpp main dump end
07:19:11'189 35a2_cpp ___(C) ___ begin
07:19:11'189 35a2_cpp ___(C) DQ_ begin
07:19:11'189 35a2_cpp[1] channel.open begin
07:19:11'189 35a2_cpp (P)___ WKP begin
07:19:11'189 35a2_cpp (P)___ WKP.SM_ begin
07:19:11'189 35a2_cpp[1] connection.open begin
07:19:11'189 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:19:11'189 35a2_cpp[0] channel.open begin
07:19:11'189 35a2_cpp[0] connection.open begin
07:19:11'191 35a2_cpp[107:19:11'191 35a2_cpp[] connection.open end0] connection.open end

07:19:11'191 35a2_cpp[0] channel.open end
07:19:11'191 35a2_cpp[1] channel.open end
07:19:11'191 35a2_cpp ___(C) DQ_ end
07:19:11'191 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:19:11'191 35a2_cpp ___(C) CNS begin
07:19:11'191 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:19:11'191 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:19:11'191 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:19:11'191 35a2_cpp ___(C) CNS end
07:19:11'191 35a2_cpp ___(C) ___ ev_run begin
07:19:11'211 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:19:39'939 35a2_cpp ___(C) CNS.received begin [a1]
07:19:39'939 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
07:19:39'939 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
07:19:39'939 35a2_cpp rotationBuffer.putMessage end next=[1]
07:19:39'939 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
07:19:39'940 35a2_cpp ___(C) CNS.received.ack end [1]
07:19:39'940 35a2_cpp ___(C) CNS.received end [a1]
07:19:39'950 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
07:19:39'950 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
07:19:39'950 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
07:19:39'951 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:19:39'951 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:19:39'971 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:19:39'977 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a1daze] **********important**********
07:19:40'834 35a2_cpp ___(C) CNS.received begin [a2]
07:19:40'834 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
07:19:40'834 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
07:19:40'834 35a2_cpp rotationBuffer.putMessage end next=[2]
07:19:40'834 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
07:19:40'834 35a2_cpp ___(C) CNS.received.ack end [2]
07:19:40'835 35a2_cpp ___(C) CNS.received end [a2]
07:19:41'766 35a2_cpp ___(C) CNS.received begin [a3]
07:19:41'766 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
07:19:41'766 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
07:19:41'766 35a2_cpp rotationBuffer.putMessage end next=[3]
07:19:41'766 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
07:19:41'766 35a2_cpp ___(C) CNS.received.ack end [3]
07:19:41'766 35a2_cpp ___(C) CNS.received end [a3]
07:19:42'856 35a2_cpp ___(C) CNS.received begin [a4]
07:19:42'856 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
07:19:42'856 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
07:19:42'856 35a2_cpp rotationBuffer.putMessage end next=[4]
07:19:42'856 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
07:19:42'856 35a2_cpp ___(C) CNS.received.ack end [4]
07:19:42'856 35a2_cpp ___(C) CNS.received end [a4]
07:19:43'825 35a2_cpp ___(C) CNS.received begin [a5]
07:19:43'825 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
07:19:43'825 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
07:19:43'825 35a2_cpp rotationBuffer.putMessage end next=[5]
07:19:43'825 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
07:19:43'825 35a2_cpp ___(C) CNS.received.ack end [5]
07:19:43'825 35a2_cpp ___(C) CNS.received end [a5]

接続を切って、bindQueue をし直せ、ということなのだろうか?

だったら、接続を切る代わりに、チャンネルを閉じるとどうなるのか?

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:26:34 388 34_cs (P)___ enqueue begin [a1]
16:26:34 389 34_cs channel.open [0] begin
16:26:34 390 34_cs connection.open begin
16:26:34 405 34_cs connection.open end
16:26:34 407 34_cs channel.open [0] end
16:26:34 414 34_cs (P)___ enqueue end
16:26:34 415 34_cs connection.close [0] begin
16:26:34 416 34_cs channel.close [0] begin
16:26:34 423 34_cs channel.close [0] end
16:26:34 429 34_cs connection.close [0] end
Enqueue? > 16:26:34 458 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
16:26:35 490 34_cs (P)___ enqueue begin [a2]
16:26:35 491 34_cs channel.open [0] begin
16:26:35 491 34_cs connection.open begin
16:26:35 512 34_cs connection.open end
16:26:35 517 34_cs channel.open [0] end
16:26:35 517 34_cs (P)___ enqueue end
16:26:35 517 34_cs connection.close [0] begin
16:26:35 517 34_cs channel.close [0] begin
16:26:35 519 34_cs channel.close [0] end
16:26:35 521 34_cs connection.close [0] end
Enqueue? > a3
16:26:36 513 34_cs (P)___ enqueue begin [a3]
16:26:36 513 34_cs channel.open [0] begin
16:26:36 514 34_cs connection.open begin
16:26:36 520 34_cs connection.open end
16:26:36 522 34_cs channel.open [0] end
16:26:36 522 34_cs (P)___ enqueue end
16:26:36 522 34_cs connection.close [0] begin
16:26:36 522 34_cs channel.close [0] begin
16:26:36 524 34_cs channel.close [0] end
16:26:36 525 34_cs connection.close [0] end
Enqueue? > a4
16:26:38 130 34_cs (P)___ enqueue begin [a4]
16:26:38 130 34_cs channel.open [0] begin
16:26:38 130 34_cs connection.open begin
16:26:38 137 34_cs connection.open end
16:26:38 138 34_cs channel.open [0] end
16:26:38 139 34_cs (P)___ enqueue end
16:26:38 139 34_cs connection.close [0] begin
16:26:38 139 34_cs channel.close [0] begin
16:26:38 140 34_cs channel.close [0] end
16:26:38 141 34_cs connection.close [0] end
07:26:11'524 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
07:26:11'525 35a2_cpp main dump begin
07:26:11'525 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
07:26:11'525 35a2_cpp main dump end
07:26:11'525 35a2_cpp ___(C) ___ begin
07:26:11'525 35a2_cpp ___(C) DQ_ begin
07:26:11'525 35a2_cpp (P)___ WKP begin
07:26:11'525 35a2_cpp[1] channel.open begin
07:26:11'525 35a2_cpp (P)___ WKP.SM_ begin
07:26:11'525 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:26:11'525 35a2_cpp[1] connection.open begin
07:26:11'525 35a2_cpp[0] channel.open begin
07:26:11'525 35a2_cpp[0] connection.open begin
07:26:11'527 35a2_cpp[0] connection.open end
07:26:11'527 35a2_cpp[1] connection.open end
07:26:11'527 35a2_cpp[0] channel.open end
07:26:11'527 35a2_cpp[1] channel.open end
07:26:11'527 35a2_cpp ___(C) DQ_ end
07:26:11'527 35a2_cpp ___(C) CNS begin
07:26:11'528 35a2_cpp ___(C) CNS end
07:26:11'528 35a2_cpp ___(C) ___ ev_run begin
07:26:11'527 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:26:11'528 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:26:11'528 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:26:11'528 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:26:11'540 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:26:34'415 35a2_cpp ___(C) CNS.received begin [a1]
07:26:34'415 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
07:26:34'415 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
07:26:34'415 35a2_cpp rotationBuffer.putMessage end next=[1]
07:26:34'415 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
07:26:34'415 35a2_cpp ___(C) CNS.received.ack end [1]
07:26:34'415 35a2_cpp ___(C) CNS.received end [a1]
07:26:34'431 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
07:26:34'431 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
07:26:34'431 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
07:26:34'431 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:26:34'431 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:26:34'451 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:26:34'453 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a1daze] **********important**********
07:26:34'454 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:26:34'454 35a2_cpp[0] channel.close begin
07:26:34'454 35a2_cpp[0] channel.close end
07:26:34'454 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:26:35'519 35a2_cpp ___(C) CNS.received begin [a2]
07:26:35'519 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
07:26:35'519 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
07:26:35'519 35a2_cpp rotationBuffer.putMessage end next=[2]
07:26:35'519 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
07:26:35'519 35a2_cpp ___(C) CNS.received.ack end [2]
07:26:35'519 35a2_cpp ___(C) CNS.received end [a2]
07:26:36'524 35a2_cpp ___(C) CNS.received begin [a3]
07:26:36'524 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
07:26:36'524 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
07:26:36'524 35a2_cpp rotationBuffer.putMessage end next=[3]
07:26:36'524 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
07:26:36'524 35a2_cpp ___(C) CNS.received.ack end [3]
07:26:36'524 35a2_cpp ___(C) CNS.received end [a3]
07:26:38'141 35a2_cpp ___(C) CNS.received begin [a4]
07:26:38'141 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
07:26:38'141 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
07:26:38'141 35a2_cpp rotationBuffer.putMessage end next=[4]
07:26:38'141 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
07:26:38'142 35a2_cpp ___(C) CNS.received.ack end [4]
07:26:38'142 35a2_cpp ___(C) CNS.received end [a4]

接続を切ることは 必要な条件のようだ。理屈は分からないが。

と思って2回目を試すと

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:31:20 445 34_cs (P)___ enqueue begin [a1]
16:31:20 445 34_cs channel.open [0] begin
16:31:20 445 34_cs connection.open begin
16:31:20 455 34_cs connection.open end
16:31:20 458 34_cs channel.open [0] end
16:31:20 461 34_cs (P)___ enqueue end
16:31:20 462 34_cs connection.close [0] begin
16:31:20 462 34_cs channel.close [0] begin
16:31:20 467 34_cs channel.close [0] end
16:31:20 479 34_cs connection.close [0] end
Enqueue? > a2
16:31:21 291 34_cs (P)___ enqueue begin [a2]
16:31:21 291 34_cs channel.open [0] begin
16:31:21 292 34_cs connection.open begin
16:31:21 297 34_cs connection.open end
16:31:21 299 34_cs channel.open [0] end
16:31:21 299 34_cs (P)___ enqueue end
16:31:21 299 34_cs connection.close [0] begin
16:31:21 299 34_cs channel.close [0] begin
16:31:21 301 34_cs channel.close [0] end
16:31:21 301 34_cs connection.close [0] end
Enqueue? > 16:31:21 331 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a3
16:31:22 915 34_cs (P)___ enqueue begin [a3]
16:31:22 916 34_cs channel.open [0] begin
16:31:22 916 34_cs connection.open begin
16:31:22 931 34_cs connection.open end
16:31:22 932 34_cs channel.open [0] end
16:31:22 933 34_cs (P)___ enqueue end
16:31:22 933 34_cs connection.close [0] begin
16:31:22 933 34_cs channel.close [0] begin
16:31:22 935 34_cs channel.close [0] end
16:31:22 936 34_cs connection.close [0] end
Enqueue? > a4
16:31:29 779 34_cs (P)___ enqueue begin [a4]
16:31:29 779 34_cs channel.open [0] begin
16:31:29 779 34_cs connection.open begin
16:31:29 792 34_cs connection.open end
16:31:29 794 34_cs channel.open [0] end
16:31:29 795 34_cs (P)___ enqueue end
16:31:29 795 34_cs connection.close [0] begin
16:31:29 795 34_cs channel.close [0] begin
16:31:29 798 34_cs channel.close [0] end
16:31:29 799 34_cs connection.close [0] end
Enqueue? > 16:31:29 829 34_cs <-------- [interrupt!] Dequeue(^q^) a4daze

接続を切ると、まぐれで うまく行くことがある、ということか。

07:30:44'066 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
07:30:44'066 35a2_cpp main dump begin
07:30:44'066 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
07:30:44'066 35a2_cpp main dump end
07:30:44'067 35a2_cpp ___(C) ___ begin
07:30:44'067 35a2_cpp ___(C) DQ_ begin
07:30:44'067 35a2_cpp (P)___ WKP begin
07:30:44'067 35a2_cpp[1] channel.open begin
07:30:44'067 35a2_cpp (P)___ WKP.SM_ begin
07:30:44'067 35a2_cpp[1] connection.open begin
07:30:44'067 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:30:44'067 35a2_cpp[0] channel.open begin
07:30:44'067 35a2_cpp[0] connection.open begin
07:30:44'071 35a2_cpp[0] connection.open end
07:30:44'071 35a2_cpp[1] connection.open end
07:30:44'072 35a2_cpp[1] channel.open end
07:30:44'072 35a2_cpp[0] channel.open end
07:30:44'072 35a2_cpp ___(C) DQ_ end
07:30:44'072 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:30:44'072 35a2_cpp ___(C) CNS begin
07:30:44'072 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:30:44'072 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:30:44'072 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:30:44'072 35a2_cpp ___(C) CNS end
07:30:44'072 35a2_cpp ___(C) ___ ev_run begin
07:30:44'095 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:31:21'301 35a2_cpp ___(C) CNS.received begin [a2]
07:31:21'301 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a2]
07:31:21'301 35a2_cpp rotationBuffer.putMessage begin [0] [a2]
07:31:21'301 35a2_cpp rotationBuffer.putMessage end next=[1]
07:31:21'301 35a2_cpp ___(C) CNS.received.ack begin [1] [a2]
07:31:21'301 35a2_cpp ___(C) CNS.received.ack end [1]
07:31:21'301 35a2_cpp ___(C) CNS.received end [a2]
07:31:21'302 35a2_cpp rotationBuffer.removeMessage begin [0] [a2]
07:31:21'302 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[1]
07:31:21'302 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
07:31:21'302 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:31:21'302 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:31:21'323 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:31:21'324 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a2daze] **********important**********
07:31:21'324 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:31:21'324 35a2_cpp[0] connection.close begin
07:31:21'324 35a2_cpp[0] channel.close begin
07:31:21'324 35a2_cpp[0] channel.close end
07:31:21'324 35a2_cpp[0] connection.close end
07:31:21'324 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ ev_run end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ begin
07:31:21'330 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:31:21'330 35a2_cpp[0] channel.open begin
07:31:21'330 35a2_cpp[0] connection.open begin
07:31:21'330 35a2_cpp[0] connection.open end
07:31:21'330 35a2_cpp[0] channel.open end
07:31:21'330 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:31:21'330 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:31:21'330 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:31:21'344 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:31:29'797 35a2_cpp ___(C) CNS.received begin [a4]
07:31:29'797 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a4]
07:31:29'797 35a2_cpp rotationBuffer.putMessage begin [1] [a4]
07:31:29'798 35a2_cpp rotationBuffer.putMessage end next=[2]
07:31:29'798 35a2_cpp ___(C) CNS.received.ack begin [2] [a4]
07:31:29'798 35a2_cpp ___(C) CNS.received.ack end [2]
07:31:29'798 35a2_cpp ___(C) CNS.received end [a4]
07:31:29'806 35a2_cpp rotationBuffer.removeMessage begin [1] [a4]
07:31:29'807 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[2]
07:31:29'807 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
07:31:29'807 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:31:29'807 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:31:29'827 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:31:29'829 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a4daze] **********important**********
07:31:29'829 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:31:29'829 35a2_cpp[0] connection.close begin
07:31:29'829 35a2_cpp[0] channel.close begin
07:31:29'829 35a2_cpp[0] channel.close end
07:31:29'829 35a2_cpp[0] connection.close end
07:31:29'829 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ ev_run end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ begin
07:31:29'831 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:31:29'831 35a2_cpp[0] channel.open begin
07:31:29'831 35a2_cpp[0] connection.open begin
07:31:29'831 35a2_cpp[0] connection.open end
07:31:29'831 35a2_cpp[0] channel.open end
07:31:29'831 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:31:29'831 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:31:29'831 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:31:29'844 35a2_cpp (P)___ WKP.BQ_.OS_ begin

なんだろう、また同じ不具合だ。

# jobs
[1]   Running                 rabbitmq-server &  (wd: /home/★user/shogi/csharp_service)
[2]-  Running                 ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
[3]+  Running                 ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &

2つ走っているのが怪しい。

直った。

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
16:37:39 253 34_cs ___(C) start begin
16:37:39 259 34_cs consumer.create [1] begin
16:37:39 259 34_cs consumer.create [1] end
16:37:39 260 34_cs channel.open [1] begin
16:37:39 261 34_cs connection.open begin
16:37:39 415 34_cs connection.open end
16:37:39 420 34_cs channel.open [1] end
16:37:39 423 34_cs <-------- [interrupt!] Registered(^q^)
16:37:39 423 34_cs ___(C) start end
16:37:39 423 34_cs (P)___ waitRegistered begin
16:37:39 423 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a
16:37:40 100 34_cs (P)___ enqueue begin [a]
16:37:40 101 34_cs channel.open [0] begin
16:37:40 101 34_cs connection.open begin
16:37:40 110 34_cs connection.open end
16:37:40 118 34_cs channel.open [0] end
16:37:40 123 34_cs (P)___ enqueue end
16:37:40 124 34_cs connection.close [0] begin
16:37:40 124 34_cs channel.close [0] begin
16:37:40 130 34_cs channel.close [0] end
16:37:40 134 34_cs connection.close [0] end
Enqueue? > 16:37:40 156 34_cs <-------- [interrupt!] Dequeue(^q^) adaze
b
16:37:40 811 34_cs (P)___ enqueue begin [b]
16:37:40 812 34_cs channel.open [0] begin
16:37:40 812 34_cs connection.open begin
16:37:40 817 34_cs connection.open end
16:37:40 819 34_cs channel.open [0] end
16:37:40 819 34_cs (P)___ enqueue end
16:37:40 819 34_cs connection.close [0] begin
16:37:40 820 34_cs channel.close [0] begin
16:37:40 821 34_cs channel.close [0] end
16:37:40 822 34_cs connection.close [0] end
Enqueue? > 16:37:40 856 34_cs <-------- [interrupt!] Dequeue(^q^) bdaze
c
16:37:41 603 34_cs (P)___ enqueue begin [c]
16:37:41 604 34_cs channel.open [0] begin
16:37:41 604 34_cs connection.open begin
16:37:41 608 34_cs connection.open end
16:37:41 610 34_cs channel.open [0] end
16:37:41 610 34_cs (P)___ enqueue end
16:37:41 610 34_cs connection.close [0] begin
16:37:41 610 34_cs channel.close [0] begin
16:37:41 612 34_cs channel.close [0] end
16:37:41 613 34_cs connection.close [0] end
Enqueue? > 16:37:41 638 34_cs <-------- [interrupt!] Dequeue(^q^) cdaze
d
16:37:42 227 34_cs (P)___ enqueue begin [d]
16:37:42 227 34_cs channel.open [0] begin
16:37:42 227 34_cs connection.open begin
16:37:42 247 34_cs connection.open end
16:37:42 249 34_cs channel.open [0] end
16:37:42 250 34_cs (P)___ enqueue end
16:37:42 250 34_cs connection.close [0] begin
16:37:42 250 34_cs channel.close [0] begin
16:37:42 252 34_cs channel.close [0] end
16:37:42 252 34_cs connection.close [0] end
Enqueue? > 16:37:42 277 34_cs <-------- [interrupt!] Dequeue(^q^) ddaze
e
16:37:42 867 34_cs (P)___ enqueue begin [e]
16:37:42 868 34_cs channel.open [0] begin
16:37:42 868 34_cs connection.open begin
16:37:42 882 34_cs connection.open end
16:37:42 884 34_cs channel.open [0] end
16:37:42 884 34_cs (P)___ enqueue end
16:37:42 884 34_cs connection.close [0] begin
16:37:42 884 34_cs channel.close [0] begin
16:37:42 886 34_cs channel.close [0] end
16:37:42 887 34_cs connection.close [0] end
Enqueue? > 16:37:42 910 34_cs <-------- [interrupt!] Dequeue(^q^) edaze
f
16:37:43 427 34_cs (P)___ enqueue begin [f]
16:37:43 428 34_cs channel.open [0] begin
16:37:43 428 34_cs connection.open begin
16:37:43 433 34_cs connection.open end
16:37:43 434 34_cs channel.open [0] end
16:37:43 434 34_cs (P)___ enqueue end
16:37:43 434 34_cs connection.close [0] begin
16:37:43 434 34_cs channel.close [0] begin
16:37:43 436 34_cs channel.close [0] end
16:37:43 437 34_cs connection.close [0] end
Enqueue? > 16:37:43 463 34_cs <-------- [interrupt!] Dequeue(^q^) fdaze

じゃあ、これで。

ソースコード

tamesi34_cs.cs

// OS      : Windows 10
// IDE     : Visual Studio 2015
//           Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS      : Ubuntu 16.04

// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//                     : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : For test. Practice of interprocess communication.
//                                          : This program run on the background side.
//                                          : Delete queue,
//                                          :   Or Send message to message queue.
//                                          :      Interrupt received message from queue.
//                                          : Please settings source queue, destination queue to command line argument.
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//         : Compile   : Command            : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
//                                : Explain :     mcs                     Mono c# compiler
//                                          :     /r:RabbitMQ.Client.dll  Append reference RabbitMQ
//                                          :     -define:UBUNTU          #define UBUNTU
//                                          :     tamesi34_cs.cs          Source file
//         : Execute   : Command            : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
//                                          :     // Run on the foreground.
//                                : Explain :     ./tamesi34_cs.exe         Executable file
//                                          :     2> ./tamesi34_cs.err.log  Standard error to log file
//         : Reference            : Web site: QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//                                          : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//                                          : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
//                                          : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//                                          : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//
//--------------------------------------------------------------------------------
// tamesi34_cs.cs

// Ubuntu version. Source code is old. API is difference.
// #define UBUNTU

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace UsagiMQ
{
    /// <summary>
    /// This number is this program only. AMQP-CPP implements is difference.
    /// AMQP-CPP implements(C++) : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
    /// </summary>
    [Flags]
    enum LifeSpanType
    {
        Durable = 1,
        Autodelete = 0x1<<1,
        Passive = 0x1 << 2,
        Exclusive = 0x1 << 3
    }

    class Program
    {
        static string Dump()
        {
            StringBuilder sb = new StringBuilder();
            sb.AppendLine("tamesi34_cs.cs Dump");
            sb.AppendLine("    name_queues         [ENQUEUE_INDEX]    =[" + name_queues[ENQUEUE_INDEX]+"]");
            sb.AppendLine("    lifeSpan_queues     [ENQUEUE_INDEX]    =[" + lifeSpan_queues[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [ENQUEUE_INDEX]    =[" + durable_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[ENQUEUE_INDEX]    =[" + autodelete_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [ENQUEUE_INDEX]    =[" + passive_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [ENQUEUE_INDEX]    =[" + exclusive_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            sb.AppendLine("    name_queues         [DEQUEUE_INDEX]    =[" + name_queues[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    lifeSpan_queues     [DEQUEUE_INDEX]    =[" + lifeSpan_queues[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [DEQUEUE_INDEX]    =[" + durable_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[DEQUEUE_INDEX]    =[" + autodelete_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [DEQUEUE_INDEX]    =[" + passive_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [DEQUEUE_INDEX]    =[" + exclusive_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            sb.AppendLine("    name_queues         [DELETEQUEUE_INDEX]=[" + name_queues[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    lifeSpan_queues     [DELETEQUEUE_INDEX]=[" + lifeSpan_queues[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [DELETEQUEUE_INDEX]=[" + durable_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[DELETEQUEUE_INDEX]=[" + autodelete_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [DELETEQUEUE_INDEX]=[" + passive_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [DELETEQUEUE_INDEX]=[" + exclusive_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            return sb.ToString();
        }
        static string GetNow()
        {
            return DateTime.Now.ToString("HH:mm:ss fff ");
        }

        const int ENQUEUE_INDEX = 0;
        const int DEQUEUE_INDEX = 1;
        const int DELETEQUEUE_INDEX = 2;
        const int NUM_INDEX = 3;
        const string HOST_NAME = "localhost";
        static string[] name_queues = new string[NUM_INDEX];
        static LifeSpanType[] lifeSpan_queues = new LifeSpanType[NUM_INDEX];
        static bool[]
            durable_lifeSpans = new bool[NUM_INDEX],
            autodelete_lifeSpans = new bool[NUM_INDEX],
            passive_lifeSpans = new bool[NUM_INDEX],
            exclusive_lifeSpans = new bool[NUM_INDEX];
        static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
        {
            name_queues[index_queue] = name_queue;
            lifeSpan_queues[index_queue] = lifeSpan;

            // 一旦クリアー
            durable_lifeSpans[index_queue] = false;
            autodelete_lifeSpans[index_queue] = false;
            passive_lifeSpans[index_queue] = false;
            exclusive_lifeSpans[index_queue] = false;

            // durable
            if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
            // autodelete
            if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
            // passive
            if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
            // exclusive
            if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
        }

        public static ConnectionFactory GetFactory()
        {
            if (null == m_factory_)
            {
                m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
            }
            return m_factory_;
        }
        static ConnectionFactory m_factory_;

        public static IConnection GetConnection(int index)
        {
            if (null == m_connection_[index])
            {
                Console.WriteLine(GetNow()+"34_cs connection.open begin");
                m_connection_[index] = GetFactory().CreateConnection();
                Console.WriteLine(GetNow() + "34_cs connection.open end");
            }
            return m_connection_[index];
        }
        static IConnection[] m_connection_ = new IConnection[NUM_INDEX];

        public static IModel GetChannel(int index)
        {
            if (null == m_channels_[index])
            {
                Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] begin");
                m_channels_[index] = GetConnection(index).CreateModel();

                // 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
                //Console.Error.WriteLine(Dump());
                //Console.Error.WriteLine("(tamesi34_cs.cs?) GetChannel index = [" + index + "]");
                //Console.Error.WriteLine("    name_queues[index] = [" + name_queues[index] + "]");
                //Console.Error.WriteLine("    passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    nowait は仮に false 固定");
                //Console.Error.WriteLine("    arguments は仮に null 固定");

#if UBUNTU
                // Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
                // RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
                // そして false になっているようだ。
                m_channels_[index].QueueDeclare(name_queues[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
                //m_channels_[index].QueueDeclare(name_queues[index], false, false, false, false, false, null);
#else
                m_channels_[index].QueueDeclare(name_queues[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
                Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] end");
            }
            return m_channels_[index];
        }
        static IModel[] m_channels_ = new IModel[NUM_INDEX];

        public static EventingBasicConsumer GetConsumer(int index)
        {
            if (null == m_consumers_[index])
            {
                Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] begin");
#if UBUNTU
                // Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
                m_consumers_[index] = new EventingBasicConsumer();
#else
                m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
                Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] end");
            }
            return m_consumers_[index];
        }
        static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];

        /// <summary>
        /// コンシューマーが準備できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static ConsumerEventHandler GetRegisteredHandler()
#else
        public static EventHandler<ConsumerEventArgs> GetRegisteredHandler()
#endif
        {
            if (null == m_registeredHandler_)
            {
#if UBUNTU
                m_registeredHandler_ = new ConsumerEventHandler((model, ea) =>
#else
                m_registeredHandler_ = new EventHandler<ConsumerEventArgs>((model, ea) =>
#endif
                {
                    Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Registered(^q^)");
                    flag_registered = true;
                });
            }

            return m_registeredHandler_;
        }

#if UBUNTU
        static ConsumerEventHandler m_registeredHandler_;
#else
        static EventHandler<ConsumerEventArgs> m_registeredHandler_;
#endif
        static bool flag_registered;

        /// <summary>
        /// 受信できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static BasicDeliverEventHandler GetDequeueHandler()
#else
        public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
        {
            if (null == m_dequeueHandler_)
            {
#if UBUNTU
                m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
                m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
                {
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Dequeue(^q^) {0}", message);
                });
            }

            return m_dequeueHandler_;
        }

#if UBUNTU
        static BasicDeliverEventHandler m_dequeueHandler_;
#else
        static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif

        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseConnection(int index)
        {
            if (null != m_connection_[index])
            {
                Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] begin");
                // フラグとして、ヌルを入れる
                CloseChannel(index);

                m_connection_[index].Close();
                m_connection_[index] = null;
                Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] end");
            }
        }
        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseChannel(int index)
        {
            if (null != m_channels_[index])
            {
                Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] begin");
                m_channels_[index].Close();
                m_channels_[index] = null;
                Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] end");
            }
        }

        static void Main(string[] args)
        {
            //----------------------------------------
            // Delete
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"""" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? > ");
                string queueName_delete = Console.ReadLine();
                if (""== queueName_delete.Trim())
                {
                    break;
                }

                
                Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Delete ? > ");
                LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
                SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
                uint result = DeleteQueue();
                Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。");
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > ");
                string queueName_enqueue = Console.ReadLine();

                Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > ");
                int lifeSpan_enqueue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
                {
                    SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
                    break;
                }
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            EventingBasicConsumer consumer = null;
            for (;;)
            {
                Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > ");
                string queueName_dequeue = Console.ReadLine();

                Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > ");
                int lifeSpan_dequeue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
                {
                    SetLifeSpan(DEQUEUE_INDEX, queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
                    break;
                }
            }

            StartConsume(out consumer);

            Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered begin");
            while (!flag_registered)
            {
                Thread.Sleep(20);
            }
            Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered end");

            Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > ");
            for (;;)
            {
                // "Hello World!" などを入力
                string line = Console.ReadLine();
                Enqueue(line);
                Console.Write(@"Enqueue? > ");
            }

            // ここには来ない
            // CloseConnection();
        }

        static uint DeleteQueue()
        {
            IModel channel = GetChannel(DELETEQUEUE_INDEX);

#if UBUNTU
            // Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
            // uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
            uint result = channel.QueueDelete( name_queues[DELETEQUEUE_INDEX],true,true,true);
#else
            uint result = channel.QueueDelete(name_queues[DELETEQUEUE_INDEX],true,true);
#endif

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseConnection(DELETEQUEUE_INDEX);
            //CloseChannel(DELETEQUEUE_INDEX);
            return result;
        }

        static void Enqueue(string message)
        {
            byte[] body = Encoding.UTF8.GetBytes(message);

            Console.WriteLine(GetNow() + "34_cs (P)___ enqueue begin [" + message + "]");
            GetChannel(ENQUEUE_INDEX).BasicPublish("", name_queues[ENQUEUE_INDEX], null, body);
            Console.WriteLine(GetNow() + "34_cs (P)___ enqueue end");

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseConnection(ENQUEUE_INDEX);
            //CloseChannel(ENQUEUE_INDEX);
        }

        static void StartConsume(out EventingBasicConsumer consumer)
        {
            Console.WriteLine(GetNow() + "34_cs ___(C) start begin");

            consumer = GetConsumer(DEQUEUE_INDEX); // ローカル変数だとスコープが切れて寿命が尽きてしまうので、外側に返す

            // 受信できたときに割り込んでくる処理
            consumer.Received += GetDequeueHandler();
            consumer.Registered += GetRegisteredHandler();

#if UBUNTU
            // Ubuntuでは何故か引数が 6 個のやつに似ている。
            // string BasicConsume(ushort ticket, string queue, bool noAck, string consumerTag, IDictionary filter, IBasicConsumer consumer)
            GetChannel(DEQUEUE_INDEX).BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
            GetChannel(DEQUEUE_INDEX).BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
            // 終了はさせない
            // consumer.Received -= GetReceiveHandler();
            // CloseChannel(DEQUEUE_INDEX);

            Console.WriteLine(GetNow() + "34_cs ___(C) start end");
        }
    }
}

tamesi35a2_cpp.cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : For test. Practice of interprocess communication.
//                                          : This program run on the background side.
//                                          : Append daze to received message on interprocess communication.
//                                          : Daze meaning is dialect in the suburbs of Tokyo.
//                                          : Please settings source queue, destination queue to command line argument.
//                                          :   And standard input to /dev/null
//         : Compile   : Command            : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
//                                : Explain :     g++                    C++ Compiler
//                                          :     -std=c++11             C++ 11
//                                          :     tamesi35a2_cpp.cpp     Source file
//                                          :     -o tamesi35a2_cpp.exe  Output executable file
//                                          :     -lev                   Used library libev
//                                          :     -lamqpcpp              Used library AMQP-CPP
//                                          :     -pthread               Used library thread
//         : Execute   : Command            : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
//                                          : Run on the background.
//                                : Explain :     ./tamesi35a2_cpp.exe         Executable file
//                                          :     --enqueue                    Destination queue settings section
//                                          :     1113                         Destination queue name             (string ok)
//                                          :     durable                      Destination queue life span        (multiple)
//                                          :     --dequeue                    Source queue settings section
//                                          :     1112                         Source queue name                  (string ok)
//                                          :     durable                      Source queue life span
//                                          :     autodelete                   Source queue life span
//                                          :     > ./tamesi35a2_cpp.out.log   Standard output to   ./tamesi35a2_cpp.out.log
//                                          :     2> ./tamesi35a2_cpp.err.log  Standard error  to   ./tamesi35a2_cpp.err.log
//                                          :     < /dev/null                  Standard input  from nothing
//                                          :     &                            Process goto the background
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
//         : Referrence: ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//--------------------------------------------------------------------------------

#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;

static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpan_t lifeSpan_queues[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
	if ("durable"==lifeSpan) {
		return AMQP::durable;
	} else if ("autodelete" == lifeSpan) {
		return AMQP::autodelete;
	} else if ("passive" == lifeSpan) {
		return AMQP::passive;
	} else if ("exclusive" == lifeSpan) {
		return AMQP::exclusive;
	} else {
		std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
		exit(1);
	}
}
std::string exchange_name = "myexchange";
std::string routing_key = "";

static std::string Dump()
{
	static std::ostringstream sb;
	sb << "tamesi35a2_cs.cs Dump" << std::endl
		<< "    name_queues         [ENQUEUE_INDEX]    =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
		<< "    lifeSpan_queues     [ENQUEUE_INDEX]    =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
		<< "    ----" << std::endl
		<< "    name_queues         [DEQUEUE_INDEX]    =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
		<< "    lifeSpan_queues     [DEQUEUE_INDEX]    =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
		<< "    ----" << std::endl;
	return sb.str();
}
// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
	// どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
	std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
		std::chrono::system_clock::now().time_since_epoch()
		);

	long long hours = ms.count() / 1000 / 60 / 60 % 24;
	long long minutes = ms.count() / 1000 / 60 % 60;
	long long seconds = ms.count() / 1000 % 60;
	long long milliseconds = ms.count() % 1000;

	std::ostringstream oss;
	oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
	return oss.str();
}


// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
	if (nullptr== pConnection_ev[index]) {
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.open begin" << std::endl;
		pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.open end" << std::endl;
	}
	return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
	if (nullptr != pConnection_ev[index]) {
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close begin" << std::endl;
		// チャンネルにもヌルのフラグを入れる
		closeChannel(index);

		pConnection_ev[index]->close();
		pConnection_ev[index] = nullptr;
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close end" << std::endl;
	}
}

// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
	if (nullptr== pChannel_ev[index])
	{
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open begin" << std::endl;
		pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open end" << std::endl;
	}
	return pChannel_ev[index];
}
static void closeChannel(int index)
{
	if (nullptr != pChannel_ev[index]) {
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close begin" << std::endl;
		pChannel_ev[index]->close();
		pChannel_ev[index] = nullptr;
		std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close end" << std::endl;
	}
}


/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
	const int bufferSize = 1024;
	static std::string buffer[bufferSize] = {};
	static int bufferCursors[2] = { 0, 0 };
	const int PUT_INDEX = 0;
	const int GET_INDEX = 1;

	std::mutex _mutex;
	static void putMessage(std::string message)
	{
		std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] before lock [" << message << "]" << std::endl;
		std::lock_guard<std::mutex> lock(_mutex);

		buffer[bufferCursors[PUT_INDEX]] = message;
		std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] [" << message << "]" << std::endl;
		bufferCursors[PUT_INDEX]++;
		if (!(bufferCursors[PUT_INDEX] < bufferSize))
		{
			bufferCursors[PUT_INDEX] = 0;
		}
		std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage end next=[" << bufferCursors[PUT_INDEX] << "]" << std::endl;
	}
	static std::string removeMessage()
	{
		//std::cout << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] before lock" << std::endl;
		std::lock_guard<std::mutex> lock(_mutex);

		if ("" != buffer[bufferCursors[GET_INDEX]])
		{
			std::string message = buffer[bufferCursors[GET_INDEX]];
			std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] [" << message << "]" << std::endl;

			buffer[bufferCursors[GET_INDEX]] = "";
			bufferCursors[GET_INDEX]++;
			if (!(bufferCursors[GET_INDEX] < bufferSize))
			{
				bufferCursors[GET_INDEX] = 0;
			}

			std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage end return=[" << message << "] next=[" << bufferCursors[GET_INDEX] << "]" << std::endl;
			return message;
		}
		return "";
	}
}

static void sendMessage()
{
	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ begin" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue begin" << std::endl;
	getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
		.onError([](const char* errMsg) {
		std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
	});
	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue end" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue begin" << std::endl;
	getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
		// ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
		.onSuccess([]() {
			std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin" << std::endl;

			bool infinityLoop = true;
			bool enable = true;
			while (infinityLoop)
			{
				if (enable) {
					// 回転式バッファーからメッセージを切り取る。
					std::string message = rotationBuffer::removeMessage();

					if ("" != message)
					{
						// 末尾に daze を付ける。
						message += "daze";
						std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

						// トランザクション開始
						getChannel(ENQUEUE_INDEX)->startTransaction();
						// 以後、このループは空回りさせる
						enable = false;
						// パブリッシュ開始
						if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;

							// コミットを開始
							getChannel(ENQUEUE_INDEX)->commitTransaction()
								.onSuccess([=]() {
									// コミット成功
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

									// 接続を切ったあと、トランザクションのコミットが実行されるようだ。
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
									closeConnection(ENQUEUE_INDEX);
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
								})
									.onError([&message](const char *errMsg) {
									std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
									std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
								});

							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
						}
						else {
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
							std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
							getChannel(ENQUEUE_INDEX)->rollbackTransaction();
							std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
						}

						// 無限ループから出て、このハンドラから抜けよう。そのあと ev_run が処理の続き(コミットの続き等)を行う
						infinityLoop = false;
					}
				}
				std::this_thread::sleep_for(std::chrono::milliseconds(20));
			}
			std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ end" << std::endl;

		}).onError([](const char* errMsg) {
			std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
		});
	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue end" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run begin" << std::endl;
	// ここで、バインドキューのオンサクセスの処理へ。
	ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run end" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ end" << std::endl;
}
// メッセージ・キューの送信を担当
static void workProduce()
{
	std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP begin" << std::endl;

	// おっきな無限ループ
	for (;;) {
		sendMessage();
	}
	std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP outOfInfinityLoop error" << std::endl;
}

// メッセージ・キューの監視を開始
static void workConsume()
{
	std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ begin" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ begin" << std::endl;
	// I will go to the front of the box named "1111".
	LifeSpan_t lifeSpan = lifeSpan_queues[DEQUEUE_INDEX];
	getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
	std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ end" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS begin" << std::endl;
	// I look inside the box.
	auto errorCb = [&lifeSpan](const char *errMsg) {
		std::cerr << getTimeslipNow() << "(>_<)35a2_cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
	};
	auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {

		std::string message(amqpMessage.body(), amqpMessage.bodySize());
		std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received begin [" << message << "]" << std::endl;

		rotationBuffer::putMessage(message);

		std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack begin [" << deliveryTag << "] [" << message << "]" << std::endl;
		getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
		std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack end [" << deliveryTag << "]" << std::endl;
		std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received end [" << message << "]" << std::endl;
	};
	getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
		.onReceived(messageCb)
		.onError(errorCb);
	std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS end" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run begin" << std::endl;
	// I will keep on forever.
	ev_run(pLoop_ev[DEQUEUE_INDEX]);
	std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run end" << std::endl;

	std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ end" << std::endl;

	// I will not come here.
	return;
}

int main(int argc, char* argv[])
{
	// 引数の解析
	if(0<argc)
	{
		// プログラム名を省き、コマンドライン引数だけをつなげる。
		std::string cmdArg;
		for (int i = 1; i < argc; ++i)
		{
			cmdArg += std::string(argv[i]);
			if (i < argc) {
				cmdArg += " ";
			}
		}
		std::istringstream data(cmdArg);
		std::cout << getTimeslipNow() << "35a2_cpp main commandLineArguments=[" << data.str() << "]" << std::endl;

		// 与件
		// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
		// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

		// 受け皿
		// name_queues 結果はこれらの配列に入れる
		// lifeSpan_queues

		// 記憶
		int m0 = -1; // enqueue or dequeue index.
		int m1 = -1; // lifespans index.

		// 解析器
		std::string a;
		while (data >> a) {

			if ("--enqueue" == a) { m0 = ENQUEUE_INDEX;	m1 = -1; }
			else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
			else if (-1 == m1 && 0 == m0) { // エンキュー先キュー名
				name_queues[m0] = a;
				lifeSpan_queues[m0] = 0;
				m1++;
			}
			else if (-1 == m1 && 1 == m0) { // デキュー元キュー名
				name_queues[m0] = a;
				lifeSpan_queues[m0] = 0;
				m1++;
			}
			else { lifeSpan_queues[m0] |= LifeSpanString_To_Int(a); }
		}

		if ("" == name_queues[ENQUEUE_INDEX] || "" == name_queues[DEQUEUE_INDEX]) {
			goto gt_CommandlineParameterError;
		}

		std::cout << getTimeslipNow() << "35a2_cpp main dump begin" << std::endl;
		std::cout << getTimeslipNow() << Dump();
		std::cout << getTimeslipNow() << "35a2_cpp main dump end" << std::endl;
	}
	else
	{
		goto gt_CommandlineParameterError;
	}

	// キューの監視スレッド、メッセージの送信スレッドをスタート
	{
		auto th1 = std::thread([] { workConsume(); });
		auto th2 = std::thread([] { workProduce(); });

		for (;;) // このメイン・スレッドはずっと寝てる
		{
			std::this_thread::sleep_for(std::chrono::seconds(1));
		}
		// このプログラムは、自分では接続を切らないので、ここから下には来ない
		th2.join();
		th1.join();
	}
	return 0;

gt_CommandlineParameterError:
	std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
		<< "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
		<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
		<< "queueNames[0]=[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
		<< "queueNames[1]=[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl;
	exit(1);
}

次は浮かむ瀬に組み込むので、次の記事で

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?