前回の記事 : http://qiita.com/muzudho1/items/f73f562dabd3cfd655c4
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:55:09 109 34_cs (P)___ enqueue begin [a1]
14:55:09 109 34_cs channel.open [0] begin
14:55:09 110 34_cs connection.open begin
14:55:09 116 34_cs connection.open end
14:55:09 118 34_cs channel.open [0] end
14:55:09 124 34_cs (P)___ enqueue end
14:55:09 125 34_cs connection.close [0] begin
14:55:09 125 34_cs channel.close [0] begin
14:55:09 130 34_cs channel.close [0] end
14:55:09 136 34_cs connection.close [0] end
Enqueue? > a2
14:55:11 181 34_cs (P)___ enqueue begin [a2]
14:55:11 181 34_cs channel.open [0] begin
14:55:11 181 34_cs connection.open begin
14:55:11 186 34_cs connection.open end
14:55:11 187 34_cs channel.open [0] end
14:55:11 187 34_cs (P)___ enqueue end
14:55:11 187 34_cs connection.close [0] begin
14:55:11 187 34_cs channel.close [0] begin
14:55:11 189 34_cs channel.close [0] end
14:55:11 190 34_cs connection.close [0] end
Enqueue? > 14:55:11 202 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a44
14:55:17 412 34_cs (P)___ enqueue begin [4]
14:55:17 412 34_cs channel.open [0] begin
14:55:17 412 34_cs connection.open begin
14:55:17 423 34_cs connection.open end
14:55:17 425 34_cs channel.open [0] end
14:55:17 425 34_cs (P)___ enqueue end
14:55:17 425 34_cs connection.close [0] begin
14:55:17 425 34_cs channel.close [0] begin
14:55:17 427 34_cs channel.close [0] end
14:55:17 428 34_cs connection.close [0] end
Enqueue? > 14:55:17 439 34_cs <-------- [interrupt!] Dequeue(^q^) 4daze
前回の記事までで分かっていたのは、3秒程度待っただけでは 1回目の打鍵に応答し損ねるが、7秒待つと 1回目の打鍵に応答してくることがある、ということだった。
結局 確定的ではない。
そして問題は
05:48:13'015 35a2_cpp[0] connection.open begin
05:48:13'015 35a2_cpp[0] connection.open end
05:48:13'015 35a2_cpp[0] channel.open end
05:48:13'015 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:48:13'015 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:48:13'015 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:48:13'015 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:55:09'123 35a2_cpp ___(C) CNS.received begin [a1]
05:55:09'124 35a2_cpp rotationBuffer.putMessage begin [5] before lock [a1]
05:55:09'124 35a2_cpp rotationBuffer.putMessage begin [5] [a1]
05:55:09'124 35a2_cpp rotationBuffer.putMessage end next=[6]
05:55:09'124 35a2_cpp ___(C) CNS.received.ack begin [6] [a1]
05:55:09'124 35a2_cpp ___(C) CNS.received.ack end [6]
05:55:09'124 35a2_cpp ___(C) CNS.received end [a1]
05:55:09'141 35a2_cpp rotationBuffer.removeMessage begin [5] [a1]
05:55:09'141 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[6]
05:55:09'141 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a1daze] ****important****
05:55:09'141 35a2_cpp[0] connection.close begin
05:55:09'141 35a2_cpp[0] channel.close begin
05:55:09'141 35a2_cpp[0] channel.close end
05:55:09'141 35a2_cpp[0] connection.close end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ ev_run end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ begin
05:55:09'142 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:55:09'142 35a2_cpp[0] channel.open begin
05:55:09'142 35a2_cpp[0] connection.open begin
05:55:09'142 35a2_cpp[0] connection.open end
05:55:09'142 35a2_cpp[0] channel.open end
05:55:09'142 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:55:09'142 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:55:09'142 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:55:09'142 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:55:11'189 35a2_cpp ___(C) CNS.received begin [a2]
05:55:11'189 35a2_cpp rotationBuffer.putMessage begin [6] before lock [a2]
05:55:11'189 35a2_cpp rotationBuffer.putMessage begin [6] [a2]
05:55:11'189 35a2_cpp rotationBuffer.putMessage end next=[7]
05:55:11'189 35a2_cpp ___(C) CNS.received.ack begin [7] [a2]
05:55:11'189 35a2_cpp ___(C) CNS.received.ack end [7]
05:55:11'189 35a2_cpp ___(C) CNS.received end [a2]
05:55:11'197 35a2_cpp rotationBuffer.removeMessage begin [6] [a2]
05:55:11'197 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[7]
05:55:11'197 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a2daze] ****important****
05:55:11'197 35a2_cpp[0] connection.close begin
05:55:11'197 35a2_cpp[0] channel.close begin
05:55:11'197 35a2_cpp[0] channel.close end
05:55:11'197 35a2_cpp[0] connection.close end
05:55:11'198 35a2_cpp (P)___ WKP.bindQueue.publish.commited [] **********important**********
05:55:11'203 35a2_cpp (P)___ WKP.SM_ ev_run end
05:55:11'203 35a2_cpp (P)___ WKP.SM_ end
05:55:11'203 35a2_cpp (P)___ WKP.SM_ begin
05:55:11'203 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:55:11'203 35a2_cpp[0] channel.open begin
05:55:11'203 35a2_cpp[0] connection.open begin
05:55:11'203 35a2_cpp[0] connection.open end
C++ 側で、publish したのに、それに対応する commited のログが出ない場合があることだった(上のログでは a2daze には commited があるが、a1daze にはない)。
ロールバックすらしていない。
何が起こっているのか?
// Excerpt from a handler: publish `message` on the enqueue channel inside a
// transaction, then close the connection and leave the enclosing loop.
// (The declaration of `message` and the loop that `break` exits are outside this excerpt.)
getChannel(ENQUEUE_INDEX)->startTransaction();
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
// publish() returned a value that converts to true: request the commit and
// register callbacks for its outcome.
// NOTE(review): both callbacks capture `message` by reference. If they are invoked
// after `message` has gone out of scope, the reference dangles; a log line such as
// "publish.commited []" with an empty message would be consistent with that.
// Consider capturing by value - TODO confirm when the callbacks are dispatched.
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([&message]() {
// Publish succeeded.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.bindQueue.publish.commited [" << message << "] **********important**********" << std::endl;
})
.onError([&message](const char *errMsg) {
// Commit failed: report the error text and the message on stderr.
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
}
else {
// publish() returned a value that converts to false: report it on stderr and roll back.
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
}
// Close the connection to get out of the ev_run loop, and also leave this handler.
// (Author's note: this is needed for the publish to be committed.)
closeConnection(ENQUEUE_INDEX);
break;
エラーメッセージは出力されていない。
じゃあ、全部 標準出力に出すか。
// Excerpt from a handler: when a non-empty message is available, append "daze",
// publish it inside a transaction, close the connection and leave the enclosing
// loop. Every step is traced to standard output.
// (The declaration of `message` and the loop that `break` exits are outside this excerpt.)
if ("" != message)
{
// Append "daze" to the end.
message += "daze";
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;
getChannel(ENQUEUE_INDEX)->startTransaction();
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;
// Request the commit and register callbacks for its outcome.
// NOTE(review): both callbacks capture `message` by reference. If they are invoked
// after `message` has gone out of scope, the reference dangles; a log line such as
// "CT_.onSuccess Commited []" with an empty message would be consistent with that.
// Consider capturing by value - TODO confirm when the callbacks are dispatched.
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([&message]() {
// Publish succeeded.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
})
.onError([&message](const char *errMsg) {
// Commit failed: trace on stdout, then report the error text on stderr.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
} else {
// publish() returned a value that converts to false: trace it, report on stderr, roll back.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
}
// Close the connection to get out of the ev_run loop, and also leave this handler.
// (Author's note: this is needed for the publish to be committed.)
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
closeConnection(ENQUEUE_INDEX);
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
break;
}
ひとまず これだけ吐く。
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
15:12:02 345 34_cs (P)___ enqueue begin [a1]
15:12:02 345 34_cs channel.open [0] begin
15:12:02 346 34_cs connection.open begin
15:12:02 356 34_cs connection.open end
15:12:02 358 34_cs channel.open [0] end
15:12:02 364 34_cs (P)___ enqueue end
15:12:02 365 34_cs connection.close [0] begin
15:12:02 365 34_cs channel.close [0] begin
15:12:02 374 34_cs channel.close [0] end
15:12:02 391 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
15:12:02 392 34_cs connection.close [0] end
Enqueue? > a2
15:12:03 775 34_cs (P)___ enqueue begin [a2]
15:12:03 775 34_cs channel.open [0] begin
15:12:03 775 34_cs connection.open begin
15:12:03 782 34_cs connection.open end
15:12:03 784 34_cs channel.open [0] end
15:12:03 784 34_cs (P)___ enqueue end
15:12:03 784 34_cs connection.close [0] begin
15:12:03 784 34_cs channel.close [0] begin
15:12:03 786 34_cs channel.close [0] end
15:12:03 786 34_cs connection.close [0] end
Enqueue? > 15:12:03 792 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a3
15:12:06 120 34_cs (P)___ enqueue begin [a3]
15:12:06 122 34_cs channel.open [0] begin
15:12:06 123 34_cs connection.open begin
15:12:06 143 34_cs connection.open end
15:12:06 150 34_cs channel.open [0] end
15:12:06 151 34_cs (P)___ enqueue end
15:12:06 151 34_cs connection.close [0] begin
15:12:06 151 34_cs channel.close [0] begin
15:12:06 152 34_cs channel.close [0] end
15:12:06 153 34_cs connection.close [0] end
Enqueue? > a4
15:12:09 127 34_cs (P)___ enqueue begin [a4]
15:12:09 128 34_cs channel.open [0] begin
15:12:09 128 34_cs connection.open begin
15:12:09 140 34_cs connection.open end
15:12:09 142 34_cs channel.open [0] end
15:12:09 143 34_cs (P)___ enqueue end
15:12:09 143 34_cs connection.close [0] begin
15:12:09 143 34_cs channel.close [0] begin
15:12:09 143 34_cs channel.close [0] end
15:12:09 144 34_cs connection.close [0] end
Enqueue? > a5
15:12:10 727 34_cs (P)___ enqueue begin [a5]
15:12:10 727 34_cs channel.open [0] begin
15:12:10 727 34_cs connection.open begin
15:12:10 738 34_cs connection.open end
15:12:10 740 34_cs channel.open [0] end
15:12:10 740 34_cs (P)___ enqueue end
15:12:10 740 34_cs connection.close [0] begin
15:12:10 740 34_cs channel.close [0] begin
15:12:10 741 34_cs channel.close [0] end
15:12:10 742 34_cs connection.close [0] end
不具合の起こり方が変わった。
なんで 標準出力を書き足す と挙動が変わるのか……。
06:10:31'468 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:10:31'468 35a2_cpp main dump begin
06:10:31'468 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
06:10:31'468 35a2_cpp main dump end
06:10:31'468 35a2_cpp ___(C) ___ begin
06:10:31'468 35a2_cpp ___(C) DQ_ begin
06:10:31'468 35a2_cpp[1] channel.open begin
06:10:31'468 35a2_cpp[1] connection.open begin
06:10:31'469 35a2_cpp (P)___ WKP begin
06:10:31'469 35a2_cpp (P)___ WKP.SM_ begin
06:10:31'469 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:10:31'469 35a2_cpp[0] channel.open begin
06:10:31'469 35a2_cpp[0] connection.open begin
06:10:31'469 35a2_cpp[0] connection.open end
06:10:31'470 35a2_cpp[0] channel.open end
06:10:31'470 35a2_cpp[1] connection.open end
06:10:31'470 35a2_cpp[1] channel.open end
06:10:31'471 35a2_cpp ___(C) DQ_ end
06:10:31'471 35a2_cpp ___(C) CNS begin
06:10:31'471 35a2_cpp ___(C) CNS end
06:10:31'471 35a2_cpp ___(C) ___ ev_run begin
06:10:31'471 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:10:31'471 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:10:31'471 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:10:31'471 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:12:02'367 35a2_cpp ___(C) CNS.received begin [a1]
06:12:02'367 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
06:12:02'367 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
06:12:02'367 35a2_cpp rotationBuffer.putMessage end next=[1]
06:12:02'367 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
06:12:02'367 35a2_cpp ___(C) CNS.received.ack end [1]
06:12:02'367 35a2_cpp ___(C) CNS.received end [a1]
06:12:02'375 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
06:12:02'375 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
06:12:02'375 35a2_cpp[0] connection.close begin
06:12:02'375 35a2_cpp[0] channel.close begin
06:12:02'375 35a2_cpp[0] channel.close end
06:12:02'375 35a2_cpp[0] connection.close end
06:12:02'375 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
06:12:02'377 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [] **********important**********
06:12:02'388 35a2_cpp (P)___ WKP.SM_ ev_run end
06:12:02'388 35a2_cpp (P)___ WKP.SM_ end
06:12:02'388 35a2_cpp (P)___ WKP.SM_ begin
06:12:02'388 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:12:02'388 35a2_cpp[0] channel.open begin
06:12:02'388 35a2_cpp[0] connection.open begin
06:12:02'388 35a2_cpp[0] connection.open end
06:12:02'388 35a2_cpp[0] channel.open end
06:12:02'388 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:12:02'388 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:12:02'388 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:12:02'388 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:12:03'785 35a2_cpp ___(C) CNS.received begin [a2]
06:12:03'785 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
06:12:03'785 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
06:12:03'785 35a2_cpp rotationBuffer.putMessage end next=[2]
06:12:03'785 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
06:12:03'785 35a2_cpp ___(C) CNS.received.ack end [2]
06:12:03'785 35a2_cpp ___(C) CNS.received end [a2]
06:12:03'791 35a2_cpp rotationBuffer.removeMessage begin [1] [a2]
06:12:03'791 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[2]
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
06:12:03'791 35a2_cpp[0] connection.close begin
06:12:03'791 35a2_cpp[0] channel.close begin
06:12:03'791 35a2_cpp[0] channel.close end
06:12:03'791 35a2_cpp[0] connection.close end
06:12:03'791 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
接続を切ることが コミットの条件の1つのようにも見えるが、ログも 接続を切ったところで止まっていてワロスwww
接続を切るのと、コミットが完了するのと、どっちが先か?
// Excerpt: poll the rotation buffer every 20 ms; for each non-empty message,
// append "daze" and publish it inside a transaction. In this variant the
// connection is closed, and the loop flag cleared, only from the commit's
// onSuccess callback.
// NOTE(review): nothing in this loop waits for the commit callback, so every
// message taken from the buffer starts another transaction while the loop keeps
// running. If the callbacks are dispatched by the same thread that executes this
// loop, they can never run - TODO confirm which thread dispatches them.
bool infinityLoop = true;
while (infinityLoop)
{
// Cut one message out of the rotation buffer.
std::string message = rotationBuffer::removeMessage();
if ("" != message)
{
// Append "daze" to the end.
message += "daze";
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;
getChannel(ENQUEUE_INDEX)->startTransaction();
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;
// NOTE(review): `message` is local to one loop iteration but is captured by
// reference; a callback invoked after that iteration ends reads a dangling
// reference. `infinityLoop` is also captured by reference and written from the
// callback - if that happens on another thread, the plain bool is unsynchronized.
// TODO confirm.
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([&infinityLoop,&message]() {
// Publish succeeded.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
// Close the connection to get out of the ev_run loop, and also leave this handler.
// (Author's note: this is needed for the publish to be committed.)
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess closeConnection begin" << std::endl;
closeConnection(ENQUEUE_INDEX);
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess closeConnection end" << std::endl;
// The only place in this excerpt that ends the while loop.
infinityLoop = false;
})
.onError([&message](const char *errMsg) {
// Commit failed: trace on stdout, then report on stderr. The loop flag is left unchanged.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
} else {
// publish() returned a value that converts to false: trace it, report on stderr, roll back.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
}
// Disabled: the unconditional close-and-break is commented out below; in this
// excerpt closeConnection is called from the onSuccess callback instead.
//// Close the connection to get out of the ev_run loop, and also leave this handler. Needed for the publish to be committed.
//std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
//closeConnection(ENQUEUE_INDEX);
//std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
//break;
}
// Pause 20 ms before polling the buffer again.
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
書き換えた。
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
15:23:09 612 34_cs (P)___ enqueue begin [a1]
15:23:09 613 34_cs channel.open [0] begin
15:23:09 613 34_cs connection.open begin
15:23:09 620 34_cs connection.open end
15:23:09 623 34_cs channel.open [0] end
15:23:09 627 34_cs (P)___ enqueue end
15:23:09 628 34_cs connection.close [0] begin
15:23:09 628 34_cs channel.close [0] begin
15:23:09 634 34_cs channel.close [0] end
15:23:09 643 34_cs connection.close [0] end
Enqueue? > a2
15:23:10 627 34_cs (P)___ enqueue begin [a2]
15:23:10 627 34_cs channel.open [0] begin
15:23:10 627 34_cs connection.open begin
15:23:10 632 34_cs connection.open end
15:23:10 634 34_cs channel.open [0] end
15:23:10 634 34_cs (P)___ enqueue end
15:23:10 634 34_cs connection.close [0] begin
15:23:10 634 34_cs channel.close [0] begin
15:23:10 635 34_cs channel.close [0] end
15:23:10 636 34_cs connection.close [0] end
Enqueue? > a3
15:23:11 355 34_cs (P)___ enqueue begin [a3]
15:23:11 355 34_cs channel.open [0] begin
15:23:11 355 34_cs connection.open begin
15:23:11 365 34_cs connection.open end
15:23:11 374 34_cs channel.open [0] end
15:23:11 375 34_cs (P)___ enqueue end
15:23:11 375 34_cs connection.close [0] begin
15:23:11 375 34_cs channel.close [0] begin
15:23:11 376 34_cs channel.close [0] end
15:23:11 377 34_cs connection.close [0] end
Enqueue? > a4
15:23:12 107 34_cs (P)___ enqueue begin [a4]
15:23:12 107 34_cs channel.open [0] begin
15:23:12 107 34_cs connection.open begin
15:23:12 114 34_cs connection.open end
15:23:12 115 34_cs channel.open [0] end
15:23:12 115 34_cs (P)___ enqueue end
15:23:12 115 34_cs connection.close [0] begin
15:23:12 116 34_cs channel.close [0] begin
15:23:12 118 34_cs channel.close [0] end
15:23:12 118 34_cs connection.close [0] end
Enqueue? > a54
15:23:12 939 34_cs (P)___ enqueue begin [a54]
15:23:12 939 34_cs channel.open [0] begin
15:23:12 939 34_cs connection.open begin
15:23:12 945 34_cs connection.open end
15:23:12 950 34_cs channel.open [0] end
15:23:12 950 34_cs (P)___ enqueue end
15:23:12 951 34_cs connection.close [0] begin
15:23:12 951 34_cs channel.close [0] begin
15:23:12 952 34_cs channel.close [0] end
15:23:12 953 34_cs connection.close [0] end
Enqueue? > a6
15:23:14 083 34_cs (P)___ enqueue begin [a6]
15:23:14 083 34_cs channel.open [0] begin
15:23:14 083 34_cs connection.open begin
15:23:14 093 34_cs connection.open end
15:23:14 095 34_cs channel.open [0] end
15:23:14 095 34_cs (P)___ enqueue end
15:23:14 095 34_cs connection.close [0] begin
15:23:14 095 34_cs channel.close [0] begin
15:23:14 099 34_cs channel.close [0] end
15:23:14 099 34_cs connection.close [0] end
Enqueue? > a6
15:23:14 979 34_cs (P)___ enqueue begin [a6]
15:23:14 979 34_cs channel.open [0] begin
15:23:14 979 34_cs connection.open begin
15:23:14 983 34_cs connection.open end
15:23:14 985 34_cs channel.open [0] end
15:23:14 985 34_cs (P)___ enqueue end
15:23:14 985 34_cs connection.close [0] begin
15:23:14 986 34_cs channel.close [0] begin
15:23:14 989 34_cs channel.close [0] end
15:23:14 989 34_cs connection.close [0] end
Enqueue? > a8
15:23:16 059 34_cs (P)___ enqueue begin [a8]
15:23:16 059 34_cs channel.open [0] begin
15:23:16 059 34_cs connection.open begin
15:23:16 074 34_cs connection.open end
15:23:16 076 34_cs channel.open [0] end
15:23:16 076 34_cs (P)___ enqueue end
15:23:16 076 34_cs connection.close [0] begin
15:23:16 076 34_cs channel.close [0] begin
15:23:16 077 34_cs channel.close [0] end
15:23:16 078 34_cs connection.close [0] end
まったく応答がなくなった。
06:22:19'409 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:22:19'409 35a2_cpp main dump begin
06:22:19'409 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
06:22:19'409 35a2_cpp main dump end
06:22:19'410 35a2_cpp ___(C) ___ begin
06:22:19'410 35a2_cpp (P)___ WKP begin
06:22:19'410 35a2_cpp ___(C) DQ_ begin
06:22:19'410 35a2_cpp (P)___ WKP.SM_ begin
06:22:19'410 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:22:19'410 35a2_cpp[0] channel.open begin
06:22:19'410 35a2_cpp[1] channel.open begin
06:22:19'410 35a2_cpp[1] connection.open begin
06:22:19'410 35a2_cpp[0] connection.open begin
06:22:19'412 35a2_cpp[1] connection.open end
06:22:19'412 35a2_cpp[0] connection.open end
06:22:19'412 35a2_cpp[1] channel.open end
06:22:19'412 35a2_cpp[0] channel.open end
06:22:19'412 35a2_cpp ___(C) DQ_ end
06:22:19'412 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:22:19'412 35a2_cpp ___(C) CNS begin
06:22:19'412 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:22:19'412 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:22:19'412 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:22:19'412 35a2_cpp ___(C) CNS end
06:22:19'412 35a2_cpp ___(C) ___ ev_run begin
06:22:19'427 35a2_cpp ___(C) CNS.received begin [a3]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a3]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [0] [a3]
06:22:19'427 35a2_cpp rotationBuffer.putMessage end next=[1]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack begin [1] [a3]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack end [1]
06:22:19'427 35a2_cpp ___(C) CNS.received end [a3]
06:22:19'427 35a2_cpp ___(C) CNS.received begin [a4]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a4]
06:22:19'427 35a2_cpp rotationBuffer.putMessage begin [1] [a4]
06:22:19'427 35a2_cpp rotationBuffer.putMessage end next=[2]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack begin [2] [a4]
06:22:19'427 35a2_cpp ___(C) CNS.received.ack end [2]
06:22:19'427 35a2_cpp ___(C) CNS.received end [a4]
06:22:19'428 35a2_cpp ___(C) CNS.received begin [a5]
06:22:19'428 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a5]
06:22:19'428 35a2_cpp rotationBuffer.putMessage begin [2] [a5]
06:22:19'428 35a2_cpp rotationBuffer.putMessage end next=[3]
06:22:19'428 35a2_cpp ___(C) CNS.received.ack begin [3] [a5]
06:22:19'428 35a2_cpp ___(C) CNS.received.ack end [3]
06:22:19'428 35a2_cpp ___(C) CNS.received end [a5]
06:22:19'446 35a2_cpp rotationBuffer.removeMessage begin [0] [a3]
06:22:19'446 35a2_cpp rotationBuffer.removeMessage end return=[a3] next=[1]
06:22:19'446 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a3daze] ****important****
06:22:19'446 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:22:19'446 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:22:19'467 35a2_cpp rotationBuffer.removeMessage begin [1] [a4]
06:22:19'467 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[2]
06:22:19'467 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
06:22:19'467 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:22:19'467 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:22:19'487 35a2_cpp rotationBuffer.removeMessage begin [2] [a5]
06:22:19'487 35a2_cpp rotationBuffer.removeMessage end return=[a5] next=[3]
06:22:19'487 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a5daze] ****important****
06:22:19'487 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:22:19'487 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:09'629 35a2_cpp ___(C) CNS.received begin [a1]
06:23:09'629 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a1]
06:23:09'629 35a2_cpp rotationBuffer.putMessage begin [3] [a1]
06:23:09'629 35a2_cpp rotationBuffer.putMessage end next=[4]
06:23:09'629 35a2_cpp ___(C) CNS.received.ack begin [4] [a1]
06:23:09'629 35a2_cpp ___(C) CNS.received.ack end [4]
06:23:09'629 35a2_cpp ___(C) CNS.received end [a1]
06:23:09'636 35a2_cpp rotationBuffer.removeMessage begin [3] [a1]
06:23:09'636 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[4]
06:23:09'636 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:23:09'636 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:09'636 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:10'635 35a2_cpp ___(C) CNS.received begin [a2]
06:23:10'635 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a2]
06:23:10'635 35a2_cpp rotationBuffer.putMessage begin [4] [a2]
06:23:10'635 35a2_cpp rotationBuffer.putMessage end next=[5]
06:23:10'635 35a2_cpp ___(C) CNS.received.ack begin [5] [a2]
06:23:10'635 35a2_cpp ___(C) CNS.received.ack end [5]
06:23:10'635 35a2_cpp ___(C) CNS.received end [a2]
06:23:10'646 35a2_cpp rotationBuffer.removeMessage begin [4] [a2]
06:23:10'646 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[5]
06:23:10'646 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
06:23:10'646 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:10'646 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:11'376 35a2_cpp ___(C) CNS.received begin [a3]
06:23:11'376 35a2_cpp rotationBuffer.putMessage begin [5] before lock [a3]
06:23:11'376 35a2_cpp rotationBuffer.putMessage begin [5] [a3]
06:23:11'376 35a2_cpp rotationBuffer.putMessage end next=[6]
06:23:11'376 35a2_cpp ___(C) CNS.received.ack begin [6] [a3]
06:23:11'376 35a2_cpp ___(C) CNS.received.ack end [6]
06:23:11'377 35a2_cpp ___(C) CNS.received end [a3]
06:23:11'393 35a2_cpp rotationBuffer.removeMessage begin [5] [a3]
06:23:11'393 35a2_cpp rotationBuffer.removeMessage end return=[a3] next=[6]
06:23:11'393 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a3daze] ****important****
06:23:11'393 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:11'393 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:12'117 35a2_cpp ___(C) CNS.received begin [a4]
06:23:12'117 35a2_cpp rotationBuffer.putMessage begin [6] before lock [a4]
06:23:12'117 35a2_cpp rotationBuffer.putMessage begin [6] [a4]
06:23:12'117 35a2_cpp rotationBuffer.putMessage end next=[7]
06:23:12'117 35a2_cpp ___(C) CNS.received.ack begin [7] [a4]
06:23:12'117 35a2_cpp ___(C) CNS.received.ack end [7]
06:23:12'117 35a2_cpp ___(C) CNS.received end [a4]
06:23:12'119 35a2_cpp rotationBuffer.removeMessage begin [6] [a4]
06:23:12'119 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[7]
06:23:12'119 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
06:23:12'119 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:12'119 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:12'952 35a2_cpp ___(C) CNS.received begin [a54]
06:23:12'952 35a2_cpp rotationBuffer.putMessage begin [7] before lock [a54]
06:23:12'952 35a2_cpp rotationBuffer.putMessage begin [7] [a54]
06:23:12'952 35a2_cpp rotationBuffer.putMessage end next=[8]
06:23:12'952 35a2_cpp ___(C) CNS.received.ack begin [8] [a54]
06:23:12'952 35a2_cpp ___(C) CNS.received.ack end [8]
06:23:12'952 35a2_cpp ___(C) CNS.received end [a54]
06:23:12'967 35a2_cpp rotationBuffer.removeMessage begin [7] [a54]
06:23:12'967 35a2_cpp rotationBuffer.removeMessage end return=[a54] next=[8]
06:23:12'967 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a54daze] ****important****
06:23:12'967 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:12'967 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:14'098 35a2_cpp ___(C) CNS.received begin [a6]
06:23:14'098 35a2_cpp rotationBuffer.putMessage begin [8] before lock [a6]
06:23:14'098 35a2_cpp rotationBuffer.putMessage begin [8] [a6]
06:23:14'098 35a2_cpp rotationBuffer.putMessage end next=[9]
06:23:14'098 35a2_cpp ___(C) CNS.received.ack begin [9] [a6]
06:23:14'098 35a2_cpp ___(C) CNS.received.ack end [9]
06:23:14'098 35a2_cpp ___(C) CNS.received end [a6]
06:23:14'105 35a2_cpp rotationBuffer.removeMessage begin [8] [a6]
06:23:14'105 35a2_cpp rotationBuffer.removeMessage end return=[a6] next=[9]
06:23:14'105 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a6daze] ****important****
06:23:14'105 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:14'105 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:14'986 35a2_cpp ___(C) CNS.received begin [a6]
06:23:14'986 35a2_cpp rotationBuffer.putMessage begin [9] before lock [a6]
06:23:14'986 35a2_cpp rotationBuffer.putMessage begin [9] [a6]
06:23:14'986 35a2_cpp rotationBuffer.putMessage end next=[10]
06:23:14'986 35a2_cpp ___(C) CNS.received.ack begin [10] [a6]
06:23:14'986 35a2_cpp ___(C) CNS.received.ack end [10]
06:23:14'986 35a2_cpp ___(C) CNS.received end [a6]
06:23:14'993 35a2_cpp rotationBuffer.removeMessage begin [9] [a6]
06:23:14'993 35a2_cpp rotationBuffer.removeMessage end return=[a6] next=[10]
06:23:14'993 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a6daze] ****important****
06:23:14'993 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:14'993 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:23:16'077 35a2_cpp ___(C) CNS.received begin [a8]
06:23:16'077 35a2_cpp rotationBuffer.putMessage begin [10] before lock [a8]
06:23:16'077 35a2_cpp rotationBuffer.putMessage begin [10] [a8]
06:23:16'077 35a2_cpp rotationBuffer.putMessage end next=[11]
06:23:16'077 35a2_cpp ___(C) CNS.received.ack begin [11] [a8]
06:23:16'077 35a2_cpp ___(C) CNS.received.ack end [11]
06:23:16'077 35a2_cpp ___(C) CNS.received end [a8]
06:23:16'085 35a2_cpp rotationBuffer.removeMessage begin [10] [a8]
06:23:16'085 35a2_cpp rotationBuffer.removeMessage end return=[a8] next=[11]
06:23:16'085 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a8daze] ****important****
06:23:16'085 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:23:16'085 35a2_cpp (P)___ WKP.BQ_.OS_ if end
コミット成功(CT_.onSuccess)のログも出なくなった。つまり、コミットもしなくなった。
// Excerpt: poll the rotation buffer every 20 ms, but handle at most one message:
// once a message has been taken, `enable` is cleared and the loop only sleeps
// until the commit's onSuccess callback clears `infinityLoop`.
// NOTE(review): `enable` is never set back to true in this excerpt, and only
// onSuccess ends the loop; on a commit error or a failed publish the loop keeps
// spinning. If the callbacks are dispatched by the same thread that executes
// this loop, they can never run - TODO confirm which thread dispatches them.
bool infinityLoop = true;
bool enable = true;
while (infinityLoop)
{
if (enable) {
// Cut one message out of the rotation buffer.
std::string message = rotationBuffer::removeMessage();
if ("" != message)
{
// Append "daze" to the end.
message += "daze";
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;
// Begin the transaction.
getChannel(ENQUEUE_INDEX)->startTransaction();
// From here on, let this loop spin idle.
enable = false;
// Begin publishing.
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;
// Begin the commit.
// NOTE(review): `message` lives only inside the `if (enable)` block but is captured
// by reference; a callback invoked after this iteration ends reads a dangling
// reference. Consider capturing by value - TODO confirm.
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([&infinityLoop, &message]() {
// Commit succeeded.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
// Leave the infinite loop and exit this handler; after that, leave the ev_run loop.
infinityLoop = false;
})
.onError([&message](const char *errMsg) {
// Commit failed: trace on stdout, then report on stderr. The loop flag is left unchanged.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
}
else {
// publish() returned a value that converts to false: trace it, report on stderr, roll back.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
}
// Author's observation: the transaction commit seems to be executed after the connection is closed.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
closeConnection(ENQUEUE_INDEX);
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
// break;
}
}
// Pause 20 ms before the next iteration.
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
これでどうか?
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
15:39:30 672 34_cs (P)___ enqueue begin [a1]
15:39:30 672 34_cs channel.open [0] begin
15:39:30 672 34_cs connection.open begin
15:39:30 679 34_cs connection.open end
15:39:30 681 34_cs channel.open [0] end
15:39:30 685 34_cs (P)___ enqueue end
15:39:30 685 34_cs connection.close [0] begin
15:39:30 686 34_cs channel.close [0] begin
15:39:30 692 34_cs channel.close [0] end
15:39:30 700 34_cs connection.close [0] end
Enqueue? > a2
15:39:32 999 34_cs (P)___ enqueue begin [a2]
15:39:33 000 34_cs channel.open [0] begin
15:39:33 000 34_cs connection.open begin
15:39:33 011 34_cs connection.open end
15:39:33 021 34_cs channel.open [0] end
15:39:33 021 34_cs (P)___ enqueue end
15:39:33 021 34_cs connection.close [0] begin
15:39:33 021 34_cs channel.close [0] begin
15:39:33 024 34_cs channel.close [0] end
15:39:33 025 34_cs connection.close [0] end
Enqueue? > a3
15:39:34 015 34_cs (P)___ enqueue begin [a3]
15:39:34 016 34_cs channel.open [0] begin
15:39:34 016 34_cs connection.open begin
15:39:34 021 34_cs connection.open end
15:39:34 023 34_cs channel.open [0] end
15:39:34 023 34_cs (P)___ enqueue end
15:39:34 023 34_cs connection.close [0] begin
15:39:34 023 34_cs channel.close [0] begin
15:39:34 035 34_cs channel.close [0] end
15:39:34 036 34_cs connection.close [0] end
Enqueue? > a4
15:39:34 871 34_cs (P)___ enqueue begin [a4]
15:39:34 872 34_cs channel.open [0] begin
15:39:34 872 34_cs connection.open begin
15:39:34 880 34_cs connection.open end
15:39:34 883 34_cs channel.open [0] end
15:39:34 883 34_cs (P)___ enqueue end
15:39:34 883 34_cs connection.close [0] begin
15:39:34 883 34_cs channel.close [0] begin
15:39:34 887 34_cs channel.close [0] end
15:39:34 888 34_cs connection.close [0] end
Enqueue? > a5
15:39:36 199 34_cs (P)___ enqueue begin [a5]
15:39:36 199 34_cs channel.open [0] begin
15:39:36 199 34_cs connection.open begin
15:39:36 206 34_cs connection.open end
15:39:36 216 34_cs channel.open [0] end
15:39:36 216 34_cs (P)___ enqueue end
15:39:36 216 34_cs connection.close [0] begin
15:39:36 216 34_cs channel.close [0] begin
15:39:36 218 34_cs channel.close [0] end
15:39:36 219 34_cs connection.close [0] end
これでもダメ。
06:36:55'766 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:36:55'766 35a2_cpp main dump begin
06:36:55'766 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
06:36:55'766 35a2_cpp main dump end
06:36:55'767 35a2_cpp ___(C) ___ begin
06:36:55'767 35a2_cpp ___(C) DQ_ begin
06:36:55'767 35a2_cpp[1] channel.open begin
06:36:55'767 35a2_cpp[1] connection.open begin06:36:55'767
35a2_cpp (P)___ WKP begin
06:36:55'767 35a2_cpp (P)___ WKP.SM_ begin
06:36:55'767 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:36:55'767 35a2_cpp[0] channel.open begin
06:36:55'767 35a2_cpp[0] connection.open begin
06:36:55'769 35a2_cpp[0] connection.open end
06:36:55'769 35a2_cpp[1] connection.open end
06:36:55'769 35a2_cpp[0] channel.open end
06:36:55'769 35a2_cpp[1] channel.open end
06:36:55'770 35a2_cpp ___(C) DQ_ end
06:36:55'770 35a2_cpp ___(C) CNS begin
06:36:55'770 35a2_cpp ___(C) CNS end
06:36:55'770 35a2_cpp ___(C) ___ ev_run begin
06:36:55'770 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:36:55'770 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:36:55'770 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:36:55'770 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:39:30'688 35a2_cpp ___(C) CNS.received begin [a1]
06:39:30'688 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
06:39:30'688 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
06:39:30'688 35a2_cpp rotationBuffer.putMessage end next=[1]
06:39:30'688 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
06:39:30'688 35a2_cpp ___(C) CNS.received.ack end [1]
06:39:30'688 35a2_cpp ___(C) CNS.received end [a1]
06:39:30'694 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
06:39:30'694 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
06:39:30'695 35a2_cpp[0] connection.close begin
06:39:30'695 35a2_cpp[0] channel.close begin
06:39:30'695 35a2_cpp[0] channel.close end
06:39:30'695 35a2_cpp[0] connection.close end
06:39:30'695 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
06:39:33'023 35a2_cpp ___(C) CNS.received begin [a2]
06:39:33'023 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
06:39:33'023 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
06:39:33'023 35a2_cpp rotationBuffer.putMessage end next=[2]
06:39:33'023 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
06:39:33'024 35a2_cpp ___(C) CNS.received.ack end [2]
06:39:33'024 35a2_cpp ___(C) CNS.received end [a2]
06:39:34'034 35a2_cpp ___(C) CNS.received begin [a3]
06:39:34'034 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
06:39:34'034 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
06:39:34'034 35a2_cpp rotationBuffer.putMessage end next=[3]
06:39:34'034 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
06:39:34'034 35a2_cpp ___(C) CNS.received.ack end [3]
06:39:34'034 35a2_cpp ___(C) CNS.received end [a3]
06:39:34'887 35a2_cpp ___(C) CNS.received begin [a4]
06:39:34'887 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
06:39:34'887 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
06:39:34'887 35a2_cpp rotationBuffer.putMessage end next=[4]
06:39:34'887 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
06:39:34'887 35a2_cpp ___(C) CNS.received.ack end [4]
06:39:34'887 35a2_cpp ___(C) CNS.received end [a4]
06:39:36'218 35a2_cpp ___(C) CNS.received begin [a5]
06:39:36'218 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
06:39:36'218 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
06:39:36'218 35a2_cpp rotationBuffer.putMessage end next=[5]
06:39:36'218 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
06:39:36'219 35a2_cpp ___(C) CNS.received.ack end [5]
06:39:36'219 35a2_cpp ___(C) CNS.received end [a5]
先にハンドラを抜ければ?
// Bind the enqueue queue to the exchange; on success, poll the rotation buffer, publish one
// message inside a transaction, and return from the handler so the commit can proceed.
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
// Called only once from ev_run. Closing the connection makes ev_run return. To get the commit done, this handler has to return.
.onSuccess([]() {
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin" << std::endl;
bool infinityLoop = true;
bool enable = true;
while (infinityLoop)
{
if (enable) {
// Take (remove) one message from the rotation buffer; an empty string is treated as "nothing to send".
std::string message = rotationBuffer::removeMessage();
if ("" != message)
{
// Append "daze" to the end.
message += "daze";
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;
// Begin the transaction.
getChannel(ENQUEUE_INDEX)->startTransaction();
// From here on, let this loop spin idle.
enable = false;
// Start publishing.
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;
// Start the commit.
// NOTE(review): both callbacks below capture the loop-local message by reference; the
// loop (and this handler) exits right after registering them, so the reference
// presumably dangles by the time a callback runs — confirm.
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([&message]() {
// Commit succeeded.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
// It seems the transaction commit is executed after the connection is closed.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
closeConnection(ENQUEUE_INDEX);
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
})
.onError([&message](const char *errMsg) {
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
}
else {
// publish() reported failure: log it and roll the transaction back.
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
}
// Leave the infinite loop and return from this handler; ev_run then carries on with the rest (e.g. the remainder of the commit).
infinityLoop = false;
}
}
// Wait 20 ms before checking the loop condition / polling again.
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ end" << std::endl;
}).onError([](const char* errMsg) {
// The bind itself failed: report the error text on stderr.
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
});
こうすれば、
06:49:39'404 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:49:39'405 35a2_cpp main dump begin
06:49:39'405 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
06:49:39'405 35a2_cpp main dump end
06:49:39'405 35a2_cpp ___(C) ___ begin
06:49:39'405 35a2_cpp ___(C) DQ_ begin
06:49:39'405 35a2_cpp[1] channel.open begin
06:49:39'405 35a2_cpp[1] connection.open begin
06:49:39'405 35a2_cpp (P)___ WKP begin
06:49:39'405 35a2_cpp (P)___ WKP.SM_ begin
06:49:39'405 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:49:39'405 35a2_cpp[0] channel.open begin
06:49:39'405 35a2_cpp[0] connection.open begin
06:49:39'408 35a2_cpp[0] connection.open end
06:49:39'408 35a2_cpp[1] connection.open end
06:49:39'409 35a2_cpp[1] channel.open end
06:49:39'409 35a2_cpp ___(C) DQ_ end
06:49:39'409 35a2_cpp ___(C) CNS begin
06:49:39'409 35a2_cpp ___(C) CNS end
06:49:39'409 35a2_cpp ___(C) ___ ev_run begin
06:49:39'409 35a2_cpp[0] channel.open end
06:49:39'410 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:49:39'410 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:49:39'410 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:49:39'410 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:49:39'434 35a2_cpp (P)___ WKP.BQ_.OS_ begin
06:50:19'729 35a2_cpp ___(C) CNS.received begin [a1]
06:50:19'729 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
06:50:19'730 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
06:50:19'730 35a2_cpp rotationBuffer.putMessage end next=[1]
06:50:19'730 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
06:50:19'730 35a2_cpp ___(C) CNS.received.ack end [1]
06:50:19'730 35a2_cpp ___(C) CNS.received end [a1]
06:50:19'730 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
06:50:19'730 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
06:50:19'731 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
06:50:19'731 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
06:50:19'731 35a2_cpp (P)___ WKP.BQ_.OS_ if end
06:50:19'751 35a2_cpp (P)___ WKP.BQ_.OS_ end
06:50:19'753 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [�^?A^@^@^@^@^@�^A^]��^?^@^@����������������PjC��^?^@^@@^C^]��^?^@^@0^B^]��^?^@^@p^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^PiC��^?^@$
文字化けがひどい。
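どうも、ポインターの指しているところが おかしいんじゃないか? (ラムダ式が message を参照キャプチャ [&message] しているが、コミットのコールバックが呼ばれるのは上のログのとおり WKP.BQ_.OS_ end の後、つまりハンドラを抜けた後なので、その時点でローカル変数 message はもう破棄されている。)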
ちょうど バグが再現するうちに 直してしまおう。
「ラムダ式(C++11)」(cpprefjp - C++日本語リファレンス)
https://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
// Bind the enqueue queue to the exchange. Once the bind succeeds, wait for one message in the
// rotation buffer, publish it (with "daze" appended) inside a transaction, and return from the
// handler so that ev_run can carry the commit through.
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
    // Called only once from ev_run. Closing the connection makes ev_run return.
    // To get the commit done, this handler has to return.
    .onSuccess([]() {
        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin" << std::endl;
        // Poll the rotation buffer every 20 ms until a message is available.
        for (;;)
        {
            // Take (remove) one message from the rotation buffer; empty means "nothing yet".
            std::string message = rotationBuffer::removeMessage();
            if (message.empty())
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(20));
                continue;
            }

            // Append "daze" to the end.
            message += "daze";
            std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;
            // Begin the transaction.
            getChannel(ENQUEUE_INDEX)->startTransaction();
            // Start publishing.
            if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;
                // Start the commit. The callbacks run after this handler has returned, when the
                // local `message` no longer exists, so BOTH of them must capture it by value.
                getChannel(ENQUEUE_INDEX)->commitTransaction()
                    .onSuccess([message]() {
                        // Commit succeeded.
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
                        // Close the connection so that ev_run returns.
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
                        closeConnection(ENQUEUE_INDEX);
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
                    })
                    .onError([message](const char *errMsg) {
                        // NOTE(review): the connection is left open on this path, so ev_run
                        // presumably keeps running — confirm whether that is intended.
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
                        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                    });
                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
            }
            else {
                // publish() reported failure: log it and roll the transaction back.
                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
                std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
            }
            // One message handled: leave the loop and return from this handler.
            // ev_run then carries on with the rest (e.g. the remainder of the commit).
            break;
        }
        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ end" << std::endl;
    }).onError([](const char* errMsg) {
        // The bind itself failed: report the error text on stderr.
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
    });
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
16:00:33 780 34_cs ___(C) start begin
16:00:33 787 34_cs consumer.create [1] begin
16:00:33 787 34_cs consumer.create [1] end
16:00:33 788 34_cs channel.open [1] begin
16:00:33 789 34_cs connection.open begin
16:00:33 958 34_cs connection.open end
16:00:33 963 34_cs channel.open [1] end
16:00:33 966 34_cs <-------- [interrupt!] Registered(^q^)
16:00:33 966 34_cs ___(C) start end
16:00:33 966 34_cs (P)___ waitRegistered begin
16:00:33 966 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:00:34 815 34_cs (P)___ enqueue begin [a1]
16:00:34 815 34_cs channel.open [0] begin
16:00:34 816 34_cs connection.open begin
16:00:34 825 34_cs connection.open end
16:00:34 828 34_cs channel.open [0] end
16:00:34 833 34_cs (P)___ enqueue end
16:00:34 834 34_cs connection.close [0] begin
16:00:34 835 34_cs channel.close [0] begin
16:00:34 839 34_cs channel.close [0] end
16:00:34 845 34_cs connection.close [0] end
Enqueue? > 16:00:34 881 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
16:00:35 814 34_cs (P)___ enqueue begin [a2]
16:00:35 815 34_cs channel.open [0] begin
16:00:35 815 34_cs connection.open begin
16:00:35 818 34_cs connection.open end
16:00:35 821 34_cs channel.open [0] end
16:00:35 822 34_cs (P)___ enqueue end
16:00:35 822 34_cs connection.close [0] begin
16:00:35 822 34_cs channel.close [0] begin
16:00:35 823 34_cs channel.close [0] end
16:00:35 824 34_cs connection.close [0] end
Enqueue? > 16:00:35 853 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a3
16:00:36 599 34_cs (P)___ enqueue begin [a3]
16:00:36 599 34_cs channel.open [0] begin
16:00:36 599 34_cs connection.open begin
16:00:36 604 34_cs connection.open end
16:00:36 605 34_cs channel.open [0] end
16:00:36 606 34_cs (P)___ enqueue end
16:00:36 606 34_cs connection.close [0] begin
16:00:36 606 34_cs channel.close [0] begin
16:00:36 607 34_cs channel.close [0] end
16:00:36 608 34_cs connection.close [0] end
Enqueue? > 16:00:36 631 34_cs <-------- [interrupt!] Dequeue(^q^) a3daze
a4
16:00:37 430 34_cs (P)___ enqueue begin [a4]
16:00:37 430 34_cs channel.open [0] begin
16:00:37 431 34_cs connection.open begin
16:00:37 436 34_cs connection.open end
16:00:37 442 34_cs channel.open [0] end
16:00:37 442 34_cs (P)___ enqueue end
16:00:37 442 34_cs connection.close [0] begin
16:00:37 442 34_cs channel.close [0] begin
16:00:37 443 34_cs channel.close [0] end
16:00:37 444 34_cs connection.close [0] end
Enqueue? > 16:00:37 467 34_cs <-------- [interrupt!] Dequeue(^q^) a4daze
a5
16:00:38 263 34_cs (P)___ enqueue begin [a5]
16:00:38 263 34_cs channel.open [0] begin
16:00:38 264 34_cs connection.open begin
16:00:38 269 34_cs connection.open end
16:00:38 270 34_cs channel.open [0] end
16:00:38 270 34_cs (P)___ enqueue end
16:00:38 270 34_cs connection.close [0] begin
16:00:38 270 34_cs channel.close [0] begin
16:00:38 271 34_cs channel.close [0] end
16:00:38 272 34_cs connection.close [0] end
Enqueue? > 16:00:38 312 34_cs <-------- [interrupt!] Dequeue(^q^) a5daze
いい感じだな。
06:59:59'184 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
06:59:59'184 35a2_cpp main dump begin
06:59:59'184 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
06:59:59'184 35a2_cpp main dump end
06:59:59'184 35a2_cpp ___(C) ___ begin
06:59:59'185 35a2_cpp ___(C) DQ_ begin
06:59:59'184 35a2_cpp (P)___ WKP begin
06:59:59'185 35a2_cpp[1] channel.open begin
06:59:59'185 35a2_cpp[1] connection.open begin
06:59:59'185 35a2_cpp (P)___ WKP.SM_ begin
06:59:59'185 35a2_cpp (P)___ WKP.SM_.declareQueue begin
06:59:59'185 35a2_cpp[0] channel.open begin
06:59:59'185 35a2_cpp[0] connection.open begin
06:59:59'186 35a2_cpp[0] connection.open end
06:59:59'186 35a2_cpp[1] connection.open end
06:59:59'186 35a2_cpp[0] channel.open end
06:59:59'186 35a2_cpp[1] channel.open end
06:59:59'187 35a2_cpp (P)___ WKP.SM_.declareQueue end
06:59:59'187 35a2_cpp ___(C) DQ_ end
06:59:59'187 35a2_cpp (P)___ WKP.SM_.bindQueue begin
06:59:59'187 35a2_cpp (P)___ WKP.SM_.bindQueue end
06:59:59'187 35a2_cpp (P)___ WKP.SM_ ev_run begin
06:59:59'187 35a2_cpp ___(C) CNS begin
06:59:59'187 35a2_cpp ___(C) CNS end
06:59:59'187 35a2_cpp ___(C) ___ ev_run begin
06:59:59'201 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:34'835 35a2_cpp ___(C) CNS.received begin [a1]
07:00:34'835 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
07:00:34'835 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
07:00:34'835 35a2_cpp rotationBuffer.putMessage end next=[1]
07:00:34'835 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
07:00:34'836 35a2_cpp ___(C) CNS.received.ack end [1]
07:00:34'836 35a2_cpp ___(C) CNS.received end [a1]
07:00:34'851 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
07:00:34'851 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
07:00:34'851 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
07:00:34'851 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:34'851 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:34'871 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:34'873 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a1daze] **********important**********
07:00:34'873 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:34'873 35a2_cpp[0] connection.close begin
07:00:34'873 35a2_cpp[0] channel.close begin
07:00:34'873 35a2_cpp[0] channel.close end
07:00:34'873 35a2_cpp[0] connection.close end
07:00:34'873 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ begin
07:00:34'875 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:34'875 35a2_cpp[0] channel.open begin
07:00:34'875 35a2_cpp[0] connection.open begin
07:00:34'875 35a2_cpp[0] connection.open end
07:00:34'875 35a2_cpp[0] channel.open end
07:00:34'875 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:34'875 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:34'875 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:34'875 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:34'882 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:35'826 35a2_cpp ___(C) CNS.received begin [a2]
07:00:35'826 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
07:00:35'826 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
07:00:35'826 35a2_cpp rotationBuffer.putMessage end next=[2]
07:00:35'826 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
07:00:35'827 35a2_cpp ___(C) CNS.received.ack end [2]
07:00:35'827 35a2_cpp ___(C) CNS.received end [a2]
07:00:35'830 35a2_cpp rotationBuffer.removeMessage begin [1] [a2]
07:00:35'830 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[2]
07:00:35'830 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
07:00:35'830 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:35'830 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:35'851 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:35'853 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a2daze] **********important**********
07:00:35'853 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:35'853 35a2_cpp[0] connection.close begin
07:00:35'853 35a2_cpp[0] channel.close begin
07:00:35'854 35a2_cpp[0] channel.close end
07:00:35'854 35a2_cpp[0] connection.close end
07:00:35'854 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:35'855 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:35'856 35a2_cpp (P)___ WKP.SM_ end
07:00:35'856 35a2_cpp (P)___ WKP.SM_ begin
07:00:35'856 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:35'856 35a2_cpp[0] channel.open begin
07:00:35'856 35a2_cpp[0] connection.open begin
07:00:35'856 35a2_cpp[0] connection.open end
07:00:35'856 35a2_cpp[0] channel.open end
07:00:35'856 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:35'856 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:35'856 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:35'856 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:35'862 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:36'607 35a2_cpp ___(C) CNS.received begin [a3]
07:00:36'607 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
07:00:36'607 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
07:00:36'607 35a2_cpp rotationBuffer.putMessage end next=[3]
07:00:36'607 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
07:00:36'607 35a2_cpp ___(C) CNS.received.ack end [3]
07:00:36'607 35a2_cpp ___(C) CNS.received end [a3]
07:00:36'608 35a2_cpp rotationBuffer.removeMessage begin [2] [a3]
07:00:36'608 35a2_cpp rotationBuffer.removeMessage end return=[a3] next=[3]
07:00:36'608 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a3daze] ****important****
07:00:36'608 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:36'608 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:36'628 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:36'631 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a3daze] **********important**********
07:00:36'631 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:36'631 35a2_cpp[0] connection.close begin
07:00:36'631 35a2_cpp[0] channel.close begin
07:00:36'631 35a2_cpp[0] channel.close end
07:00:36'631 35a2_cpp[0] connection.close end
07:00:36'631 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ begin
07:00:36'632 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:36'632 35a2_cpp[0] channel.open begin
07:00:36'632 35a2_cpp[0] connection.open begin
07:00:36'632 35a2_cpp[0] connection.open end
07:00:36'632 35a2_cpp[0] channel.open end
07:00:36'632 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:36'632 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:36'632 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:36'632 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:36'637 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:37'443 35a2_cpp ___(C) CNS.received begin [a4]
07:00:37'443 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
07:00:37'443 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
07:00:37'443 35a2_cpp rotationBuffer.putMessage end next=[4]
07:00:37'443 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
07:00:37'443 35a2_cpp ___(C) CNS.received.ack end [4]
07:00:37'443 35a2_cpp ___(C) CNS.received end [a4]
07:00:37'445 35a2_cpp rotationBuffer.removeMessage begin [3] [a4]
07:00:37'445 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[4]
07:00:37'445 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
07:00:37'445 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:37'445 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:37'466 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:37'467 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a4daze] **********important**********
07:00:37'467 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:37'467 35a2_cpp[0] connection.close begin
07:00:37'467 35a2_cpp[0] channel.close begin
07:00:37'467 35a2_cpp[0] channel.close end
07:00:37'467 35a2_cpp[0] connection.close end
07:00:37'467 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:37'468 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:37'468 35a2_cpp (P)___ WKP.SM_ end
07:00:37'468 35a2_cpp (P)___ WKP.SM_ begin
07:00:37'468 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:37'468 35a2_cpp[0] channel.open begin
07:00:37'468 35a2_cpp[0] connection.open begin
07:00:37'469 35a2_cpp[0] connection.open end
07:00:37'469 35a2_cpp[0] channel.open end
07:00:37'469 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:37'469 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:37'469 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:37'469 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:37'483 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:00:38'271 35a2_cpp ___(C) CNS.received begin [a5]
07:00:38'271 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
07:00:38'271 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
07:00:38'271 35a2_cpp rotationBuffer.putMessage end next=[5]
07:00:38'271 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
07:00:38'271 35a2_cpp ___(C) CNS.received.ack end [5]
07:00:38'271 35a2_cpp ___(C) CNS.received end [a5]
07:00:38'290 35a2_cpp rotationBuffer.removeMessage begin [4] [a5]
07:00:38'290 35a2_cpp rotationBuffer.removeMessage end return=[a5] next=[5]
07:00:38'290 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a5daze] ****important****
07:00:38'290 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:00:38'290 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:00:38'310 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:00:38'312 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a5daze] **********important**********
07:00:38'312 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:00:38'312 35a2_cpp[0] connection.close begin
07:00:38'312 35a2_cpp[0] channel.close begin
07:00:38'312 35a2_cpp[0] channel.close end
07:00:38'312 35a2_cpp[0] connection.close end
07:00:38'313 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ ev_run end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ begin
07:00:38'314 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:00:38'314 35a2_cpp[0] channel.open begin
07:00:38'314 35a2_cpp[0] connection.open begin
07:00:38'314 35a2_cpp[0] connection.open end
07:00:38'314 35a2_cpp[0] channel.open end
07:00:38'314 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:00:38'314 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:00:38'314 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:00:38'314 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:00:38'319 35a2_cpp (P)___ WKP.BQ_.OS_ begin
接続を切ることより、ハンドラを抜けることの方が 肝心なんじゃないか?
あと、ラムダのキャプチャをコピーにした。
接続を切るのは 本当に必要なのか?
試してみよう。
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
16:19:38 949 34_cs ___(C) start begin
16:19:38 955 34_cs consumer.create [1] begin
16:19:38 956 34_cs consumer.create [1] end
16:19:38 957 34_cs channel.open [1] begin
16:19:38 958 34_cs connection.open begin
16:19:39 165 34_cs connection.open end
16:19:39 170 34_cs channel.open [1] end
16:19:39 172 34_cs <-------- [interrupt!] Registered(^q^)
16:19:39 172 34_cs ___(C) start end
16:19:39 172 34_cs (P)___ waitRegistered begin
16:19:39 172 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:19:39 924 34_cs (P)___ enqueue begin [a1]
16:19:39 924 34_cs channel.open [0] begin
16:19:39 925 34_cs connection.open begin
16:19:39 932 34_cs connection.open end
16:19:39 934 34_cs channel.open [0] end
16:19:39 937 34_cs (P)___ enqueue end
16:19:39 938 34_cs connection.close [0] begin
16:19:39 938 34_cs channel.close [0] begin
16:19:39 945 34_cs channel.close [0] end
16:19:39 951 34_cs connection.close [0] end
Enqueue? > 16:19:39 978 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
16:19:40 818 34_cs (P)___ enqueue begin [a2]
16:19:40 819 34_cs channel.open [0] begin
16:19:40 819 34_cs connection.open begin
16:19:40 831 34_cs connection.open end
16:19:40 833 34_cs channel.open [0] end
16:19:40 833 34_cs (P)___ enqueue end
16:19:40 833 34_cs connection.close [0] begin
16:19:40 833 34_cs channel.close [0] begin
16:19:40 835 34_cs channel.close [0] end
16:19:40 835 34_cs connection.close [0] end
Enqueue? > a3
16:19:41 755 34_cs (P)___ enqueue begin [a3]
16:19:41 755 34_cs channel.open [0] begin
16:19:41 755 34_cs connection.open begin
16:19:41 762 34_cs connection.open end
16:19:41 764 34_cs channel.open [0] end
16:19:41 764 34_cs (P)___ enqueue end
16:19:41 764 34_cs connection.close [0] begin
16:19:41 764 34_cs channel.close [0] begin
16:19:41 766 34_cs channel.close [0] end
16:19:41 767 34_cs connection.close [0] end
Enqueue? > a4
16:19:42 835 34_cs (P)___ enqueue begin [a4]
16:19:42 835 34_cs channel.open [0] begin
16:19:42 835 34_cs connection.open begin
16:19:42 851 34_cs connection.open end
16:19:42 853 34_cs channel.open [0] end
16:19:42 854 34_cs (P)___ enqueue end
16:19:42 854 34_cs connection.close [0] begin
16:19:42 854 34_cs channel.close [0] begin
16:19:42 856 34_cs channel.close [0] end
16:19:42 857 34_cs connection.close [0] end
Enqueue? > a5
16:19:43 795 34_cs (P)___ enqueue begin [a5]
16:19:43 795 34_cs channel.open [0] begin
16:19:43 795 34_cs connection.open begin
16:19:43 820 34_cs connection.open end
16:19:43 822 34_cs channel.open [0] end
16:19:43 823 34_cs (P)___ enqueue end
16:19:43 823 34_cs connection.close [0] begin
16:19:43 823 34_cs channel.close [0] begin
16:19:43 825 34_cs channel.close [0] end
16:19:43 826 34_cs connection.close [0] end
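接続を切るのは必要みたいだな、理屈は分からないが……。(補足: ログを見るかぎり、bindQueue の onSuccess ハンドラは ev_run 1回につき1回しか呼ばれない。接続を切らないと ev_run から抜けないので、WKP.SM_ のループが回らずハンドラが再び呼ばれない。そのため a2 以降は回転式バッファーに溜まったままになる、と考えられる。)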
07:19:11'188 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
07:19:11'189 35a2_cpp main dump begin
07:19:11'189 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
07:19:11'189 35a2_cpp main dump end
07:19:11'189 35a2_cpp ___(C) ___ begin
07:19:11'189 35a2_cpp ___(C) DQ_ begin
07:19:11'189 35a2_cpp[1] channel.open begin
07:19:11'189 35a2_cpp (P)___ WKP begin
07:19:11'189 35a2_cpp (P)___ WKP.SM_ begin
07:19:11'189 35a2_cpp[1] connection.open begin
07:19:11'189 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:19:11'189 35a2_cpp[0] channel.open begin
07:19:11'189 35a2_cpp[0] connection.open begin
07:19:11'191 35a2_cpp[107:19:11'191 35a2_cpp[] connection.open end0] connection.open end
07:19:11'191 35a2_cpp[0] channel.open end
07:19:11'191 35a2_cpp[1] channel.open end
07:19:11'191 35a2_cpp ___(C) DQ_ end
07:19:11'191 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:19:11'191 35a2_cpp ___(C) CNS begin
07:19:11'191 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:19:11'191 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:19:11'191 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:19:11'191 35a2_cpp ___(C) CNS end
07:19:11'191 35a2_cpp ___(C) ___ ev_run begin
07:19:11'211 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:19:39'939 35a2_cpp ___(C) CNS.received begin [a1]
07:19:39'939 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
07:19:39'939 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
07:19:39'939 35a2_cpp rotationBuffer.putMessage end next=[1]
07:19:39'939 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
07:19:39'940 35a2_cpp ___(C) CNS.received.ack end [1]
07:19:39'940 35a2_cpp ___(C) CNS.received end [a1]
07:19:39'950 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
07:19:39'950 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
07:19:39'950 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
07:19:39'951 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:19:39'951 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:19:39'971 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:19:39'977 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a1daze] **********important**********
07:19:40'834 35a2_cpp ___(C) CNS.received begin [a2]
07:19:40'834 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
07:19:40'834 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
07:19:40'834 35a2_cpp rotationBuffer.putMessage end next=[2]
07:19:40'834 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
07:19:40'834 35a2_cpp ___(C) CNS.received.ack end [2]
07:19:40'835 35a2_cpp ___(C) CNS.received end [a2]
07:19:41'766 35a2_cpp ___(C) CNS.received begin [a3]
07:19:41'766 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
07:19:41'766 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
07:19:41'766 35a2_cpp rotationBuffer.putMessage end next=[3]
07:19:41'766 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
07:19:41'766 35a2_cpp ___(C) CNS.received.ack end [3]
07:19:41'766 35a2_cpp ___(C) CNS.received end [a3]
07:19:42'856 35a2_cpp ___(C) CNS.received begin [a4]
07:19:42'856 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
07:19:42'856 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
07:19:42'856 35a2_cpp rotationBuffer.putMessage end next=[4]
07:19:42'856 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
07:19:42'856 35a2_cpp ___(C) CNS.received.ack end [4]
07:19:42'856 35a2_cpp ___(C) CNS.received end [a4]
07:19:43'825 35a2_cpp ___(C) CNS.received begin [a5]
07:19:43'825 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
07:19:43'825 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
07:19:43'825 35a2_cpp rotationBuffer.putMessage end next=[5]
07:19:43'825 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
07:19:43'825 35a2_cpp ___(C) CNS.received.ack end [5]
07:19:43'825 35a2_cpp ___(C) CNS.received end [a5]
接続を切って、bindQueue をし直せ、ということなのだろうか?
だったら、接続を切る代わりに、チャンネルを閉じるとどうなるのか?
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:26:34 388 34_cs (P)___ enqueue begin [a1]
16:26:34 389 34_cs channel.open [0] begin
16:26:34 390 34_cs connection.open begin
16:26:34 405 34_cs connection.open end
16:26:34 407 34_cs channel.open [0] end
16:26:34 414 34_cs (P)___ enqueue end
16:26:34 415 34_cs connection.close [0] begin
16:26:34 416 34_cs channel.close [0] begin
16:26:34 423 34_cs channel.close [0] end
16:26:34 429 34_cs connection.close [0] end
Enqueue? > 16:26:34 458 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
16:26:35 490 34_cs (P)___ enqueue begin [a2]
16:26:35 491 34_cs channel.open [0] begin
16:26:35 491 34_cs connection.open begin
16:26:35 512 34_cs connection.open end
16:26:35 517 34_cs channel.open [0] end
16:26:35 517 34_cs (P)___ enqueue end
16:26:35 517 34_cs connection.close [0] begin
16:26:35 517 34_cs channel.close [0] begin
16:26:35 519 34_cs channel.close [0] end
16:26:35 521 34_cs connection.close [0] end
Enqueue? > a3
16:26:36 513 34_cs (P)___ enqueue begin [a3]
16:26:36 513 34_cs channel.open [0] begin
16:26:36 514 34_cs connection.open begin
16:26:36 520 34_cs connection.open end
16:26:36 522 34_cs channel.open [0] end
16:26:36 522 34_cs (P)___ enqueue end
16:26:36 522 34_cs connection.close [0] begin
16:26:36 522 34_cs channel.close [0] begin
16:26:36 524 34_cs channel.close [0] end
16:26:36 525 34_cs connection.close [0] end
Enqueue? > a4
16:26:38 130 34_cs (P)___ enqueue begin [a4]
16:26:38 130 34_cs channel.open [0] begin
16:26:38 130 34_cs connection.open begin
16:26:38 137 34_cs connection.open end
16:26:38 138 34_cs channel.open [0] end
16:26:38 139 34_cs (P)___ enqueue end
16:26:38 139 34_cs connection.close [0] begin
16:26:38 139 34_cs channel.close [0] begin
16:26:38 140 34_cs channel.close [0] end
16:26:38 141 34_cs connection.close [0] end
07:26:11'524 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
07:26:11'525 35a2_cpp main dump begin
07:26:11'525 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
07:26:11'525 35a2_cpp main dump end
07:26:11'525 35a2_cpp ___(C) ___ begin
07:26:11'525 35a2_cpp ___(C) DQ_ begin
07:26:11'525 35a2_cpp (P)___ WKP begin
07:26:11'525 35a2_cpp[1] channel.open begin
07:26:11'525 35a2_cpp (P)___ WKP.SM_ begin
07:26:11'525 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:26:11'525 35a2_cpp[1] connection.open begin
07:26:11'525 35a2_cpp[0] channel.open begin
07:26:11'525 35a2_cpp[0] connection.open begin
07:26:11'527 35a2_cpp[0] connection.open end
07:26:11'527 35a2_cpp[1] connection.open end
07:26:11'527 35a2_cpp[0] channel.open end
07:26:11'527 35a2_cpp[1] channel.open end
07:26:11'527 35a2_cpp ___(C) DQ_ end
07:26:11'527 35a2_cpp ___(C) CNS begin
07:26:11'528 35a2_cpp ___(C) CNS end
07:26:11'528 35a2_cpp ___(C) ___ ev_run begin
07:26:11'527 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:26:11'528 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:26:11'528 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:26:11'528 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:26:11'540 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:26:34'415 35a2_cpp ___(C) CNS.received begin [a1]
07:26:34'415 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a1]
07:26:34'415 35a2_cpp rotationBuffer.putMessage begin [0] [a1]
07:26:34'415 35a2_cpp rotationBuffer.putMessage end next=[1]
07:26:34'415 35a2_cpp ___(C) CNS.received.ack begin [1] [a1]
07:26:34'415 35a2_cpp ___(C) CNS.received.ack end [1]
07:26:34'415 35a2_cpp ___(C) CNS.received end [a1]
07:26:34'431 35a2_cpp rotationBuffer.removeMessage begin [0] [a1]
07:26:34'431 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[1]
07:26:34'431 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a1daze] ****important****
07:26:34'431 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:26:34'431 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:26:34'451 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:26:34'453 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a1daze] **********important**********
07:26:34'454 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:26:34'454 35a2_cpp[0] channel.close begin
07:26:34'454 35a2_cpp[0] channel.close end
07:26:34'454 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:26:35'519 35a2_cpp ___(C) CNS.received begin [a2]
07:26:35'519 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a2]
07:26:35'519 35a2_cpp rotationBuffer.putMessage begin [1] [a2]
07:26:35'519 35a2_cpp rotationBuffer.putMessage end next=[2]
07:26:35'519 35a2_cpp ___(C) CNS.received.ack begin [2] [a2]
07:26:35'519 35a2_cpp ___(C) CNS.received.ack end [2]
07:26:35'519 35a2_cpp ___(C) CNS.received end [a2]
07:26:36'524 35a2_cpp ___(C) CNS.received begin [a3]
07:26:36'524 35a2_cpp rotationBuffer.putMessage begin [2] before lock [a3]
07:26:36'524 35a2_cpp rotationBuffer.putMessage begin [2] [a3]
07:26:36'524 35a2_cpp rotationBuffer.putMessage end next=[3]
07:26:36'524 35a2_cpp ___(C) CNS.received.ack begin [3] [a3]
07:26:36'524 35a2_cpp ___(C) CNS.received.ack end [3]
07:26:36'524 35a2_cpp ___(C) CNS.received end [a3]
07:26:38'141 35a2_cpp ___(C) CNS.received begin [a4]
07:26:38'141 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
07:26:38'141 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
07:26:38'141 35a2_cpp rotationBuffer.putMessage end next=[4]
07:26:38'141 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
07:26:38'142 35a2_cpp ___(C) CNS.received.ack end [4]
07:26:38'142 35a2_cpp ___(C) CNS.received end [a4]
接続を切ることは 必要な条件のようだ。理屈は分からないが。
と思って2回目を試すと
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
16:31:20 445 34_cs (P)___ enqueue begin [a1]
16:31:20 445 34_cs channel.open [0] begin
16:31:20 445 34_cs connection.open begin
16:31:20 455 34_cs connection.open end
16:31:20 458 34_cs channel.open [0] end
16:31:20 461 34_cs (P)___ enqueue end
16:31:20 462 34_cs connection.close [0] begin
16:31:20 462 34_cs channel.close [0] begin
16:31:20 467 34_cs channel.close [0] end
16:31:20 479 34_cs connection.close [0] end
Enqueue? > a2
16:31:21 291 34_cs (P)___ enqueue begin [a2]
16:31:21 291 34_cs channel.open [0] begin
16:31:21 292 34_cs connection.open begin
16:31:21 297 34_cs connection.open end
16:31:21 299 34_cs channel.open [0] end
16:31:21 299 34_cs (P)___ enqueue end
16:31:21 299 34_cs connection.close [0] begin
16:31:21 299 34_cs channel.close [0] begin
16:31:21 301 34_cs channel.close [0] end
16:31:21 301 34_cs connection.close [0] end
Enqueue? > 16:31:21 331 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
a3
16:31:22 915 34_cs (P)___ enqueue begin [a3]
16:31:22 916 34_cs channel.open [0] begin
16:31:22 916 34_cs connection.open begin
16:31:22 931 34_cs connection.open end
16:31:22 932 34_cs channel.open [0] end
16:31:22 933 34_cs (P)___ enqueue end
16:31:22 933 34_cs connection.close [0] begin
16:31:22 933 34_cs channel.close [0] begin
16:31:22 935 34_cs channel.close [0] end
16:31:22 936 34_cs connection.close [0] end
Enqueue? > a4
16:31:29 779 34_cs (P)___ enqueue begin [a4]
16:31:29 779 34_cs channel.open [0] begin
16:31:29 779 34_cs connection.open begin
16:31:29 792 34_cs connection.open end
16:31:29 794 34_cs channel.open [0] end
16:31:29 795 34_cs (P)___ enqueue end
16:31:29 795 34_cs connection.close [0] begin
16:31:29 795 34_cs channel.close [0] begin
16:31:29 798 34_cs channel.close [0] end
16:31:29 799 34_cs connection.close [0] end
Enqueue? > 16:31:29 829 34_cs <-------- [interrupt!] Dequeue(^q^) a4daze
接続を切ると、まぐれで うまく行くことがある、ということか。
07:30:44'066 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
07:30:44'066 35a2_cpp main dump begin
07:30:44'066 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
07:30:44'066 35a2_cpp main dump end
07:30:44'067 35a2_cpp ___(C) ___ begin
07:30:44'067 35a2_cpp ___(C) DQ_ begin
07:30:44'067 35a2_cpp (P)___ WKP begin
07:30:44'067 35a2_cpp[1] channel.open begin
07:30:44'067 35a2_cpp (P)___ WKP.SM_ begin
07:30:44'067 35a2_cpp[1] connection.open begin
07:30:44'067 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:30:44'067 35a2_cpp[0] channel.open begin
07:30:44'067 35a2_cpp[0] connection.open begin
07:30:44'071 35a2_cpp[0] connection.open end
07:30:44'071 35a2_cpp[1] connection.open end
07:30:44'072 35a2_cpp[1] channel.open end
07:30:44'072 35a2_cpp[0] channel.open end
07:30:44'072 35a2_cpp ___(C) DQ_ end
07:30:44'072 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:30:44'072 35a2_cpp ___(C) CNS begin
07:30:44'072 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:30:44'072 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:30:44'072 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:30:44'072 35a2_cpp ___(C) CNS end
07:30:44'072 35a2_cpp ___(C) ___ ev_run begin
07:30:44'095 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:31:21'301 35a2_cpp ___(C) CNS.received begin [a2]
07:31:21'301 35a2_cpp rotationBuffer.putMessage begin [0] before lock [a2]
07:31:21'301 35a2_cpp rotationBuffer.putMessage begin [0] [a2]
07:31:21'301 35a2_cpp rotationBuffer.putMessage end next=[1]
07:31:21'301 35a2_cpp ___(C) CNS.received.ack begin [1] [a2]
07:31:21'301 35a2_cpp ___(C) CNS.received.ack end [1]
07:31:21'301 35a2_cpp ___(C) CNS.received end [a2]
07:31:21'302 35a2_cpp rotationBuffer.removeMessage begin [0] [a2]
07:31:21'302 35a2_cpp rotationBuffer.removeMessage end return=[a2] next=[1]
07:31:21'302 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a2daze] ****important****
07:31:21'302 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:31:21'302 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:31:21'323 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:31:21'324 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a2daze] **********important**********
07:31:21'324 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:31:21'324 35a2_cpp[0] connection.close begin
07:31:21'324 35a2_cpp[0] channel.close begin
07:31:21'324 35a2_cpp[0] channel.close end
07:31:21'324 35a2_cpp[0] connection.close end
07:31:21'324 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ ev_run end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ begin
07:31:21'330 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:31:21'330 35a2_cpp[0] channel.open begin
07:31:21'330 35a2_cpp[0] connection.open begin
07:31:21'330 35a2_cpp[0] connection.open end
07:31:21'330 35a2_cpp[0] channel.open end
07:31:21'330 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:31:21'330 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:31:21'330 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:31:21'330 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:31:21'344 35a2_cpp (P)___ WKP.BQ_.OS_ begin
07:31:29'797 35a2_cpp ___(C) CNS.received begin [a4]
07:31:29'797 35a2_cpp rotationBuffer.putMessage begin [1] before lock [a4]
07:31:29'797 35a2_cpp rotationBuffer.putMessage begin [1] [a4]
07:31:29'798 35a2_cpp rotationBuffer.putMessage end next=[2]
07:31:29'798 35a2_cpp ___(C) CNS.received.ack begin [2] [a4]
07:31:29'798 35a2_cpp ___(C) CNS.received.ack end [2]
07:31:29'798 35a2_cpp ___(C) CNS.received end [a4]
07:31:29'806 35a2_cpp rotationBuffer.removeMessage begin [1] [a4]
07:31:29'807 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[2]
07:31:29'807 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[a4daze] ****important****
07:31:29'807 35a2_cpp (P)___ WKP.BQ_.OS_ if begin
07:31:29'807 35a2_cpp (P)___ WKP.BQ_.OS_ if end
07:31:29'827 35a2_cpp (P)___ WKP.BQ_.OS_ end
07:31:29'829 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [a4daze] **********important**********
07:31:29'829 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin
07:31:29'829 35a2_cpp[0] connection.close begin
07:31:29'829 35a2_cpp[0] channel.close begin
07:31:29'829 35a2_cpp[0] channel.close end
07:31:29'829 35a2_cpp[0] connection.close end
07:31:29'829 35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ ev_run end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ begin
07:31:29'831 35a2_cpp (P)___ WKP.SM_.declareQueue begin
07:31:29'831 35a2_cpp[0] channel.open begin
07:31:29'831 35a2_cpp[0] connection.open begin
07:31:29'831 35a2_cpp[0] connection.open end
07:31:29'831 35a2_cpp[0] channel.open end
07:31:29'831 35a2_cpp (P)___ WKP.SM_.declareQueue end
07:31:29'831 35a2_cpp (P)___ WKP.SM_.bindQueue begin
07:31:29'831 35a2_cpp (P)___ WKP.SM_.bindQueue end
07:31:29'831 35a2_cpp (P)___ WKP.SM_ ev_run begin
07:31:29'844 35a2_cpp (P)___ WKP.BQ_.OS_ begin
なんだろう、また同じ不具合だ。
# jobs
[1] Running rabbitmq-server & (wd: /home/★user/shogi/csharp_service)
[2]- Running ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
[3]+ Running ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
tamesi35a2_cpp.exe が2つ走っている(jobs の [2] と [3] が同じコマンド)のが怪しい。
直った。
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
16:37:39 253 34_cs ___(C) start begin
16:37:39 259 34_cs consumer.create [1] begin
16:37:39 259 34_cs consumer.create [1] end
16:37:39 260 34_cs channel.open [1] begin
16:37:39 261 34_cs connection.open begin
16:37:39 415 34_cs connection.open end
16:37:39 420 34_cs channel.open [1] end
16:37:39 423 34_cs <-------- [interrupt!] Registered(^q^)
16:37:39 423 34_cs ___(C) start end
16:37:39 423 34_cs (P)___ waitRegistered begin
16:37:39 423 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a
16:37:40 100 34_cs (P)___ enqueue begin [a]
16:37:40 101 34_cs channel.open [0] begin
16:37:40 101 34_cs connection.open begin
16:37:40 110 34_cs connection.open end
16:37:40 118 34_cs channel.open [0] end
16:37:40 123 34_cs (P)___ enqueue end
16:37:40 124 34_cs connection.close [0] begin
16:37:40 124 34_cs channel.close [0] begin
16:37:40 130 34_cs channel.close [0] end
16:37:40 134 34_cs connection.close [0] end
Enqueue? > 16:37:40 156 34_cs <-------- [interrupt!] Dequeue(^q^) adaze
b
16:37:40 811 34_cs (P)___ enqueue begin [b]
16:37:40 812 34_cs channel.open [0] begin
16:37:40 812 34_cs connection.open begin
16:37:40 817 34_cs connection.open end
16:37:40 819 34_cs channel.open [0] end
16:37:40 819 34_cs (P)___ enqueue end
16:37:40 819 34_cs connection.close [0] begin
16:37:40 820 34_cs channel.close [0] begin
16:37:40 821 34_cs channel.close [0] end
16:37:40 822 34_cs connection.close [0] end
Enqueue? > 16:37:40 856 34_cs <-------- [interrupt!] Dequeue(^q^) bdaze
c
16:37:41 603 34_cs (P)___ enqueue begin [c]
16:37:41 604 34_cs channel.open [0] begin
16:37:41 604 34_cs connection.open begin
16:37:41 608 34_cs connection.open end
16:37:41 610 34_cs channel.open [0] end
16:37:41 610 34_cs (P)___ enqueue end
16:37:41 610 34_cs connection.close [0] begin
16:37:41 610 34_cs channel.close [0] begin
16:37:41 612 34_cs channel.close [0] end
16:37:41 613 34_cs connection.close [0] end
Enqueue? > 16:37:41 638 34_cs <-------- [interrupt!] Dequeue(^q^) cdaze
d
16:37:42 227 34_cs (P)___ enqueue begin [d]
16:37:42 227 34_cs channel.open [0] begin
16:37:42 227 34_cs connection.open begin
16:37:42 247 34_cs connection.open end
16:37:42 249 34_cs channel.open [0] end
16:37:42 250 34_cs (P)___ enqueue end
16:37:42 250 34_cs connection.close [0] begin
16:37:42 250 34_cs channel.close [0] begin
16:37:42 252 34_cs channel.close [0] end
16:37:42 252 34_cs connection.close [0] end
Enqueue? > 16:37:42 277 34_cs <-------- [interrupt!] Dequeue(^q^) ddaze
e
16:37:42 867 34_cs (P)___ enqueue begin [e]
16:37:42 868 34_cs channel.open [0] begin
16:37:42 868 34_cs connection.open begin
16:37:42 882 34_cs connection.open end
16:37:42 884 34_cs channel.open [0] end
16:37:42 884 34_cs (P)___ enqueue end
16:37:42 884 34_cs connection.close [0] begin
16:37:42 884 34_cs channel.close [0] begin
16:37:42 886 34_cs channel.close [0] end
16:37:42 887 34_cs connection.close [0] end
Enqueue? > 16:37:42 910 34_cs <-------- [interrupt!] Dequeue(^q^) edaze
f
16:37:43 427 34_cs (P)___ enqueue begin [f]
16:37:43 428 34_cs channel.open [0] begin
16:37:43 428 34_cs connection.open begin
16:37:43 433 34_cs connection.open end
16:37:43 434 34_cs channel.open [0] end
16:37:43 434 34_cs (P)___ enqueue end
16:37:43 434 34_cs connection.close [0] begin
16:37:43 434 34_cs channel.close [0] begin
16:37:43 436 34_cs channel.close [0] end
16:37:43 437 34_cs connection.close [0] end
Enqueue? > 16:37:43 463 34_cs <-------- [interrupt!] Dequeue(^q^) fdaze
じゃあ、これで。
ソースコード
tamesi34_cs.cs
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : Command : rabbitmqctl list_queues
// : // Count messages in queues.
//
// Program : this
// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
// : License : MIT License https://opensource.org/licenses/MIT
// : Explain : For test. Practice of interprocess communication.
// : This program run on the background side.
// : Delete queue,
// : Or Send message to message queue.
// : Interrupt received message from queue.
// : Please settings source queue, destination queue to command line argument.
// : Explain : Command line argument for the life span of a queue (values can be combined)
// : durable The queue survives even if the RabbitMQ service is stopped.
// : autodelete The queue is deleted when no consumers remain.
// : passive Only checks whether the queue exists; does not read from or write to it.
// : exclusive The queue can be used only by this connection and is deleted when the connection closes.
// : Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
// : Explain : mcs Mono c# compiler
// : /r:RabbitMQ.Client.dll Append reference RabbitMQ
// : -define:UBUNTU #define UBUNTU
// : tamesi34_cs.cs Source file
// : Execute : Command : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
// : // Run on the foreground.
// : Explain : ./tamesi34_cs.exe Executable file
// : 2> ./tamesi34_cs.err.log Standard error to log file
// : Reference : Web site: QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
// : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//
//--------------------------------------------------------------------------------
// tamesi34_cs.cs
// For the Ubuntu build: the RabbitMQ client library there is an older version with a different API, so enable the define below.
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace UsagiMQ
{
/// <summary>
/// Bit flags describing the life span of a queue; the values can be added together.
/// These numbers are private to this program. According to the original author's note,
/// the AMQP-CPP (C++) library uses different values:
/// AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
/// </summary>
[Flags]
enum LifeSpanType
{
    /// <summary>The queue remains even if RabbitMQ stops.</summary>
    Durable = 1 << 0,

    /// <summary>The queue is deleted when no consumer is connected.</summary>
    Autodelete = 1 << 1,

    /// <summary>Only check whether the queue exists.</summary>
    Passive = 1 << 2,

    /// <summary>The queue is usable only on this connection and is deleted when it closes.</summary>
    Exclusive = 1 << 3
}
class Program
{
/// <summary>
/// Builds a multi-line, human-readable snapshot of the settings held for the
/// enqueue, dequeue and delete-queue slots: queue name, life span value and
/// the four per-flag booleans.
/// </summary>
/// <returns>The report text, one setting per line, with a "----" line after each slot.</returns>
static string Dump()
{
    // Slot labels exactly as they appear in the report; the trailing blank
    // (or its absence) is part of the emitted text.
    string[] slotLabels = { "[ENQUEUE_INDEX] ", "[DEQUEUE_INDEX] ", "[DELETEQUEUE_INDEX]" };
    int[] slots = { ENQUEUE_INDEX, DEQUEUE_INDEX, DELETEQUEUE_INDEX };

    StringBuilder report = new StringBuilder();
    report.AppendLine("tamesi34_cs.cs Dump");
    for (int i = 0; i < slots.Length; i++)
    {
        string label = slotLabels[i];
        int slot = slots[i];
        report.AppendLine(" name_queues " + label + "=[" + name_queues[slot] + "]");
        report.AppendLine(" lifeSpan_queues " + label + "=[" + lifeSpan_queues[slot] + "]");
        report.AppendLine(" durable_lifeSpans " + label + "=[" + durable_lifeSpans[slot] + "]");
        report.AppendLine(" autodelete_lifeSpans" + label + "=[" + autodelete_lifeSpans[slot] + "]");
        report.AppendLine(" passive_lifeSpans " + label + "=[" + passive_lifeSpans[slot] + "]");
        report.AppendLine(" exclusive_lifeSpans " + label + "=[" + exclusive_lifeSpans[slot] + "]");
        report.AppendLine(" ----");
    }
    return report.ToString();
}
/// <summary>
/// Formats the current local time as "HH:mm:ss fff " (hours, minutes, seconds,
/// milliseconds and a trailing space). Callers prepend it to console log lines.
/// </summary>
/// <returns>The formatted time stamp.</returns>
static string GetNow()
{
    const string stampFormat = "HH:mm:ss fff ";
    DateTime now = DateTime.Now;
    return now.ToString(stampFormat);
}
// Slot numbers. Every per-queue table below is indexed by one of these.
const int ENQUEUE_INDEX = 0;
const int DEQUEUE_INDEX = 1;
const int DELETEQUEUE_INDEX = 2;
// Number of slots, i.e. the length of every per-queue table.
const int NUM_INDEX = 3;

// Host name handed to the connection factory.
const string HOST_NAME = "localhost";

// Queue name and combined life-span value for each slot.
static string[] name_queues = new string[NUM_INDEX];
static LifeSpanType[] lifeSpan_queues = new LifeSpanType[NUM_INDEX];

// The life-span value of each slot expanded into one boolean per flag.
static bool[] durable_lifeSpans = new bool[NUM_INDEX];
static bool[] autodelete_lifeSpans = new bool[NUM_INDEX];
static bool[] passive_lifeSpans = new bool[NUM_INDEX];
static bool[] exclusive_lifeSpans = new bool[NUM_INDEX];
/// <summary>
/// Records the queue name and life span for one slot and expands the life span
/// into the four per-flag boolean tables.
/// </summary>
/// <param name="index_queue">Slot to update (index into the settings tables).</param>
/// <param name="name_queue">Queue name to store for the slot.</param>
/// <param name="lifeSpan">Combination of <see cref="LifeSpanType"/> flags.</param>
static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
{
    name_queues[index_queue] = name_queue;
    lifeSpan_queues[index_queue] = lifeSpan;

    // Every entry is overwritten outright, so nothing from an earlier call survives.
    durable_lifeSpans[index_queue] = (lifeSpan & LifeSpanType.Durable) == LifeSpanType.Durable;
    autodelete_lifeSpans[index_queue] = (lifeSpan & LifeSpanType.Autodelete) == LifeSpanType.Autodelete;
    passive_lifeSpans[index_queue] = (lifeSpan & LifeSpanType.Passive) == LifeSpanType.Passive;
    exclusive_lifeSpans[index_queue] = (lifeSpan & LifeSpanType.Exclusive) == LifeSpanType.Exclusive;
}
/// <summary>
/// Returns the shared connection factory, creating it for HOST_NAME on first use.
/// </summary>
/// <returns>The cached <c>ConnectionFactory</c> instance.</returns>
public static ConnectionFactory GetFactory()
{
    if (m_factory_ != null)
    {
        return m_factory_;
    }

    ConnectionFactory created = new ConnectionFactory();
    created.HostName = HOST_NAME;
    m_factory_ = created;
    return m_factory_;
}
// Cached factory; stays null until GetFactory() has been called once.
static ConnectionFactory m_factory_;
/// <summary>
/// Returns the connection cached for a slot, opening a new one through the
/// shared factory (and logging begin/end) when the slot is empty.
/// </summary>
/// <param name="index">Slot number (index into the connection table).</param>
/// <returns>The connection stored for the slot.</returns>
public static IConnection GetConnection(int index)
{
    IConnection cached = m_connection_[index];
    if (cached != null)
    {
        return cached;
    }

    Console.WriteLine(GetNow() + "34_cs connection.open begin");
    m_connection_[index] = GetFactory().CreateConnection();
    Console.WriteLine(GetNow() + "34_cs connection.open end");
    return m_connection_[index];
}
// One connection per slot; a null entry means "not open".
static IConnection[] m_connection_ = new IConnection[NUM_INDEX];
/// <summary>
/// Returns the channel cached for a slot. When the slot is empty, a channel is
/// opened on the slot's connection and the slot's queue is declared using the
/// name and flags stored in the settings tables.
/// </summary>
/// <param name="index">Slot number (index into the channel and settings tables).</param>
/// <returns>The channel stored for the slot.</returns>
/// <remarks>
/// The channel is cached only after QueueDeclare has succeeded. If the declare
/// throws, the channel is released and the exception is rethrown, so the next
/// call starts over instead of returning a channel whose declare failed.
/// </remarks>
public static IModel GetChannel(int index)
{
    if (null != m_channels_[index])
    {
        return m_channels_[index];
    }

    Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] begin");
    IModel channel = GetConnection(index).CreateModel();
    try
    {
        // The seven-argument QueueDeclare has been seen to raise errors. When
        // investigating, print Dump() plus name_queues[index] and the four
        // *_lifeSpans[index] flags to Console.Error before this call.
        // nowait is fixed to false and arguments to null for now.
#if UBUNTU
        // On Ubuntu the installed client for some reason exposes the seven-argument form
        // (like the Spring.Messaging.Amqp.Rabbit one):
        // RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
        // (original author's note: "and it seems to be false".)
        channel.QueueDeclare(name_queues[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
        //channel.QueueDeclare(name_queues[index], false, false, false, false, false, null);
#else
        channel.QueueDeclare(name_queues[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
    }
    catch
    {
        // Best-effort release: the declare failure is the error the caller must see,
        // so a secondary failure while disposing is deliberately ignored.
        try { channel.Dispose(); } catch (Exception) { }
        throw;
    }

    m_channels_[index] = channel;
    Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] end");
    return channel;
}
// One channel per slot; a null entry means "not open".
static IModel[] m_channels_ = new IModel[NUM_INDEX];
/// <summary>
/// Returns the consumer cached for a slot, creating it (and logging begin/end)
/// when the slot is empty.
/// </summary>
/// <param name="index">Slot number (index into the consumer table).</param>
/// <returns>The consumer stored for the slot.</returns>
public static EventingBasicConsumer GetConsumer(int index)
{
    EventingBasicConsumer consumer = m_consumers_[index];
    if (consumer == null)
    {
        Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] begin");
#if UBUNTU
        // On Ubuntu the installed client for some reason has the v1.4.0 constructor that takes
        // no arguments; the author found no overload with one to six arguments.
        consumer = new EventingBasicConsumer();
#else
        consumer = new EventingBasicConsumer(GetChannel(index));
#endif
        m_consumers_[index] = consumer;
        Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] end");
    }
    return consumer;
}
// One consumer per slot; a null entry means "not created yet".
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];
/// <summary>
/// Returns the handler that is invoked once the consumer is ready (registered).
/// The handler is created on first use; when it fires it logs the event and
/// sets <c>flag_registered</c> to true.
/// </summary>
#if UBUNTU
public static ConsumerEventHandler GetRegisteredHandler()
#else
public static EventHandler<ConsumerEventArgs> GetRegisteredHandler()
#endif
{
    if (null != m_registeredHandler_)
    {
        return m_registeredHandler_;
    }

    // The lambda converts to whichever delegate type the field has in this build.
    m_registeredHandler_ = (sender, args) =>
    {
        Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Registered(^q^)");
        flag_registered = true;
    };
    return m_registeredHandler_;
}
// Cached handler; its delegate type depends on the client library version.
#if UBUNTU
static ConsumerEventHandler m_registeredHandler_;
#else
static EventHandler<ConsumerEventArgs> m_registeredHandler_;
#endif
// Set to true by the registered handler.
// NOTE(review): presumably polled from another thread; confirm whether it needs to be volatile.
static bool flag_registered;
/// <summary>
/// Returns the handler that is invoked when a message has been received.
/// The handler is created on first use; when it fires it decodes the message
/// body as UTF-8 and writes it to the console with a time stamp.
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetDequeueHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
{
    if (null != m_dequeueHandler_)
    {
        return m_dequeueHandler_;
    }

    // The lambda converts to whichever delegate type the field has in this build.
    m_dequeueHandler_ = (sender, delivery) =>
    {
        string text = Encoding.UTF8.GetString(delivery.Body);
        Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Dequeue(^q^) {0}", text);
    };
    return m_dequeueHandler_;
}
// Cached handler; its delegate type depends on the client library version.
#if UBUNTU
static BasicDeliverEventHandler m_dequeueHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif
/// <summary>
/// Closes the slot's channel and then its connection, if a connection is open.
/// There is no explicit "open" counterpart (connections are opened on demand),
/// but whatever was opened has to be closed without fail.
/// </summary>
/// <param name="index">Slot number (index into the connection table).</param>
static void CloseConnection(int index)
{
    IConnection connection = m_connection_[index];
    if (connection == null)
    {
        return;
    }

    Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] begin");
    CloseChannel(index);
    connection.Close();
    // A null entry is the flag meaning "not open".
    m_connection_[index] = null;
    Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] end");
}
/// <summary>
/// Closes the channel held in slot <paramref name="index"/> and clears the slot.
/// Does nothing when the slot is already empty.
/// There is no explicit "open" counterpart, but whatever was opened must always be closed.
/// </summary>
/// <param name="index">Which channel slot to close.</param>
static void CloseChannel(int index)
{
    if (null == m_channels_[index])
    {
        return;
    }

    Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] begin");
    var channel = m_channels_[index];
    channel.Close();
    m_channels_[index] = null;
    Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] end");
}
/// <summary>
/// Interactive entry point. In order it:
/// (1) repeatedly offers to delete queues until an empty name is entered,
/// (2) asks for the enqueue queue name and life span,
/// (3) asks for the dequeue queue name and life span,
/// (4) starts the consumer and waits until it reports that it is registered,
/// (5) publishes every line typed on standard input.
/// Life spans are entered as the sum of the LifeSpanType flag values shown in the prompt.
/// End of input (Console.ReadLine returning null) ends the current step gracefully.
/// </summary>
/// <param name="args">Command line arguments (not used).</param>
static void Main(string[] args)
{
    //----------------------------------------
    // Delete
    //----------------------------------------
    for (;;)
    {
        Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name : キューを削除します
"""" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? > ");
        string queueName_delete = Console.ReadLine();
        // ReadLine returns null at end of input; treat it like an empty answer.
        if (null == queueName_delete || "" == queueName_delete.Trim())
        {
            break;
        }
        Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Delete ? > ");
        int lifeSpan_delete;
        if (!int.TryParse(Console.ReadLine(), out lifeSpan_delete))
        {
            // Not a number: ask again instead of crashing, like the enqueue/dequeue prompts do.
            continue;
        }
        SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, (LifeSpanType)lifeSpan_delete);
        uint result = DeleteQueue();
        Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。");
    }
    //----------------------------------------
    // Enqueue settings
    //----------------------------------------
    for (;;)
    {
        Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > ");
        string queueName_enqueue = Console.ReadLine();
        if (null == queueName_enqueue)
        {
            // Standard input is closed: nothing can be configured, so stop instead of re-prompting forever.
            return;
        }
        Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > ");
        int lifeSpan_enqueue;
        if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
        {
            SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
            break;
        }
    }
    //----------------------------------------
    // Dequeue settings
    //----------------------------------------
    EventingBasicConsumer consumer = null;
    for (;;)
    {
        Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > ");
        string queueName_dequeue = Console.ReadLine();
        if (null == queueName_dequeue)
        {
            // Standard input is closed: nothing can be configured, so stop instead of re-prompting forever.
            return;
        }
        Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > ");
        int lifeSpan_dequeue;
        if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
        {
            SetLifeSpan(DEQUEUE_INDEX, queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
            break;
        }
    }
    StartConsume(out consumer);
    // Wait until the registered-handler callback raises the flag.
    Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered begin");
    while (!flag_registered)
    {
        Thread.Sleep(20);
    }
    Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered end");
    Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > ");
    for (;;)
    {
        // The user types something such as "Hello World!".
        string line = Console.ReadLine();
        if (null == line)
        {
            // End of input: there is nothing more to publish.
            break;
        }
        Enqueue(line);
        Console.Write(@"Enqueue? > ");
    }
    // Only reached when standard input ends; close what the consumer opened.
    CloseConnection(DEQUEUE_INDEX);
}
/// <summary>
/// Deletes the queue configured in the DELETEQUEUE_INDEX slot (with the if-unused and if-empty
/// flags set) and then closes that slot's connection.
/// </summary>
/// <returns>The value returned by the client's QueueDelete call.</returns>
static uint DeleteQueue()
{
    IModel deleteChannel = GetChannel(DELETEQUEUE_INDEX);
    var targetQueue = name_queues[DELETEQUEUE_INDEX];
    uint deleted;
#if UBUNTU
    // On Ubuntu the installed client, for some reason, exposes a variant of the old 5-argument .NET overload:
    // uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
    deleted = deleteChannel.QueueDelete(targetQueue, true, true, true);
#else
    deleted = deleteChannel.QueueDelete(targetQueue, true, true);
#endif
    // There is no matching explicit open, but unless this is closed properly the receiver gets nothing.
    CloseConnection(DELETEQUEUE_INDEX);
    return deleted;
}
/// <summary>
/// Publishes <paramref name="message"/> (UTF-8 encoded) to the enqueue queue through the default
/// exchange, then closes the enqueue connection.
/// </summary>
/// <param name="message">Text to publish.</param>
static void Enqueue(string message)
{
    byte[] payload = Encoding.UTF8.GetBytes(message);
    Console.WriteLine(GetNow() + "34_cs (P)___ enqueue begin [" + message + "]");
    var publishChannel = GetChannel(ENQUEUE_INDEX);
    publishChannel.BasicPublish("", name_queues[ENQUEUE_INDEX], null, payload);
    Console.WriteLine(GetNow() + "34_cs (P)___ enqueue end");
    // There is no matching explicit open, but unless this is closed properly the receiver gets nothing.
    CloseConnection(ENQUEUE_INDEX);
}
/// <summary>
/// Attaches the delivery and registration handlers to the dequeue consumer and starts
/// consuming from the dequeue queue (auto-acknowledge). The subscription is left running.
/// </summary>
/// <param name="consumer">
/// Receives the consumer instance; it is handed back to the caller because a local variable
/// would go out of scope here.
/// </param>
static void StartConsume(out EventingBasicConsumer consumer)
{
    Console.WriteLine(GetNow() + "34_cs ___(C) start begin");
    EventingBasicConsumer subscriber = GetConsumer(DEQUEUE_INDEX);
    consumer = subscriber;
    // Callbacks for "message received" and "consumer registered".
    subscriber.Received += GetDequeueHandler();
    subscriber.Registered += GetRegisteredHandler();
    var consumeChannel = GetChannel(DEQUEUE_INDEX);
    var sourceQueue = name_queues[DEQUEUE_INDEX];
#if UBUNTU
    // On Ubuntu the available overload, for some reason, resembles the 6-argument one:
    // string BasicConsume(ushort ticket, string queue, bool noAck, string consumerTag, IDictionary filter, IBasicConsumer consumer)
    consumeChannel.BasicConsume(sourceQueue, true, "", null, subscriber);
#else
    consumeChannel.BasicConsume(sourceQueue, true, subscriber);
#endif
    // Deliberately neither unsubscribed nor closed: consuming keeps running.
    Console.WriteLine(GetNow() + "34_cs ___(C) start end");
}
}
}
tamesi35a2_cpp.cpp
//--------------------------------------------------------------------------------
// OS : Windows10 : It does not work. This program is for Ubuntu 16.04.
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// : // Count messages in queues.
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
// : License : MIT License https://opensource.org/licenses/MIT
// : Explain : For testing. An exercise in interprocess communication.
// : This program runs in the background.
// : It appends "daze" to each message received over interprocess communication.
// : "Daze" is a sentence ending from the dialect of the Tokyo suburbs.
// : Specify the source queue and the destination queue as command line arguments,
// : and redirect standard input from /dev/null.
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Explain : g++ C++ Compiler
// : -std=c++11 C++ 11
// : tamesi35a2_cpp.cpp Source file
// : -o tamesi35a2_cpp.exe Output executable file
// : -lev Used library libev
// : -lamqpcpp Used library AMQP-CPP
// : -pthread Used library thread
// : Execute : Command : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Run on the background.
// : Explain : ./tamesi35a2_cpp.exe Executable file
// : --enqueue Destination queue settings section
// : 1113 Destination queue name (string ok)
// : durable Destination queue life span (multiple)
// : --dequeue Source queue settings section
// : 1112 Source queue name (string ok)
// : durable Source queue life span
// : autodelete Source queue life span
// : > ./tamesi35a2_cpp.out.log Standard output to ./tamesi35a2_cpp.out.log
// : 2> ./tamesi35a2_cpp.err.log Standard error to ./tamesi35a2_cpp.err.log
// : < /dev/null Standard input from nothing
// : & Process goto the background
// : Explain : command line argument of life span of queue (Compositable)
// : durable The queue exists, Even if the RabbitMQ service closed.
// : autodelete The queue delete when consumers to be nothing.
// : passive Just check what queue exists. Don't use read/write to queue.
// : exclusive The queue delete when connection closed. The queue exists is this connection only.
// : Example : Windows 10 (This program not work) default queue life span. durable
// : Ubuntu 16.04 default queue life span. durable autodelete
// : Stop : Typing : [Ctrl]+[C]
// : Reference : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//--------------------------------------------------------------------------------
#include <cstdlib> // exit
#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <mutex> // std::mutex, std::lock_guard
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// Queue life-span flags as implemented by AMQP-CPP:
// AMQP::durable=[1] the queue is kept even if RabbitMQ stops
// AMQP::autodelete=[2] the queue is deleted once no consumer is connected
// AMQP::passive=[8] only check whether the queue exists; use this when not touching its contents
// AMQP::exclusive=[512] usable only by this connection; deleted when this connection closes
typedef int LifeSpan_t;
// Broker address used for every connection.
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : settings for enqueue
// 1 : settings for dequeue
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
// Queue name and combined life-span flags per index; filled in from the command line by main().
static std::string name_queues[] = { "", "" };
static LifeSpan_t lifeSpan_queues[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
// Converts one life-span keyword ("durable", "autodelete", "passive", "exclusive") into the
// corresponding AMQP flag value. Any other keyword is reported on stderr and terminates the
// process with exit code 1.
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
    if (lifeSpan == "durable") {
        return AMQP::durable;
    }
    if (lifeSpan == "autodelete") {
        return AMQP::autodelete;
    }
    if (lifeSpan == "passive") {
        return AMQP::passive;
    }
    if (lifeSpan == "exclusive") {
        return AMQP::exclusive;
    }

    // Unknown keyword: nothing sensible to return.
    std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
    exit(1);
}
// Exchange the enqueue queue is bound to and that sendMessage publishes to.
std::string exchange_name = "myexchange";
// Routing key used for both the binding and the publish (empty).
std::string routing_key = "";
// Returns a multi-line, human-readable listing of the configured queue names and life-span
// flags for the enqueue and dequeue slots.
// The stream is a plain local: a function-static stream keeps its contents between calls, so
// every later call would return all earlier dumps followed by the new one.
// NOTE(review): the heading says "tamesi35a2_cs.cs" although this is the C++ program; left
// unchanged because it is runtime output - confirm whether it should read tamesi35a2_cpp.
static std::string Dump()
{
    std::ostringstream sb;
    sb << "tamesi35a2_cs.cs Dump" << std::endl
        << " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
        << " ----" << std::endl
        << " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
        << " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
        << " ----" << std::endl;
    return sb.str();
}
// Returns the current time of day formatted as "HH:MM:SS'mmm " (two-digit fields, three-digit
// milliseconds, one trailing space) for use as a log prefix.
// The fields are computed straight from system_clock's time-since-epoch with no time-zone
// conversion, so the clock shown may be offset from local time.
static std::string getTimeslipNow()
{
    using namespace std::chrono;

    // Milliseconds elapsed since the clock's epoch.
    const long long totalMs = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    const long long totalSec = totalMs / 1000;

    const long long hh = totalSec / 3600 % 24;
    const long long mm = totalSec / 60 % 60;
    const long long ss = totalSec % 60;
    const long long mmm = totalMs % 1000;

    std::ostringstream out;
    out << std::setfill('0')
        << std::setw(2) << hh << ":"
        << std::setw(2) << mm << ":"
        << std::setw(2) << ss << "'"
        << std::setw(3) << mmm << " ";
    return out.str();
}
// Connections are kept as singletons: one libev loop, one AMQP libev handler and one
// (lazily created) TCP connection per index.
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };
static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
// Returns the connection for `index`, creating it (and logging the open) when the slot is empty.
static AMQP::TcpConnection* getConnection(int index)
{
    AMQP::TcpConnection*& slot = pConnection_ev[index];
    if (slot != nullptr) {
        return slot;
    }

    std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.open begin" << std::endl;
    slot = new AMQP::TcpConnection(pHandler_ev[index], ADDRESS);
    std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.open end" << std::endl;
    return slot;
}
static void closeChannel(int index);
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close begin" << std::endl;
// チャンネルにもヌルのフラグを入れる
closeChannel(index);
pConnection_ev[index]->close();
pConnection_ev[index] = nullptr;
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close end" << std::endl;
}
}
// Channels are kept as singletons, one per index.
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
// Returns the channel for `index`, creating it on that index's connection (and logging the
// open) when the slot is empty.
static AMQP::TcpChannel* getChannel(int index)
{
    AMQP::TcpChannel*& slot = pChannel_ev[index];
    if (slot != nullptr) {
        return slot;
    }

    std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open begin" << std::endl;
    slot = new AMQP::TcpChannel(getConnection(index));
    std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open end" << std::endl;
    return slot;
}
static void closeChannel(int index)
{
if (nullptr != pChannel_ev[index]) {
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close begin" << std::endl;
pChannel_ev[index]->close();
pChannel_ev[index] = nullptr;
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close end" << std::endl;
}
}
/// <summary>
/// Rotating (ring) buffer of messages.
/// The dequeue thread cannot publish by itself, so the dequeue side hands each received message
/// to the enqueue side through this buffer. Access is serialized by a mutex.
/// An empty string marks a free slot, so empty messages are never stored. When the buffer is
/// full the oldest message is dropped so that the remaining ones stay in arrival order.
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 1024;
    static std::string buffer[bufferSize] = {};
    // Next slot to write (PUT_INDEX) and next slot to read (GET_INDEX).
    static int bufferCursors[2] = { 0, 0 };
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;
    std::mutex _mutex;

    // Moves the given cursor one slot forward, wrapping to 0 past the last slot.
    // Must be called with _mutex held.
    static void advanceCursor(int cursorIndex)
    {
        bufferCursors[cursorIndex]++;
        if (!(bufferCursors[cursorIndex] < bufferSize))
        {
            bufferCursors[cursorIndex] = 0;
        }
    }

    // Stores `message` in the next write slot.
    // Empty messages are ignored: "" is the free-slot marker, and storing one would advance the
    // write cursor while the reader stays stuck on the apparently empty slot.
    static void putMessage(std::string message)
    {
        if (message.empty())
        {
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp rotationBuffer.putMessage ignored empty message" << std::endl;
            return;
        }
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] before lock [" << message << "]" << std::endl;
        std::lock_guard<std::mutex> lock(_mutex);
        std::string& slot = buffer[bufferCursors[PUT_INDEX]];
        if (!slot.empty())
        {
            // The write cursor caught up with the read cursor: the buffer is full. The slot holds
            // the oldest unread message; drop it and move the read cursor on so order is kept.
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp rotationBuffer.putMessage overflow, dropped oldest [" << slot << "]" << std::endl;
            advanceCursor(GET_INDEX);
        }
        slot = message;
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] [" << message << "]" << std::endl;
        advanceCursor(PUT_INDEX);
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage end next=[" << bufferCursors[PUT_INDEX] << "]" << std::endl;
    }

    // Removes and returns the oldest stored message, or "" when nothing is waiting.
    static std::string removeMessage()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (buffer[bufferCursors[GET_INDEX]].empty())
        {
            return "";
        }
        std::string message = buffer[bufferCursors[GET_INDEX]];
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] [" << message << "]" << std::endl;
        // Mark the slot as free again.
        buffer[bufferCursors[GET_INDEX]] = "";
        advanceCursor(GET_INDEX);
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage end return=[" << message << "] next=[" << bufferCursors[GET_INDEX] << "]" << std::endl;
        return message;
    }
}
// Forwards one message from the rotating buffer to the enqueue queue.
// Steps: declare the enqueue queue, bind it to the exchange, then run the enqueue event loop.
// The bind-success callback polls the rotating buffer every 20 ms until a message is available,
// appends "daze", and publishes it inside a transaction. When the commit succeeds the enqueue
// connection is closed, so the next call starts with a fresh connection and channel.
static void sendMessage()
{
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ begin" << std::endl;
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue begin" << std::endl;
    getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
        .onError([](const char* errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
        });
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue end" << std::endl;
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue begin" << std::endl;
    getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        // Called once from ev_run. Closing the connection makes ev_run return. For the commit to
        // actually go out, this handler has to return.
        .onSuccess([]() {
            std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin" << std::endl;
            bool infinityLoop = true;
            while (infinityLoop)
            {
                // Take the next message out of the rotating buffer ("" means nothing is waiting).
                std::string message = rotationBuffer::removeMessage();
                if ("" != message)
                {
                    // Append "daze" to the end.
                    message += "daze";
                    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;
                    // Begin the transaction.
                    getChannel(ENQUEUE_INDEX)->startTransaction();
                    // Publish.
                    if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if begin" << std::endl;
                        // Start the commit. Both callbacks run later, from ev_run, after this handler
                        // (and therefore the local `message`) is gone, so they capture it by value.
                        getChannel(ENQUEUE_INDEX)->commitTransaction()
                            .onSuccess([=]() {
                                // Commit succeeded.
                                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;
                                // It appears the transaction commit is carried out after the connection is closed.
                                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection begin" << std::endl;
                                closeConnection(ENQUEUE_INDEX);
                                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ closeConnection end" << std::endl;
                            })
                            .onError([message](const char *errMsg) {
                                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError [" << message << "]" << std::endl;
                                std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                            });
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ if end" << std::endl;
                    }
                    else {
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else begin" << std::endl;
                        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                        getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ else end" << std::endl;
                    }
                    // Leave the polling loop and return from this handler; ev_run then carries on
                    // with the rest of the work (finishing the commit and so on).
                    infinityLoop = false;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(20));
            }
            std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ end" << std::endl;
        }).onError([](const char* errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
        });
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue end" << std::endl;
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run begin" << std::endl;
    // From here control goes to the bindQueue onSuccess handler.
    ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run end" << std::endl;
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ end" << std::endl;
}
// Producer thread body: responsible for sending to the message queue.
// Calls sendMessage() over and over and never returns.
static void workProduce()
{
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP begin" << std::endl;
    // The big endless loop.
    while (true) {
        sendMessage();
    }
    // Not reachable; kept as a tripwire in case the loop above is ever changed.
    std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP outOfInfinityLoop error" << std::endl;
}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ begin" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ begin" << std::endl;
// I will go to the front of the box named "1111".
LifeSpan_t lifeSpan = lifeSpan_queues[DEQUEUE_INDEX];
getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS begin" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {
std::string message(amqpMessage.body(), amqpMessage.bodySize());
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received begin [" << message << "]" << std::endl;
rotationBuffer::putMessage(message);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack begin [" << deliveryTag << "] [" << message << "]" << std::endl;
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack end [" << deliveryTag << "]" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received end [" << message << "]" << std::endl;
};
getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run begin" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev[DEQUEUE_INDEX]);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ end" << std::endl;
// I will not come here.
return;
}
// Entry point.
// Parses "--enqueue <queue> <lifespan>..." and "--dequeue <queue> <lifespan>..." from the command
// line into name_queues / lifeSpan_queues, then starts the consumer and producer threads and
// sleeps forever. When either queue name is missing, or a value appears before any
// --enqueue/--dequeue section, a usage message is printed to stderr and the process exits with
// code 1. An unknown life-span keyword also terminates the process (inside LifeSpanString_To_Int).
int main(int argc, char* argv[])
{
    // Argument parsing.
    if(0<argc)
    {
        // Drop the program name and join the remaining arguments, each followed by a space.
        std::string cmdArg;
        for (int i = 1; i < argc; ++i)
        {
            cmdArg += std::string(argv[i]);
            cmdArg += " ";
        }
        std::istringstream data(cmdArg);
        std::cout << getTimeslipNow() << "35a2_cpp main commandLineArguments=[" << data.str() << "]" << std::endl;
        // Input:
        //   "--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete"
        //   Any number of life spans may follow a name: "durable" "autodelete" "passive" "exclusive".
        // Output:
        //   name_queues and lifeSpan_queues receive the results.
        // Parser state:
        int m0 = -1; // Current section: ENQUEUE_INDEX, DEQUEUE_INDEX, or -1 before any section.
        int m1 = -1; // -1 until the queue name of the current section has been read.
        std::string a;
        while (data >> a) {
            if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
            else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
            else if (-1 == m0) {
                // A value before any section would index the settings arrays with -1.
                goto gt_CommandlineParameterError;
            }
            else if (-1 == m1) { // First value of a section: the queue name.
                name_queues[m0] = a;
                lifeSpan_queues[m0] = 0;
                m1++;
            }
            else { lifeSpan_queues[m0] |= LifeSpanString_To_Int(a); } // Further values: life-span flags.
        }
        if ("" == name_queues[ENQUEUE_INDEX] || "" == name_queues[DEQUEUE_INDEX]) {
            goto gt_CommandlineParameterError;
        }
        std::cout << getTimeslipNow() << "35a2_cpp main dump begin" << std::endl;
        std::cout << getTimeslipNow() << Dump();
        std::cout << getTimeslipNow() << "35a2_cpp main dump end" << std::endl;
    }
    else
    {
        goto gt_CommandlineParameterError;
    }
    // Start the queue-watching thread and the message-sending thread.
    {
        auto th1 = std::thread([] { workConsume(); });
        auto th2 = std::thread([] { workProduce(); });
        for (;;) // The main thread just sleeps forever.
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        // This program never disconnects by itself, so execution does not get past the loop above.
        th2.join();
        th1.join();
    }
    return 0;
gt_CommandlineParameterError:
    std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
        << "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
        << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
        << "queueNames[0]=[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "queueNames[1]=[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl;
    exit(1);
}