前回の記事 : http://qiita.com/muzudho1/items/90eec636fc1664dbfcee
これでどうか。
// メッセージ・キューの送信を担当
static void workProduce()
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "(>_<)enqueue: error declaring queue: [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key]() {
std::string message = rotationBuffer::removeMessage();
std::cout << "(^q^) workProduce : メッセージを受け取ったか? message=[" << message << "]" << std::endl;
// 末尾に daze を付ける。
message += "daze";
std::cout << "(^▽^)workProduce : これからパブリッシュするぜ☆" << std::endl;
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
// break in ev loop.
//closeConnection(ENQUEUE_INDEX);
//closeChannel(ENQUEUE_INDEX);
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
for (;;)
{
if (rotationBuffer::hasMessage())
{
std::cout << "(^q^) workProduce : メッセージがあるぜ☆ [" << rotationBuffer.getMessage() << "]" << std::endl;
// ここで、バインドキューのオンサクセスの処理へ。
ev_run(pLoop_ev[ENQUEUE_INDEX]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
std::cout << "(>_<) workProduce : メッセージ監視ループを出てしまったぜ☆?" << std::endl;
}
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^)レシーブしたぜ☆ myString=[imo]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[imo]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
なんでローテーションしてないんだ?
}).onError([](const char* errMsg) {
std::cerr << "(>_<) バインドキュー : error declaring queue: [" << errMsg << "]" << "\n";
});
これを付けても エラー・ログには出ていない。
(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[kame]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[kame]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[usagi]
(^◇^)回転式バッファー : putMessage : [3] 入れた☆! message=[usagi]
(^◇^)回転式バッファー : putMessage : 次回は[4] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[4]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[4]
(^◇^)回転式バッファー : ゲットメッセージ : [3] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[4] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[inu]
パブリッシュが成功するのは 接続を切ったときだったか。
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[4]
(^◇^)回転式バッファー : ゲットメッセージ : [3] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[4] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[inu]
(^◇^)回転式バッファー : putMessage : [4] 入れた☆! message=[inu]
(^◇^)回転式バッファー : putMessage : 次回は[5] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[5]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[5]
(^◇^)回転式バッファー : ゲットメッセージ : [4] 出した☆! message=[inu]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[5] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[inu]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[neko]
(^◇^)回転式バッファー : putMessage : [5] 入れた☆! message=[neko]
(^◇^)回転式バッファー : putMessage : 次回は[6] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[6]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[6]
(^◇^)回転式バッファー : ゲットメッセージ : [5] 出した☆! message=[neko]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[6] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[neko]
パブリッシュできてないと思うが。
# rabbitmqctl list_queues
Listing queues ...
1112 0
1113 0
std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
// ここで、バインドキューのオンサクセスの処理へ。
//ev_run(pLoop_ev[ENQUEUE_INDEX]);
ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
std::cout << "(^q^) workProduce : メッセージ監視ループを出たぜ☆ 再接続しようぜ☆" << std::endl;
引数 0 を付けてみるか。
変わり無し
接続が切れるのは1回?
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[sika]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[sika]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[sika]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[sika]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
パブリッシュのあとに接続が切れるが、2回切れることがない。ずっと切れっぱなしなのか。
そして 1回もパブリッシュされてない。
ヌルのフラグ
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
// チャンネルにもヌルのフラグを入れる
closeChannel(index);
std::cout << "(^▽^)これから接続を閉じるぜ☆ index=[" << index << "]" << std::endl;
pConnection_ev[index]->close();
std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
pConnection_ev[index] = nullptr;
}
}
チャンネルもヌルにしよう。
(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[hituzi]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[hituzi]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[hituzi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[hituzi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
毎回 接続を閉じたり 開けたりするようになったが、パブリッシュは行われていない。
コンシューマー変数のスコープ
static void StartConsume()
{
IModel channel = GetChannel(DEQUEUE_INDEX);
EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);
// 受信できたときに割り込んでくる処理
consumer.Received += GetDequeueHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
// 終了はさせない
// consumer.Received -= GetReceiveHandler();
// CloseChannel(DEQUEUE_INDEX);
}
この consumer 変数は、ローカル変数でいいんだろうか?
std::cout << "(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->startTransaction();
std::cout << "(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆" << std::endl;
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([]() {
std::cout << "(^▽^)コミット成功☆!" << std::endl;
})
.onError([](const char *errMsg) {
std::cout << "(>_<)コミット失敗☆ errMsg=[" << errMsg << "]" << std::endl;
});
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
}else{
std::cout << "(/_\)パブリッシュできなかったぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
}
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[usagi]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[usagi]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
コミットできてないのか。
(^q^)レシーブしたぜ☆ myString=[inu]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[inu]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[inu]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[inu]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^)レシーブしたぜ☆ myString=[neko]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[neko]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[neko]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[neko]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^)レシーブしたぜ☆ myString=[saru]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[saru]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[saru]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[saru]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^)レシーブしたぜ☆ myString=[hituji]
これで パブリッシュできてないのか。
接続を1つにして、切断しないようにしてみるか
ev_run loop で構造体が被る。むり。
切断、チャンネル閉鎖ではなく、関数を抜けるか。
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メインスレッドはずっと回ってるぜ☆
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)コンシュームは、デクレア・キューしたぜ☆
(^q^)コンシュームを設定したぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)workProduce : デクレア・キューしたぜ☆
(^▽^)workProduce : バインド・キューしたぜ☆
(^q^) workProduce : メッセージ監視ループに入るぜ☆
(^q^) bindQueue.onSuccess : 開始だぜ☆
(^q^)レシーブしたぜ☆ myString=[kame]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[kame]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[kame]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[kame]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : 終了だぜ☆
(^▽^)コミット成功☆!
(^q^)レシーブしたぜ☆ myString=[usagi]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[usagi]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^q^)レシーブしたぜ☆ myString=[hituji]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[hituji]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^q^)レシーブしたぜ☆ myString=[neko]
(^◇^)回転式バッファー : putMessage : [3] 入れた☆! message=[neko]
(^◇^)回転式バッファー : putMessage : 次回は[4] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[4]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[4]
1回だけ コミットに成功している。なぜ2回目以降は成功しないのか?
接続を切って、関数から抜けようぜ
Enqueue? > <---- [interrupt!] Dequeue(^q^) inudaze
i
(^q^)Enqueue: エンキュー開始☆ message=[i]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ index=[0]
(^q^)Enqueue: body.Length=[1]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) idaze
hituji
(^q^)Enqueue: エンキュー開始☆ message=[hituji]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ index=[0]
(^q^)Enqueue: body.Length=[6]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) hitujidaze
それでOK。
tamesi34_cs.cs
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
// Execute : Command : // フォアグラウンドで実行する
// : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
// Check : Command : // キューの中身の数を調べる
// : rabbitmqctl list_queues
//
// Library : RabbitMQ
// Refference : Website : RabbitMQ http://www.rabbitmq.com/
// : Website : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Website : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//--------------------------------------------------------------------------------
// tamesi34_cs.cs
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace UsagiMQ
{
/// <summary>
/// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
/// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
/// </summary>
[Flags]
enum LifeSpanType
{
/// <summary>
/// RabbitMQが止まってもキューを残す
/// </summary>
Durable = 1,
/// <summary>
/// コンシューマーが1人も接続していなかったら消す
/// </summary>
Autodelete = 0x1<<1,
/// <summary>
/// キューが存在するかどうかチェックするだけ。中身見ない時これ
/// </summary>
Passive = 0x1 << 2,
/// <summary>
/// この接続でだけ使える。この接続が切れたら消す
/// </summary>
Exclusive = 0x1 << 3
}
/// <summary>
/// メッセージを エンキューします。
/// キューの名前は指定してください。
/// デキューは割込みを受け付けます。
///
/// 参照 : QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
/// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
/// </summary>
class Program
{
static string Dump()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine("tamesi34_cs.cs Dump");
sb.AppendLine(" name_queues [ENQUEUE_INDEX] =[" + name_queues[ENQUEUE_INDEX]+"]");
sb.AppendLine(" lifeSpan_queues [ENQUEUE_INDEX] =[" + lifeSpan_queues[ENQUEUE_INDEX] + "]");
sb.AppendLine(" durable_lifeSpans [ENQUEUE_INDEX] =[" + durable_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" autodelete_lifeSpans[ENQUEUE_INDEX] =[" + autodelete_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" passive_lifeSpans [ENQUEUE_INDEX] =[" + passive_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" exclusive_lifeSpans [ENQUEUE_INDEX] =[" + exclusive_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" ----");
sb.AppendLine(" name_queues [DEQUEUE_INDEX] =[" + name_queues[DEQUEUE_INDEX] + "]");
sb.AppendLine(" lifeSpan_queues [DEQUEUE_INDEX] =[" + lifeSpan_queues[DEQUEUE_INDEX] + "]");
sb.AppendLine(" durable_lifeSpans [DEQUEUE_INDEX] =[" + durable_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" autodelete_lifeSpans[DEQUEUE_INDEX] =[" + autodelete_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" passive_lifeSpans [DEQUEUE_INDEX] =[" + passive_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" exclusive_lifeSpans [DEQUEUE_INDEX] =[" + exclusive_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" ----");
sb.AppendLine(" name_queues [DELETEQUEUE_INDEX]=[" + name_queues[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" lifeSpan_queues [DELETEQUEUE_INDEX]=[" + lifeSpan_queues[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" durable_lifeSpans [DELETEQUEUE_INDEX]=[" + durable_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" autodelete_lifeSpans[DELETEQUEUE_INDEX]=[" + autodelete_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" passive_lifeSpans [DELETEQUEUE_INDEX]=[" + passive_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" exclusive_lifeSpans [DELETEQUEUE_INDEX]=[" + exclusive_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" ----");
return sb.ToString();
}
const int ENQUEUE_INDEX = 0;
const int DEQUEUE_INDEX = 1;
const int DELETEQUEUE_INDEX = 2;
const int NUM_INDEX = 3;
const string HOST_NAME = "localhost";
static string[] name_queues = new string[NUM_INDEX];
/// <summary>
/// キューの寿命(複合可能)
/// durable : RabbitMQが止まってもキューを残す
/// autodelete : コンシューマーが1人も接続していなかったら消す
/// passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
/// exclusive : この接続でだけ使える。この接続が切れたら消す
/// </summary>
static LifeSpanType[] lifeSpan_queues = new LifeSpanType[NUM_INDEX];
static bool[]
durable_lifeSpans = new bool[NUM_INDEX],
autodelete_lifeSpans = new bool[NUM_INDEX],
passive_lifeSpans = new bool[NUM_INDEX],
exclusive_lifeSpans = new bool[NUM_INDEX];
static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
{
name_queues[index_queue] = name_queue;
lifeSpan_queues[index_queue] = lifeSpan;
// 一旦クリアー
durable_lifeSpans[index_queue] = false;
autodelete_lifeSpans[index_queue] = false;
passive_lifeSpans[index_queue] = false;
exclusive_lifeSpans[index_queue] = false;
// durable
if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
// autodelete
if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
// passive
if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
// exclusive
if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
}
public static ConnectionFactory GetFactory()
{
if (null == m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
Console.WriteLine("(^q^)ファクトリーを用意したぜ☆ HostName=[" + HOST_NAME + "]");
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null == m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
Console.WriteLine("(^q^)接続したぜ☆");
}
return m_connection_;
}
static IConnection m_connection_;
public static IModel GetChannel(int index)
{
if (null == m_channels_[index])
{
m_channels_[index] = GetConnection().CreateModel();
// 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
Console.Error.WriteLine(Dump());
Console.Error.WriteLine("(tamesi34_cs.cs?) GetChannel index = [" + index + "]");
Console.Error.WriteLine(" name_queues[index] = [" + name_queues[index] + "]");
Console.Error.WriteLine(" passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
Console.Error.WriteLine(" durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
Console.Error.WriteLine(" exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
Console.Error.WriteLine(" autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
Console.Error.WriteLine(" nowait は仮に false 固定");
Console.Error.WriteLine(" arguments は仮に null 固定");
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
// RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
// そして false になっているようだ。
m_channels_[index].QueueDeclare(name_queues[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
//m_channels_[index].QueueDeclare(name_queues[index], false, false, false, false, false, null);
#else
m_channels_[index].QueueDeclare(name_queues[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
}
Console.WriteLine("(^q^)チャンネルを開いたぜ☆ index=[" + index + "]");
return m_channels_[index];
}
static IModel[] m_channels_ = new IModel[NUM_INDEX];
public static EventingBasicConsumer GetConsumer(int index)
{
if (null == m_consumers_[index])
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumers_[index] = new EventingBasicConsumer();
#else
m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
}
Console.WriteLine("(^q^)コンシューマーを取得したぜ☆ index=["+ index + "]");
return m_consumers_[index];
}
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetDequeueHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
{
if (null == m_dequeueHandler_)
{
#if UBUNTU
m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine("<---- [interrupt!] Dequeue(^q^) {0}", message);
});
}
return m_dequeueHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_dequeueHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection()
{
if (null != m_connection_)
{
m_connection_.Close();
m_connection_ = null;
Console.WriteLine("(^q^)接続を閉じたぜ☆");
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel(int index)
{
if (null != m_channels_[index])
{
m_channels_[index].Close();
m_channels_[index] = null;
Console.WriteLine("(^q^)チャンネルを閉じたぜ☆");
}
}
static void Main(string[] args)
{
//----------------------------------------
// Delete
//----------------------------------------
for (;;)
{
Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > ");
string queueName_delete = Console.ReadLine();
if (""== queueName_delete.Trim())
{
break;
}
Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
uint result = DeleteQueue();
Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。");
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
for (;;)
{
Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
string queueName_enqueue = Console.ReadLine();
Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_enqueue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
{
SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
break;
}
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
EventingBasicConsumer consumer = null;
for (;;)
{
Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
string queueName_dequeue = Console.ReadLine();
Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_dequeue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
{
SetLifeSpan(DEQUEUE_INDEX, queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
StartConsume(out consumer);
break;
}
}
Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ");
for (;;)
{
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
Console.Write(@"Enqueue? > ");
}
// ここには来ない
// CloseConnection();
}
static uint DeleteQueue()
{
IModel channel = GetChannel(DELETEQUEUE_INDEX);
#if UBUNTU
// Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
// uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
uint result = channel.QueueDelete( name_queues[DELETEQUEUE_INDEX],true,true,true);
#else
uint result = channel.QueueDelete(name_queues[DELETEQUEUE_INDEX],true,true);
#endif
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(DELETEQUEUE_INDEX);
return result;
}
static void Enqueue(string message)
{
Console.WriteLine("(^q^)Enqueue: エンキュー開始☆ message=[" + message + "]");
IModel channel = GetChannel(ENQUEUE_INDEX);
byte[] body = Encoding.UTF8.GetBytes(message);
Console.WriteLine("(^q^)Enqueue: body.Length=[" + body.Length+"]");
channel.BasicPublish("", name_queues[ENQUEUE_INDEX], null, body);
Console.WriteLine("(^q^)Enqueue: パブリッシュしたぜ。キュー名=[" + name_queues[ENQUEUE_INDEX] + "]");
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(ENQUEUE_INDEX);
}
static void StartConsume(out EventingBasicConsumer consumer)
{
IModel channel = GetChannel(DEQUEUE_INDEX);
consumer = GetConsumer(DEQUEUE_INDEX); // ローカル変数だとスコープが切れて寿命が尽きてしまうので、外側に返す
// 受信できたときに割り込んでくる処理
consumer.Received += GetDequeueHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
// 終了はさせない
// consumer.Received -= GetReceiveHandler();
// CloseChannel(DEQUEUE_INDEX);
}
}
}
tamesi35a2_cpp.cpp
//--------------------------------------------------------------------------------
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Stop : Typing : [Ctrl]+[C]
//--------------------------------------------------------------------------------
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
enum LifeSpanType
{
// RabbitMQが止まってもキューを残す
durable = 1,
// コンシューマーが1人も接続していなかったら消す
autodelete = 0x1 << 1,
// キューが存在するかどうかチェックするだけ。中身見ない時これ
passive = 0x1 << 2,
// この接続でだけ使える。この接続が切れたら消す
exclusive = 0x1 << 3
};
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpanType lifeSpan_queues[] = { (LifeSpanType)0, (LifeSpanType)0 };
static LifeSpanType LifeSpanString_To_Enum(std::string lifeSpan)
{
if ("durable"==lifeSpan) {
return durable;
} else if ("autodelete" == lifeSpan) {
return autodelete;
} else if ("passive" == lifeSpan) {
return passive;
} else if ("exclusive" == lifeSpan) {
return exclusive;
} else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(1);
}
}
static int LifeSpanString_To_AmqpInt(LifeSpanType lifeSpan)
{
int amqpInt = 0;
if ( ((int)durable & (int)lifeSpan) == (int)durable) {
amqpInt |= AMQP::durable;
}
if (((int)autodelete & (int)lifeSpan) == (int)autodelete) {
amqpInt |= AMQP::autodelete;
}
if (((int)passive & (int)lifeSpan) == (int)passive) {
amqpInt |= AMQP::passive;
}
if (((int)exclusive & (int)lifeSpan) == (int)exclusive) {
amqpInt |= AMQP::exclusive;
}
return amqpInt;
}
static std::string Dump()
{
static std::ostringstream sb;
sb << "tamesi35a2_cs.cs Dump" << std::endl
<< " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl
<< " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };
static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
if (nullptr== pConnection_ev[index]) {
pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
std::cout << "(^▽^)接続したぜ☆ index=[" << index << "]" << std::endl;
}
return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
// チャンネルにもヌルのフラグを入れる
closeChannel(index);
std::cout << "(^▽^)これから接続を閉じるぜ☆ index=[" << index << "]" << std::endl;
pConnection_ev[index]->close();
std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
pConnection_ev[index] = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
if (nullptr== pChannel_ev[index])
{
pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
std::cout << "(^▽^)チャンネル開いたぜ☆ index=[" << index << "]" << std::endl;
}
return pChannel_ev[index];
}
static void closeChannel(int index)
{
if (nullptr != pChannel_ev[index]) {
std::cout << "(^▽^)これからチャンネルを閉じるぜ☆ index=[" << index << "]" << std::endl;
pChannel_ev[index]->close();
std::cout << "(^▽^)チャンネルを閉じたぜ☆" << std::endl;
pChannel_ev[index] = nullptr;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
//std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
std::cout << "(^◇^)回転式バッファー : putMessage : [" << bufferCursors[PUT_INDEX] << "] 入れた☆! message=[" << message << "]" << std::endl;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
std::cout << "(^◇^)回転式バッファー : putMessage : 次回は[" << bufferCursors[PUT_INDEX] << "] ☆!" << std::endl;
}
static bool hasMessage() {
std::lock_guard<std::mutex> lock(_mutex);
return "" != buffer[bufferCursors[GET_INDEX]];
}
static std::string removeMessage()
{
//std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
std::cout << "(^◇^)回転式バッファー : ゲットメッセージ : [" << bufferCursors[GET_INDEX] << "] 出した☆! message=[" << message << "]" << std::endl;
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
std::cout << "(^◇^)回転式バッファー : ゲットメッセージ : 次回は[" << bufferCursors[GET_INDEX] << "] ☆!" << std::endl;
return message;
}
return "";
}
static std::string getMessage()
{
std::lock_guard<std::mutex> lock(_mutex);
return buffer[bufferCursors[GET_INDEX]];
}
}
// メッセージ・キューの送信を担当
static void workProduce()
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
// おっきな無限ループ
for (;;) {
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "(>_<)enqueue: error declaring queue: [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)workProduce : デクレア・キューしたぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
// ev_run で1回呼び出される。onSuccessが終わっても、ev_run は抜けない。
.onSuccess([&exchange_name, &routing_key]() {
std::cout << "(^q^) bindQueue.onSuccess : 開始だぜ☆" << std::endl;
for (;;)
{
if (rotationBuffer::hasMessage())
{
std::string message = rotationBuffer::removeMessage();
std::cout << "(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[" << message << "]" << std::endl;
// 末尾に daze を付ける。
message += "daze";
std::cout << "(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->startTransaction();
std::cout << "(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆" << std::endl;
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([]() {
std::cout << "(^▽^)コミット成功☆!" << std::endl;
})
.onError([](const char *errMsg) {
std::cout << "(>_<)コミット失敗☆ errMsg=[" << errMsg << "]" << std::endl;
});
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
}else{
std::cout << "(/_\)パブリッシュできなかったぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
}
// 接続を切って、かつ、このハンドラーから抜けよう。
// break in ev_run loop.
closeConnection(ENQUEUE_INDEX);
//closeChannel(ENQUEUE_INDEX);
break;
}
//else
//{
// std::cout << "(^q^) bindQueue.onSuccess : メッセージは無いぜ☆" << std::endl;
//}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
std::cout << "(^q^) bindQueue.onSuccess : 終了だぜ☆" << std::endl;
}).onError([](const char* errMsg) {
std::cerr << "(>_<) バインドキュー : error declaring queue: [" << errMsg << "]" << "\n";
});
std::cout << "(^▽^)workProduce : バインド・キューしたぜ☆" << std::endl;
std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
// ここで、バインドキューのオンサクセスの処理へ。
//ev_run(pLoop_ev[ENQUEUE_INDEX]);
ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
std::cout << "(^q^) workProduce : メッセージ監視ループを出たぜ☆ 再接続しようぜ☆" << std::endl;
}
std::cout << "(>_<) workProduce : おっきな無限ループを出てしまったぜ☆?" << std::endl;
}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << "(^q^)コンシューム開始だぜ☆" << std::endl;
// I will go to the front of the box named "1111".
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[DEQUEUE_INDEX]);
getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << "(^q^)コンシュームは、デクレア・キューしたぜ☆" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
std::cout << "(^q^)レシーブしたぜ☆ myString=[" << myString << "]" << std::endl;
rotationBuffer::putMessage(myString);
std::cout << "(^q^)レシーブ : これからackするぜ☆ deliveryTag=[" << deliveryTag << "]" << std::endl;
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
std::cout << "(^q^)レシーブ : ackしたぜ☆ deliveryTag=[" << deliveryTag << "]" << std::endl;
};
getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << "(^q^)コンシュームを設定したぜ☆" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev[DEQUEUE_INDEX]);
std::cout << "(^q^)コンシュームのループから抜けたぜ☆?" << std::endl;
// I will not come here.
return;
}
int main(int argc, char* argv[])
{
std::cout << "(^q^)Welcome! AMQP::durable=[" << AMQP::durable << "] AMQP::autodelete=[" << AMQP::autodelete << "] AMQP::passive=[" << AMQP::passive << "] AMQP::exclusive=[" << AMQP::exclusive << "]" << std::endl;
// 引数の解析
if(0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::cout << "(^q^)cmdArg=[" << cmdArg << "]" << std::endl;
std::istringstream data(cmdArg);
//data >> std::skipws;
std::cout << "(^q^)data.str()=[" << data.str() << "]" << std::endl;
// 与件
// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
std::string queueNames[2] = {}; // 結果はこれらの配列に入れる
std::vector<std::string> lifespans[2] = {};
// 記憶
int m0 = -1; // 0:enqueue, 1:dequeue
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
std::cout << "(^_^)a0=[" << a << "]" << std::endl;
if ("--enqueue" == a)
{
std::cout << "(^q^)エンキュー!" << std::endl;
m0 = 0;
m1 = -1;
}
else if ("--dequeue" == a)
{
std::cout << "(^q^)デキュー!" << std::endl;
m0 = 1;
m1 = -1;
}
else if (-1 == m1 && 0 == m0) {
std::cout << "(^q^)エンキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else if (-1 == m1 && 1 == m0) {
std::cout << "(^q^)デキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else
{
std::cout << "(^q^)寿命! m0=[" << m0 << "] m1=[" << m1 << "]" << std::endl;
lifespans[m0].push_back(a);
std::cout << "(^q^)zyumyo-!" << std::endl;
} // 要素数-1 で最後の要素
}
std::cout << "(^-^)Loop end." << std::endl;
if ("" == queueNames[0] || "" == queueNames[1]) {
std::cerr << "コマンドライン引数の「--enqueue キュー名 寿命」「--dequeue キュー名 寿命」を漏れなく指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl;
std::cerr << "queueNames[0]=[" << queueNames[0] << "]" << std::endl;
//std::cerr << "queueNames[1]=[" << queueNames[1] << "]" << std::endl;
//std::cerr << Dump();
exit(1);
}
std::cout << "(^q^)エンキューのキュー名=[" << queueNames[0] << "]" << std::endl;
std::cout << "(^q^)デキューのキュー名=[" << queueNames[1] << "]" << std::endl;
std::cout << "(^q^)エンキューの寿命の数=[" << lifespans[0].size() << "]" << std::endl;
std::cout << "(^q^)デキューの寿命の数=[" << lifespans[1].size() << "]" << std::endl;
// エンキュー
{
m0 = 0;
name_queues[ENQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[ENQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_queues[ENQUEUE_INDEX]=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl;
// デキュー
{
m0 = 1;
name_queues[DEQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[DEQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_queues[DEQUEUE_INDEX]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
std::cout << Dump();
std::cout << "(^q^)ダンプおわり" << std::endl;
}
else
{
std::cerr << "コマンドライン引数を指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete";
exit(1);
}
// 常時監視をスタート
auto th1 = std::thread([] { workConsume(); });
auto th2 = std::thread([] { workProduce(); });
std::cout << "(^q^)メインスレッドはずっと回ってるぜ☆" << std::endl;
for (;;) // このメイン・スレッドはずっと寝てる
{
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
std::cout << "(^q^)メインスレッドのループを抜けた☆?" << std::endl;
// このプログラムは、自分では接続を切らない。
// スレッドの終了を待つ
th2.join();
th1.join();
std::cout << "(^q^)Finished!" << std::endl;
return 0;
}
これを浮かむ瀬に組み込むには 長ったらしい。もっと短く書けんのか。
最初の1つ
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > saru
Enqueue? > suzume
Enqueue? > <---- [interrupt!] Dequeue(^q^) suzumedaze
penguin
Enqueue? > <---- [interrupt!] Dequeue(^q^) penguindaze
sika
Enqueue? > <---- [interrupt!] Dequeue(^q^) sikadaze
nezumi
Enqueue? > <---- [interrupt!] Dequeue(^q^) nezumidaze
こんどは最初の1つが返ってこないぞ。
最初の方
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
34_cs consumer.create [1] begin
34_cs consumer.create [1] end
34_cs channel.open [1] begin
34_cs connection.open begin
34_cs connection.open end
34_cs channel.open [1] end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usagi
34_cs publish begin [usagi]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > kame
34_cs publish begin [kame]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > rakko
34_cs publish begin [rakko]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) rakkodaze
surume
34_cs publish begin [surume]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) surumedaze
途中から反応しだすのは何でだろう?
じゃあ、しばらく待ってから通信したらいいのか?
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
34_cs consumer.create [1] begin
34_cs consumer.create [1] end
34_cs channel.open [1] begin
34_cs connection.open begin
34_cs connection.open end
34_cs channel.open [1] end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > medaka
34_cs publish begin [medaka]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs <-------- [interrupt!] Dequeue(^q^) medakadaze
34_cs channel.close [0] end
Enqueue? > batta
34_cs publish begin [batta]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) battadaze
mebosi
34_cs publish begin [mebosi]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) mebosidaze
たしかに しばらく待つ と 最初の通信から応答が返ってくるようだが、プロセス間通信をするために最初に1秒待たないと行けないのだとしたら本末転倒だ。
tamesi35a2_cpp.out.log
35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
35a2_cpp main dump begin
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
35a2_cpp main dump end
35a2_cpp[1] channel.open begin
35a2_cpp[1] connection.open begin
35a2_cpp[0] channel.open begin
35a2_cpp[0] connection.open begin
35a2_cpp[35a2_cpp[1] connection.open end
0] connection.open end35a2_cpp[1] channel.open end
35a2_cpp[0] channel.open end
35a2_cpp rotationBuffer.putMessage begin [0] [kame]
35a2_cpp rotationBuffer.putMessage end next=[1]
35a2_cpp consumer.consume.ack begin [1] [kame]
35a2_cpp consumer.consume.ack end [1]
35a2_cpp rotationBuffer.removeMessage begin [0] [kame]
35a2_cpp rotationBuffer.removeMessage end next=[1]
35a2_cpp[0] connection.close begin
35a2_cpp[0] channel.close begin
35a2_cpp[0] channel.close end
35a2_cpp[0] connection.close end
35a2_cpp[0] channel.open begin
35a2_cpp[0] connection.open begin
35a2_cpp[0] connection.open end
35a2_cpp[0] channel.open end
35a2_cpp rotationBuffer.putMessage begin [1] [surume]
35a2_cpp rotationBuffer.putMessage end next=[2]
35a2_cpp consumer.consume.ack begin [2] [surume]
35a2_cpp consumer.consume.ack end [2]
35a2_cpp rotationBuffer.removeMessage begin [1] [surume]
35a2_cpp rotationBuffer.removeMessage end next=[2]
35a2_cpp[0] connection.close begin
35a2_cpp[0] channel.close begin
35a2_cpp[0] channel.close end
35a2_cpp[0] connection.close end
35a2_cpp producer.publish.commited [ F�x^A^?^@^@B�nx^A^?^@^@$^@^@^@^@^@^@^@B�nx^A^?^@^@ F�x^A^?^@^@�F�w^A^?^@^@B�nx^A^?^@^@^A^@^@^@^@^@^@^@�G�w^A^?^@^@��mx^A^?^@^@�%c^@^@^@^@^@^PH�w^A^?^@^@�%c^@^@^@^@^@^@^@^@^@^@^@^@^@B�nx^A^$
ログに kame と surume しかいないのも分からない。 usagi や rakko はどこに行ったのか?
時刻はずれているが、時刻を出してみた。
03:20:17 '254 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
03:20:17 '254 35a2_cpp main dump begin
03:20:17 '254 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
03:20:17 '254 35a2_cpp main dump end
03:20:17 '255 35a2_cpp producer.sendMessage begin
03:20:17 '255 35a2_cpp producer.sendMessage.declareQueue begin
03:20:17 '255 35a2_cpp[0] channel.open begin
03:20:17 '255 35a2_cpp consumer begin
03:20:17 '255 35a2_cpp consumer.declareQueue begin
03:20:17 '255 35a2_cpp[0] connection.open begin
03:20:17 '255 35a2_cpp[1] channel.open begin
03:20:17 '255 35a2_cpp[1] connection.open begin
03:20:17 '256 35a2_cpp[1] connection.open end
03:20:17 '257 35a2_cpp[1] channel.open end
03:20:17 '256 35a2_cpp[0] connection.open end
03:20:17 '257 35a2_cpp[0] channel.open end
03:20:17 '257 35a2_cpp producer.sendMessage.declareQueue end
03:20:17 '257 35a2_cpp producer.sendMessage.bindQueue begin
03:20:17 '257 35a2_cpp producer.sendMessage.bindQueue end
03:20:17 '257 35a2_cpp producer.sendMessage.ev_run begin
03:20:17 '257 35a2_cpp consumer.declareQueue end
03:20:17 '257 35a2_cpp consumer.consume begin
03:20:17 '257 35a2_cpp consumer.consume end
03:20:17 '257 35a2_cpp consumer.ev_run begin
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] before lock [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage end next=[1]
03:20:56 '695 35a2_cpp consumer.consume.received.ack begin [1] [neko]
03:20:56 '695 35a2_cpp consumer.consume.received.ack end [1]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage begin [0] [neko]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage end return=[neko] next=[1]
03:20:56 '700 35a2_cpp producer.sendMessage.bindQueue.onSuccess [nekodaze]
03:20:56 '700 35a2_cpp[0] connection.close begin
03:20:56 '700 35a2_cpp[0] channel.close begin
03:20:56 '700 35a2_cpp[0] channel.close end
03:20:56 '700 35a2_cpp[0] connection.close end
03:20:56 '709 35a2_cpp producer.sendMessage.publish.commited []
03:20:56 '711 35a2_cpp producer.sendMessage.ev_run end
03:20:56 '711 35a2_cpp producer.sendMessage end
03:20:56 '711 35a2_cpp producer.sendMessage begin
03:20:56 '711 35a2_cpp producer.sendMessage.declareQueue begin
03:20:56 '711 35a2_cpp[0] channel.open begin
03:20:56 '711 35a2_cpp[0] connection.open begin
03:20:56 '712 35a2_cpp[0] connection.open end
03:20:56 '712 35a2_cpp[0] channel.open end
03:20:56 '712 35a2_cpp producer.sendMessage.declareQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue begin
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.ev_run begin
ev_run は2つとも待機状態で、突然 neko から始まっているが、やはり inu は飛ばされている。このあたり調べてみたい。
03:20:17 '257 35a2_cpp producer.sendMessage.ev_run begin
03:20:17 '257 35a2_cpp consumer.declareQueue end
03:20:17 '257 35a2_cpp consumer.consume begin
03:20:17 '257 35a2_cpp consumer.consume end
03:20:17 '257 35a2_cpp consumer.ev_run begin
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] before lock [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage end next=[1]
03:20:56 '695 35a2_cpp consumer.consume.received.ack begin [1] [neko]
03:20:56 '695 35a2_cpp consumer.consume.received.ack end [1]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage begin [0] [neko]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage end return=[neko] next=[1]
03:20:56 '700 35a2_cpp producer.sendMessage.bindQueue.onSuccess [nekodaze]
03:20:56 '700 35a2_cpp[0] connection.close begin
03:20:56 '700 35a2_cpp[0] channel.close begin
03:20:56 '700 35a2_cpp[0] channel.close end
03:20:56 '700 35a2_cpp[0] connection.close end
03:20:56 '709 35a2_cpp producer.sendMessage.publish.commited []
03:20:56 '711 35a2_cpp producer.sendMessage.ev_run end
03:20:56 '711 35a2_cpp producer.sendMessage end
03:20:56 '711 35a2_cpp producer.sendMessage begin
03:20:56 '711 35a2_cpp producer.sendMessage.declareQueue begin
03:20:56 '711 35a2_cpp[0] channel.open begin
03:20:56 '711 35a2_cpp[0] connection.open begin
03:20:56 '712 35a2_cpp[0] connection.open end
03:20:56 '712 35a2_cpp[0] channel.open end
03:20:56 '712 35a2_cpp producer.sendMessage.declareQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue begin
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.ev_run begin
03:26:07 '706 35a2_cpp rotationBuffer.putMessage begin [1] before lock [saru]
03:26:07 '706 35a2_cpp rotationBuffer.putMessage begin [1] [saru]
03:26:07 '706 35a2_cpp rotationBuffer.putMessage end next=[2]
03:26:07 '706 35a2_cpp consumer.consume.received.ack begin [2] [saru]
03:26:07 '707 35a2_cpp consumer.consume.received.ack end [2]
03:26:07 '708 35a2_cpp rotationBuffer.removeMessage begin [1] [saru]
03:26:07 '708 35a2_cpp rotationBuffer.removeMessage end return=[saru] next=[2]
03:26:07 '708 35a2_cpp producer.sendMessage.bindQueue.onSuccess [sarudaze]
03:26:07 '708 35a2_cpp[0] connection.close begin
03:26:07 '708 35a2_cpp[0] channel.close begin
03:26:07 '708 35a2_cpp[0] channel.close end
03:26:07 '708 35a2_cpp[0] connection.close end
03:26:07 '710 35a2_cpp producer.sendMessage.ev_run end
03:26:07 '710 35a2_cpp producer.sendMessage end
03:26:07 '710 35a2_cpp producer.sendMessage begin
03:26:07 '710 35a2_cpp producer.sendMessage.declareQueue begin
03:26:07 '710 35a2_cpp[0] channel.open begin
03:26:07 '710 35a2_cpp[0] connection.open begin
03:26:07 '710 35a2_cpp[0] connection.open end
03:26:07 '711 35a2_cpp[0] channel.open end
03:26:07 '711 35a2_cpp producer.sendMessage.declareQueue end
03:26:07 '711 35a2_cpp producer.sendMessage.bindQueue begin
03:26:07 '711 35a2_cpp producer.sendMessage.bindQueue end
03:26:07 '711 35a2_cpp producer.sendMessage.ev_run begin
03:26:10 '447 35a2_cpp rotationBuffer.putMessage begin [2] before lock [inu]
03:26:10 '447 35a2_cpp rotationBuffer.putMessage begin [2] [inu]
03:26:10 '447 35a2_cpp rotationBuffer.putMessage end next=[3]
03:26:10 '447 35a2_cpp consumer.consume.received.ack begin [3] [inu]
03:26:10 '448 35a2_cpp consumer.consume.received.ack end [3]
03:26:10 '466 35a2_cpp rotationBuffer.removeMessage begin [2] [inu]
03:26:10 '466 35a2_cpp rotationBuffer.removeMessage end return=[inu] next=[3]
03:26:10 '466 35a2_cpp producer.sendMessage.bindQueue.onSuccess [inudaze]
03:26:10 '467 35a2_cpp[0] connection.close begin
03:26:10 '467 35a2_cpp[0] channel.close begin
03:26:10 '467 35a2_cpp[0] channel.close end
03:26:10 '467 35a2_cpp[0] connection.close end
03:26:10 '469 35a2_cpp producer.sendMessage.publish.commited []
03:26:10 '477 35a2_cpp producer.sendMessage.ev_run end
03:26:10 '477 35a2_cpp producer.sendMessage end
03:26:10 '477 35a2_cpp producer.sendMessage begin
03:26:10 '477 35a2_cpp producer.sendMessage.declareQueue begin
03:26:10 '477 35a2_cpp[0] channel.open begin
03:26:10 '477 35a2_cpp[0] connection.open begin
03:26:10 '477 35a2_cpp[0] connection.open end
03:26:10 '478 35a2_cpp[0] channel.open end
03:26:10 '478 35a2_cpp producer.sendMessage.declareQueue end
03:26:10 '478 35a2_cpp producer.sendMessage.bindQueue begin
03:26:10 '478 35a2_cpp producer.sendMessage.bindQueue end
03:26:10 '478 35a2_cpp producer.sendMessage.ev_run begin
03:26:15 '916 35a2_cpp rotationBuffer.putMessage begin [3] before lock [nezumi]
03:26:15 '916 35a2_cpp rotationBuffer.putMessage begin [3] [nezumi]
03:26:15 '916 35a2_cpp rotationBuffer.putMessage end next=[4]
03:26:15 '916 35a2_cpp consumer.consume.received.ack begin [4] [nezumi]
saru tori inu i nezumi で、ひとつ飛ばしになっている気もする。
手順を確認
RabbitMQ サービスを再起動。
rabbitmqctl stop
rabbitmq-server
[Ctrl] + [Z]
bg 1
1 はジョブ番号
./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
送信先キュー 1112、durable + autodelete
受信元キュー 1113、durable + autodelete
6件送信
# rabbitmqctl list_queues
Listing queues ...
1112 6
# ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
[2] 16393
# jobs
[1]- Running rabbitmq-server & (wd: /home/★user/shogi/csharp_service)
[2]+ Running ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
# rabbitmqctl list_queues
Listing queues ...
1112 0
1113 6
6件の返信はできているようだ。
つまり うまく行っているときは
03:56:43 '147 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
03:56:43 '148 35a2_cpp main dump begin
03:56:43 '148 tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
03:56:43 '148 35a2_cpp main dump end
03:56:43 '148 35a2_cpp ___(C) ___ begin
03:56:43 '148 03:56:43 '148 35a2_cpp ___(C) DQ_ begin
35a2_cpp (P)___ WKP begin03:56:43 '149 35a2_cpp[
1] channel.open begin
03:56:43 '149 35a2_cpp[1] connection.open begin
03:56:43 '149 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '149 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '149 35a2_cpp[0] channel.open begin
03:56:43 '149 35a2_cpp[0] connection.open begin
03:56:43 '150 35a2_cpp[0] connection.open end
03:56:43 '150 35a2_cpp[1] connection.open end
03:56:43 '151 35a2_cpp[1] channel.open end
03:56:43 '151 35a2_cpp ___(C) DQ_ end
03:56:43 '151 35a2_cpp ___(C) CNS begin
03:56:43 '151 35a2_cpp ___(C) CNS end
03:56:43 '151 35a2_cpp ___(C) ___ ev_run begin
03:56:43 '151 35a2_cpp[0] channel.open end
03:56:43 '151 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '151 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '151 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '151 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '167 35a2_cpp ___(C) CNS.received begin [s1]
03:56:43 '167 35a2_cpp rotationBuffer.putMessage begin [0] before lock [s1]
03:56:43 '167 35a2_cpp rotationBuffer.putMessage begin [0] [s1]
03:56:43 '167 35a2_cpp rotationBuffer.putMessage end next=[1]
03:56:43 '167 35a2_cpp ___(C) CNS.received.ack begin [1] [s1]
03:56:43 '167 35a2_cpp ___(C) CNS.received.ack end [1]
03:56:43 '167 35a2_cpp ___(C) CNS.received end [s1]
03:56:43 '168 35a2_cpp ___(C) CNS.received begin [s2]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [1] before lock [s2]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [1] [s2]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage end next=[2]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack begin [2] [s2]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack end [2]
03:56:43 '168 35a2_cpp ___(C) CNS.received end [s2]
03:56:43 '168 35a2_cpp ___(C) CNS.received begin [s3]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [2] before lock [s3]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [2] [s3]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage end next=[3]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack begin [3] [s3]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack end [3]
03:56:43 '168 35a2_cpp ___(C) CNS.received end [s3]
03:56:43 '168 35a2_cpp ___(C) CNS.received begin [s4]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [3] before lock [s4]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [3] [s4]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage end next=[4]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack begin [4] [s4]
03:56:43 '169 35a2_cpp ___(C) CNS.received.ack end [4]
03:56:43 '169 35a2_cpp ___(C) CNS.received end [s4]
03:56:43 '169 35a2_cpp ___(C) CNS.received begin [s5]
03:56:43 '169 35a2_cpp rotationBuffer.putMessage begin [4] before lock [s5]
03:56:43 '169 35a2_cpp rotationBuffer.putMessage begin [4] [s5]
03:56:43 '169 35a2_cpp rotationBuffer.putMessage end next=[5]
03:56:43 '169 35a2_cpp ___(C) CNS.received.ack begin [5] [s5]
03:56:43 '169 35a2_cpp ___(C) CNS.received.ack end [5]
03:56:43 '169 35a2_cpp ___(C) CNS.received end [s5]
03:56:43 '170 35a2_cpp ___(C) CNS.received begin [s6]
03:56:43 '170 35a2_cpp rotationBuffer.putMessage begin [5] before lock [s6]
03:56:43 '170 35a2_cpp rotationBuffer.putMessage begin [5] [s6]
03:56:43 '170 35a2_cpp rotationBuffer.putMessage end next=[6]
03:56:43 '170 35a2_cpp ___(C) CNS.received.ack begin [6] [s6]
03:56:43 '170 35a2_cpp ___(C) CNS.received.ack end [6]
03:56:43 '170 35a2_cpp ___(C) CNS.received end [s6]
03:56:43 '176 35a2_cpp rotationBuffer.removeMessage begin [0] [s1]
03:56:43 '176 35a2_cpp rotationBuffer.removeMessage end return=[s1] next=[1]
03:56:43 '176 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s1daze]
03:56:43 '176 35a2_cpp[0] connection.close begin
03:56:43 '176 35a2_cpp[0] channel.close begin
03:56:43 '176 35a2_cpp[0] channel.close end
03:56:43 '176 35a2_cpp[0] connection.close end
03:56:43 '182 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '183 35a2_cpp[0] channel.open begin
03:56:43 '183 35a2_cpp[0] connection.open begin
03:56:43 '183 35a2_cpp[0] connection.open end
03:56:43 '183 35a2_cpp[0] channel.open end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '187 35a2_cpp rotationBuffer.removeMessage begin [1] [s2]
03:56:43 '187 35a2_cpp rotationBuffer.removeMessage end return=[s2] next=[2]
03:56:43 '187 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s2daze]
03:56:43 '187 35a2_cpp[0] connection.close begin
03:56:43 '187 35a2_cpp[0] channel.close begin
03:56:43 '187 35a2_cpp[0] channel.close end
03:56:43 '187 35a2_cpp[0] connection.close end
03:56:43 '193 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '193 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '193 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '193 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '193 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '193 35a2_cpp[0] channel.open begin
03:56:43 '194 35a2_cpp[0] connection.open begin
03:56:43 '194 35a2_cpp[0] connection.open end
03:56:43 '194 35a2_cpp[0] channel.open end
03:56:43 '194 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '194 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '194 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '194 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '198 35a2_cpp rotationBuffer.removeMessage begin [2] [s3]
03:56:43 '198 35a2_cpp rotationBuffer.removeMessage end return=[s3] next=[3]
03:56:43 '198 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s3daze]
03:56:43 '198 35a2_cpp[0] connection.close begin
03:56:43 '198 35a2_cpp[0] channel.close begin
03:56:43 '198 35a2_cpp[0] channel.close end
03:56:43 '198 35a2_cpp[0] connection.close end
03:56:43 '199 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '200 35a2_cpp[0] channel.open begin
03:56:43 '200 35a2_cpp[0] connection.open begin
03:56:43 '200 35a2_cpp[0] connection.open end
03:56:43 '200 35a2_cpp[0] channel.open end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '204 35a2_cpp rotationBuffer.removeMessage begin [3] [s4]
03:56:43 '204 35a2_cpp rotationBuffer.removeMessage end return=[s4] next=[4]
03:56:43 '204 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s4daze]
03:56:43 '204 35a2_cpp[0] connection.close begin
03:56:43 '204 35a2_cpp[0] channel.close begin
03:56:43 '204 35a2_cpp[0] channel.close end
03:56:43 '204 35a2_cpp[0] connection.close end
03:56:43 '205 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '206 35a2_cpp[0] channel.open begin
03:56:43 '206 35a2_cpp[0] connection.open begin
03:56:43 '206 35a2_cpp[0] connection.open end
03:56:43 '206 35a2_cpp[0] channel.open end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '218 35a2_cpp rotationBuffer.removeMessage begin [4] [s5]
03:56:43 '218 35a2_cpp rotationBuffer.removeMessage end return=[s5] next=[5]
03:56:43 '218 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s5daze]
03:56:43 '218 35a2_cpp[0] connection.close begin
03:56:43 '218 35a2_cpp[0] channel.close begin
03:56:43 '218 35a2_cpp[0] channel.close end
03:56:43 '218 35a2_cpp[0] connection.close end
03:56:43 '219 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '220 35a2_cpp[0] channel.open begin
03:56:43 '220 35a2_cpp[0] connection.open begin
03:56:43 '220 35a2_cpp[0] connection.open end
03:56:43 '220 35a2_cpp[0] channel.open end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '225 35a2_cpp rotationBuffer.removeMessage begin [5] [s6]
03:56:43 '225 35a2_cpp rotationBuffer.removeMessage end return=[s6] next=[6]
03:56:43 '225 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s6daze]
03:56:43 '225 35a2_cpp[0] connection.close begin
03:56:43 '225 35a2_cpp[0] channel.close begin
03:56:43 '225 35a2_cpp[0] channel.close end
03:56:43 '225 35a2_cpp[0] connection.close end
03:56:43 '226 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '227 35a2_cpp[0] channel.open begin
03:56:43 '227 35a2_cpp[0] connection.open begin
03:56:43 '227 35a2_cpp[0] connection.open end
03:56:43 '227 35a2_cpp[0] channel.open end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ ev_run begin
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) s1daze
34_cs <-------- [interrupt!] Dequeue(^q^) s2daze
34_cs <-------- [interrupt!] Dequeue(^q^) s3daze
34_cs <-------- [interrupt!] Dequeue(^q^) s4daze
34_cs <-------- [interrupt!] Dequeue(^q^) s5daze
34_cs <-------- [interrupt!] Dequeue(^q^) s6daze
まあ、6件溜まっていれば、初回に6連発が飛んでくるわな。
ne
34_cs publish begin [ne]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > usi
34_cs publish begin [usi]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) usidaze
tora
34_cs publish begin [tora]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) toradaze
u
34_cs publish begin [u]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) udaze
tatu
34_cs publish begin [tatu]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) tatudaze
それでも 最初の1件が 返ってこない。
./tamesi34_cs.exe
の起動直後に 問題があるのだろうか?
C#の方、接続を2つ用意しては
Windows 10 だと接続1つを使いまわしても動いているが、Ubuntu 16.04 だと分からない。
センド用、レシーブ用に接続を2つ用意するか。
関係なかった。
10秒待つと
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
34_cs publish begin [a1]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
うまいこといく。
じゃあ、プログラムは問題なくて、何秒待てばいいのか、という話し か。
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
14:01:55 104 34_cs ___(C) start begin
14:01:55 110 34_cs consumer.create [1] begin
14:01:55 110 34_cs consumer.create [1] end
14:01:55 111 34_cs channel.open [1] begin
14:01:55 111 34_cs connection.open begin
14:01:55 289 34_cs connection.open end
14:01:55 294 34_cs channel.open [1] end
14:01:55 296 34_cs ___(C) start end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:01:57 351 34_cs (P)___ enqueue begin [a1]
14:01:57 352 34_cs channel.open [0] begin
14:01:57 352 34_cs connection.open begin
14:01:57 370 34_cs connection.open end
14:01:57 372 34_cs channel.open [0] end
14:01:57 376 34_cs (P)___ enqueue end
14:01:57 379 34_cs connection.close [0] begin
14:01:57 380 34_cs channel.close [0] begin
14:01:57 386 34_cs channel.close [0] end
14:01:57 392 34_cs connection.close [0] end
Enqueue? > b1
14:02:00 710 34_cs (P)___ enqueue begin [b1]
14:02:00 711 34_cs channel.open [0] begin
14:02:00 711 34_cs connection.open begin
14:02:00 720 34_cs connection.open end
14:02:00 722 34_cs channel.open [0] end
14:02:00 722 34_cs (P)___ enqueue end
14:02:00 722 34_cs connection.close [0] begin
14:02:00 722 34_cs channel.close [0] begin
14:02:00 724 34_cs channel.close [0] end
14:02:00 726 34_cs connection.close [0] end
Enqueue? > 14:02:00 749 34_cs <-------- [interrupt!] Dequeue(^q^) b1daze
これが失敗するとき。
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
14:03:06 830 34_cs ___(C) start begin
14:03:06 837 34_cs consumer.create [1] begin
14:03:06 837 34_cs consumer.create [1] end
14:03:06 838 34_cs channel.open [1] begin
14:03:06 838 34_cs connection.open begin
14:03:07 031 34_cs connection.open end
14:03:07 037 34_cs channel.open [1] end
14:03:07 040 34_cs ___(C) start end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:03:14 278 34_cs (P)___ enqueue begin [a1]
14:03:14 278 34_cs channel.open [0] begin
14:03:14 278 34_cs connection.open begin
14:03:14 293 34_cs connection.open end
14:03:14 295 34_cs channel.open [0] end
14:03:14 301 34_cs (P)___ enqueue end
14:03:14 301 34_cs connection.close [0] begin
14:03:14 302 34_cs channel.close [0] begin
14:03:14 308 34_cs channel.close [0] end
14:03:14 316 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
14:03:14 319 34_cs connection.close [0] end
これが成功するとき。
3秒だと失敗して、7秒だと成功する。
成功するタイミングがあるんだったら、タイマーで教えてほしいぜ。
Windows 10 では
consumer.Registered += GetRegisteredHandler();
こういうイベント・ハンドラーが使えた。 Ubuntu 16.04 にはあるのか?
Dequeue ? > 1
14:23:11 570 34_cs ___(C) start begin
14:23:11 578 34_cs consumer.create [1] begin
14:23:11 579 34_cs consumer.create [1] end
14:23:11 580 34_cs channel.open [1] begin
14:23:11 580 34_cs connection.open begin
14:23:11 765 34_cs connection.open end
14:23:11 770 34_cs channel.open [1] end
14:23:11 772 34_cs <-------- [interrupt!] Registered(^q^)
14:23:11 773 34_cs ___(C) start end
14:23:11 773 34_cs (P)___ waitRegistered begin
14:23:11 773 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:23:16 984 34_cs (P)___ enqueue begin [a1]
14:23:16 984 34_cs channel.open [0] begin
14:23:16 984 34_cs connection.open begin
14:23:16 992 34_cs connection.open end
14:23:16 994 34_cs channel.open [0] end
14:23:16 998 34_cs (P)___ enqueue end
14:23:16 998 34_cs connection.close [0] begin
14:23:16 999 34_cs channel.close [0] begin
14:23:17 004 34_cs channel.close [0] end
14:23:17 009 34_cs connection.close [0] end
Enqueue? > a1
14:23:19 431 34_cs (P)___ enqueue begin [a1]
14:23:19 432 34_cs channel.open [0] begin
14:23:19 433 34_cs connection.open begin
14:23:19 443 34_cs connection.open end
14:23:19 445 34_cs channel.open [0] end
14:23:19 445 34_cs (P)___ enqueue end
14:23:19 445 34_cs connection.close [0] begin
14:23:19 445 34_cs channel.close [0] begin
14:23:19 448 34_cs channel.close [0] end
14:23:19 448 34_cs connection.close [0] end
Enqueue? > 14:23:19 469 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
関係なかった。
じゃあ逆にやっぱ C++ 側の問題なのか?
05:23:17 '000 35a2_cpp rotationBuffer.putMessage end next=[49]
05:23:17 '000 35a2_cpp ___(C) CNS.received.ack begin [49] [a1]
05:23:17 '000 35a2_cpp ___(C) CNS.received.ack end [49]
05:23:17 '000 35a2_cpp ___(C) CNS.received end [a1]
05:23:17 '001 35a2_cpp rotationBuffer.removeMessage begin [48] [a1]
05:23:17 '001 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[49]
05:23:17 '001 35a2_cpp (P)___ WKP.bindQueue.onSuccess [a1daze]
05:23:17 '001 35a2_cpp[0] connection.close begin
05:23:17 '001 35a2_cpp[0] channel.close begin
05:23:17 '001 35a2_cpp[0] channel.close end
05:23:17 '001 35a2_cpp[0] connection.close end
05:23:17 '001 35a2_cpp (P)___ WKP.SM_ ev_run end
05:23:17 '001 35a2_cpp (P)___ WKP.SM_ end
05:23:17 '001 35a2_cpp (P)___ WKP.SM_ begin
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:23:17 '002 35a2_cpp[0] channel.open begin
05:23:17 '002 35a2_cpp[0] connection.open begin
05:23:17 '002 35a2_cpp[0] connection.open end
05:23:17 '002 35a2_cpp[0] channel.open end
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:23:17 '002 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:23:19 '447 35a2_cpp ___(C) CNS.received begin [a1]
05:23:19 '447 35a2_cpp rotationBuffer.putMessage begin [49] before lock [a1]
05:23:19 '447 35a2_cpp rotationBuffer.putMessage begin [49] [a1]
05:23:19 '447 35a2_cpp rotationBuffer.putMessage end next=[50]
05:23:19 '447 35a2_cpp ___(C) CNS.received.ack begin [50] [a1]
05:23:19 '447 35a2_cpp ___(C) CNS.received.ack end [50]
05:23:19 '447 35a2_cpp ___(C) CNS.received end [a1]
05:23:19 '457 35a2_cpp rotationBuffer.removeMessage begin [49] [a1]
05:23:19 '457 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[50]
05:23:19 '457 35a2_cpp (P)___ WKP.bindQueue.onSuccess [a1daze]
05:23:19 '457 35a2_cpp[0] connection.close begin
05:23:19 '457 35a2_cpp[0] channel.close begin
05:23:19 '458 35a2_cpp[0] channel.close end
05:23:19 '458 35a2_cpp[0] connection.close end
05:23:19 '459 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
05:23:19 '466 35a2_cpp (P)___ WKP.SM_ ev_run end
05:23:19 '466 35a2_cpp (P)___ WKP.SM_ end
05:23:19 '466 35a2_cpp (P)___ WKP.SM_ begin
05:23:19 '466 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:23:19 '466 35a2_cpp[0] channel.open begin
05:23:19 '466 35a2_cpp[0] connection.open begin
05:23:19 '467 35a2_cpp[0] connection.open end
05:23:19 '467 35a2_cpp[0] channel.open end
05:23:19 '467 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:23:19 '467 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:23:19 '467 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:23:19 '467 35a2_cpp (P)___ WKP.SM_ ev_run begin
コミットできてないんじゃないか?
C++側でコミットに失敗するケースとは?
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
14:46:47 424 34_cs ___(C) start begin
14:46:47 431 34_cs consumer.create [1] begin
14:46:47 431 34_cs consumer.create [1] end
14:46:47 433 34_cs channel.open [1] begin
14:46:47 433 34_cs connection.open begin
14:46:47 600 34_cs connection.open end
14:46:47 605 34_cs channel.open [1] end
14:46:47 607 34_cs <-------- [interrupt!] Registered(^q^)
14:46:47 608 34_cs ___(C) start end
14:46:47 608 34_cs (P)___ waitRegistered begin
14:46:47 608 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:46:49 703 34_cs (P)___ enqueue begin [a1]
14:46:49 703 34_cs channel.open [0] begin
14:46:49 704 34_cs connection.open begin
14:46:49 708 34_cs connection.open end
14:46:49 710 34_cs channel.open [0] end
14:46:49 713 34_cs (P)___ enqueue end
14:46:49 714 34_cs connection.close [0] begin
14:46:49 714 34_cs channel.close [0] begin
14:46:49 719 34_cs channel.close [0] end
14:46:49 724 34_cs connection.close [0] end
Enqueue? > 14:46:49 744 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
14:46:51 199 34_cs (P)___ enqueue begin [a2]
14:46:51 199 34_cs channel.open [0] begin
14:46:51 199 34_cs connection.open begin
14:46:51 204 34_cs connection.open end
14:46:51 206 34_cs channel.open [0] end
14:46:51 207 34_cs (P)___ enqueue end
14:46:51 207 34_cs connection.close [0] begin
14:46:51 207 34_cs channel.close [0] begin
14:46:51 209 34_cs channel.close [0] end
14:46:51 209 34_cs connection.close [0] end
Enqueue? > 14:46:51 224 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze
今度は すぐに応答が返ってきた。コメントを書き直してコンパイルして実行したが、何が違うのだろう?
05:48:09'921 35a2_cpp[0] connection.open end
05:48:09'921 35a2_cpp[0] channel.open end
05:48:09'921 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:48:09'921 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:48:09'921 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:48:09'921 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:48:11'733 35a2_cpp ___(C) CNS.received begin [a4]
05:48:11'733 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
05:48:11'733 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
05:48:11'733 35a2_cpp rotationBuffer.putMessage end next=[4]
05:48:11'733 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
05:48:11'733 35a2_cpp ___(C) CNS.received.ack end [4]
05:48:11'733 35a2_cpp ___(C) CNS.received end [a4]
05:48:11'747 35a2_cpp rotationBuffer.removeMessage begin [3] [a4]
05:48:11'748 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[4]
05:48:11'748 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a4daze] ****important****
05:48:11'748 35a2_cpp[0] connection.close begin
05:48:11'748 35a2_cpp[0] channel.close begin
05:48:11'748 35a2_cpp[0] channel.close end
05:48:11'748 35a2_cpp[0] connection.close end
05:48:11'749 35a2_cpp (P)___ WKP.bindQueue.publish.commited [] **********important**********
05:48:11'751 35a2_cpp (P)___ WKP.SM_ ev_run end
05:48:11'751 35a2_cpp (P)___ WKP.SM_ end
05:48:11'751 35a2_cpp (P)___ WKP.SM_ begin
05:48:11'751 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:48:11'751 35a2_cpp[0] channel.open begin
05:48:11'751 35a2_cpp[0] connection.open begin
05:48:11'751 35a2_cpp[0] connection.open end
05:48:11'752 35a2_cpp[0] channel.open end
05:48:11'752 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:48:11'752 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:48:11'752 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:48:11'752 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:48:12'999 35a2_cpp ___(C) CNS.received begin [a5]
05:48:12'999 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
05:48:12'999 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
05:48:12'999 35a2_cpp rotationBuffer.putMessage end next=[5]
05:48:12'999 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
05:48:12'999 35a2_cpp ___(C) CNS.received.ack end [5]
05:48:12'999 35a2_cpp ___(C) CNS.received end [a5]
05:48:13'013 35a2_cpp rotationBuffer.removeMessage begin [4] [a5]
05:48:13'013 35a2_cpp rotationBuffer.removeMessage end return=[a5] next=[5]
05:48:13'013 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a5daze] ****important****
05:48:13'013 35a2_cpp[0] connection.close begin
05:48:13'013 35a2_cpp[0] channel.close begin
05:48:13'013 35a2_cpp[0] channel.close end
05:48:13'013 35a2_cpp[0] connection.close end
05:48:13'014 35a2_cpp (P)___ WKP.bindQueue.publish.commited [] **********important**********
05:48:13'015 35a2_cpp (P)___ WKP.SM_ ev_run end
05:48:13'015 35a2_cpp (P)___ WKP.SM_ end
05:48:13'015 35a2_cpp (P)___ WKP.SM_ begin
05:48:13'015 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:48:13'015 35a2_cpp[0] channel.open begin
05:48:13'015 35a2_cpp[0] connection.open begin
05:48:13'015 35a2_cpp[0] connection.open end
important という目に見えやすいコメントを付けたが、
ここで commit のときに [] と空文字列になっているのが問題だ。ポインターの挙動をわたしが理解していないということだろう。何を指しているのか。
ソースコード
tamesi34_cs.cs
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : Command : rabbitmqctl list_queues
// : // Count messages in queues.
//
// Program : this
// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
// : License : MIT License https://opensource.org/licenses/MIT
// : Explain : For test. Practice of interprocess communication.
// : This program run on the background side.
// : Delete queue,
// : Or Send message to message queue.
// : Interrupt received message from queue.
// : Please settings source queue, destination queue to command line argument.
// : Explain : command line argument of life span of queue (Compositable)
// : durable The queue exists, Even if the RabbitMQ service closed.
// : autodelete The queue delete when consumers to be nothing.
// : passive Just check what queue exists. Don't use read/write to queue.
// : exclusive The queue delete when connection closed. The queue exists is this connection only.
// : Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
// : Explain : mcs Mono c# compiler
// : /r:RabbitMQ.Client.dll Append reference RabbitMQ
// : -define:UBUNTU #define UBUNTU
// : tamesi34_cs.cs Source file
// : Execute : Command : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
// : // Run on the foreground.
// : Explain : ./tamesi34_cs.exe Executable file
// : 2> ./tamesi34_cs.err.log Standard error to log file
// : Reference : Web site: QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
// : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//
//--------------------------------------------------------------------------------
// tamesi34_cs.cs
// Ubuntu version. Source code is old. API is difference.
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace UsagiMQ
{
/// <summary>
/// This number is this program only. AMQP-CPP implements is difference.
/// AMQP-CPP implements(C++) : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
/// </summary>
[Flags]
enum LifeSpanType
{
Durable = 1,
Autodelete = 0x1<<1,
Passive = 0x1 << 2,
Exclusive = 0x1 << 3
}
class Program
{
static string Dump()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine("tamesi34_cs.cs Dump");
sb.AppendLine(" name_queues [ENQUEUE_INDEX] =[" + name_queues[ENQUEUE_INDEX]+"]");
sb.AppendLine(" lifeSpan_queues [ENQUEUE_INDEX] =[" + lifeSpan_queues[ENQUEUE_INDEX] + "]");
sb.AppendLine(" durable_lifeSpans [ENQUEUE_INDEX] =[" + durable_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" autodelete_lifeSpans[ENQUEUE_INDEX] =[" + autodelete_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" passive_lifeSpans [ENQUEUE_INDEX] =[" + passive_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" exclusive_lifeSpans [ENQUEUE_INDEX] =[" + exclusive_lifeSpans[ENQUEUE_INDEX] + "]");
sb.AppendLine(" ----");
sb.AppendLine(" name_queues [DEQUEUE_INDEX] =[" + name_queues[DEQUEUE_INDEX] + "]");
sb.AppendLine(" lifeSpan_queues [DEQUEUE_INDEX] =[" + lifeSpan_queues[DEQUEUE_INDEX] + "]");
sb.AppendLine(" durable_lifeSpans [DEQUEUE_INDEX] =[" + durable_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" autodelete_lifeSpans[DEQUEUE_INDEX] =[" + autodelete_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" passive_lifeSpans [DEQUEUE_INDEX] =[" + passive_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" exclusive_lifeSpans [DEQUEUE_INDEX] =[" + exclusive_lifeSpans[DEQUEUE_INDEX] + "]");
sb.AppendLine(" ----");
sb.AppendLine(" name_queues [DELETEQUEUE_INDEX]=[" + name_queues[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" lifeSpan_queues [DELETEQUEUE_INDEX]=[" + lifeSpan_queues[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" durable_lifeSpans [DELETEQUEUE_INDEX]=[" + durable_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" autodelete_lifeSpans[DELETEQUEUE_INDEX]=[" + autodelete_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" passive_lifeSpans [DELETEQUEUE_INDEX]=[" + passive_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" exclusive_lifeSpans [DELETEQUEUE_INDEX]=[" + exclusive_lifeSpans[DELETEQUEUE_INDEX] + "]");
sb.AppendLine(" ----");
return sb.ToString();
}
static string GetNow()
{
return DateTime.Now.ToString("HH:mm:ss fff ");
}
const int ENQUEUE_INDEX = 0;
const int DEQUEUE_INDEX = 1;
const int DELETEQUEUE_INDEX = 2;
const int NUM_INDEX = 3;
const string HOST_NAME = "localhost";
static string[] name_queues = new string[NUM_INDEX];
static LifeSpanType[] lifeSpan_queues = new LifeSpanType[NUM_INDEX];
static bool[]
durable_lifeSpans = new bool[NUM_INDEX],
autodelete_lifeSpans = new bool[NUM_INDEX],
passive_lifeSpans = new bool[NUM_INDEX],
exclusive_lifeSpans = new bool[NUM_INDEX];
static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
{
name_queues[index_queue] = name_queue;
lifeSpan_queues[index_queue] = lifeSpan;
// 一旦クリアー
durable_lifeSpans[index_queue] = false;
autodelete_lifeSpans[index_queue] = false;
passive_lifeSpans[index_queue] = false;
exclusive_lifeSpans[index_queue] = false;
// durable
if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
// autodelete
if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
// passive
if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
// exclusive
if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
}
public static ConnectionFactory GetFactory()
{
if (null == m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection(int index)
{
if (null == m_connection_[index])
{
Console.WriteLine(GetNow()+"34_cs connection.open begin");
m_connection_[index] = GetFactory().CreateConnection();
Console.WriteLine(GetNow() + "34_cs connection.open end");
}
return m_connection_[index];
}
static IConnection[] m_connection_ = new IConnection[NUM_INDEX];
public static IModel GetChannel(int index)
{
if (null == m_channels_[index])
{
Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] begin");
m_channels_[index] = GetConnection(index).CreateModel();
// 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
//Console.Error.WriteLine(Dump());
//Console.Error.WriteLine("(tamesi34_cs.cs?) GetChannel index = [" + index + "]");
//Console.Error.WriteLine(" name_queues[index] = [" + name_queues[index] + "]");
//Console.Error.WriteLine(" passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
//Console.Error.WriteLine(" durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
//Console.Error.WriteLine(" exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
//Console.Error.WriteLine(" autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
//Console.Error.WriteLine(" nowait は仮に false 固定");
//Console.Error.WriteLine(" arguments は仮に null 固定");
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
// RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
// そして false になっているようだ。
m_channels_[index].QueueDeclare(name_queues[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
//m_channels_[index].QueueDeclare(name_queues[index], false, false, false, false, false, null);
#else
m_channels_[index].QueueDeclare(name_queues[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] end");
}
return m_channels_[index];
}
static IModel[] m_channels_ = new IModel[NUM_INDEX];
public static EventingBasicConsumer GetConsumer(int index)
{
if (null == m_consumers_[index])
{
Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] begin");
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumers_[index] = new EventingBasicConsumer();
#else
m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] end");
}
return m_consumers_[index];
}
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];
/// <summary>
/// コンシューマーが準備できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static ConsumerEventHandler GetRegisteredHandler()
#else
public static EventHandler<ConsumerEventArgs> GetRegisteredHandler()
#endif
{
if (null == m_registeredHandler_)
{
#if UBUNTU
m_registeredHandler_ = new ConsumerEventHandler((model, ea) =>
#else
m_registeredHandler_ = new EventHandler<ConsumerEventArgs>((model, ea) =>
#endif
{
Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Registered(^q^)");
flag_registered = true;
});
}
return m_registeredHandler_;
}
#if UBUNTU
static ConsumerEventHandler m_registeredHandler_;
#else
static EventHandler<ConsumerEventArgs> m_registeredHandler_;
#endif
static bool flag_registered;
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetDequeueHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
{
if (null == m_dequeueHandler_)
{
#if UBUNTU
m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Dequeue(^q^) {0}", message);
});
}
return m_dequeueHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_dequeueHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection(int index)
{
if (null != m_connection_[index])
{
Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] begin");
// フラグとして、ヌルを入れる
CloseChannel(index);
m_connection_[index].Close();
m_connection_[index] = null;
Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] end");
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel(int index)
{
if (null != m_channels_[index])
{
Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] begin");
m_channels_[index].Close();
m_channels_[index] = null;
Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] end");
}
}
static void Main(string[] args)
{
//----------------------------------------
// Delete
//----------------------------------------
for (;;)
{
Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name : キューを削除します
"""" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? > ");
string queueName_delete = Console.ReadLine();
if (""== queueName_delete.Trim())
{
break;
}
Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Delete ? > ");
LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
uint result = DeleteQueue();
Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。");
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
for (;;)
{
Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > ");
string queueName_enqueue = Console.ReadLine();
Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > ");
int lifeSpan_enqueue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
{
SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
break;
}
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
EventingBasicConsumer consumer = null;
for (;;)
{
Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > ");
string queueName_dequeue = Console.ReadLine();
Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > ");
int lifeSpan_dequeue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
{
SetLifeSpan(DEQUEUE_INDEX, queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
break;
}
}
StartConsume(out consumer);
Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered begin");
while (!flag_registered)
{
Thread.Sleep(20);
}
Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered end");
Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > ");
for (;;)
{
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
Console.Write(@"Enqueue? > ");
}
// ここには来ない
// CloseConnection();
}
static uint DeleteQueue()
{
IModel channel = GetChannel(DELETEQUEUE_INDEX);
#if UBUNTU
// Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
// uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
uint result = channel.QueueDelete( name_queues[DELETEQUEUE_INDEX],true,true,true);
#else
uint result = channel.QueueDelete(name_queues[DELETEQUEUE_INDEX],true,true);
#endif
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseConnection(DELETEQUEUE_INDEX);
//CloseChannel(DELETEQUEUE_INDEX);
return result;
}
static void Enqueue(string message)
{
byte[] body = Encoding.UTF8.GetBytes(message);
Console.WriteLine(GetNow() + "34_cs (P)___ enqueue begin [" + message + "]");
GetChannel(ENQUEUE_INDEX).BasicPublish("", name_queues[ENQUEUE_INDEX], null, body);
Console.WriteLine(GetNow() + "34_cs (P)___ enqueue end");
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseConnection(ENQUEUE_INDEX);
//CloseChannel(ENQUEUE_INDEX);
}
static void StartConsume(out EventingBasicConsumer consumer)
{
Console.WriteLine(GetNow() + "34_cs ___(C) start begin");
consumer = GetConsumer(DEQUEUE_INDEX); // ローカル変数だとスコープが切れて寿命が尽きてしまうので、外側に返す
// 受信できたときに割り込んでくる処理
consumer.Received += GetDequeueHandler();
consumer.Registered += GetRegisteredHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 6 個のやつに似ている。
// string BasicConsume(ushort ticket, string queue, bool noAck, string consumerTag, IDictionary filter, IBasicConsumer consumer)
GetChannel(DEQUEUE_INDEX).BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
GetChannel(DEQUEUE_INDEX).BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
// 終了はさせない
// consumer.Received -= GetReceiveHandler();
// CloseChannel(DEQUEUE_INDEX);
Console.WriteLine(GetNow() + "34_cs ___(C) start end");
}
}
}
tamesi35a2_cpp.cpp
//--------------------------------------------------------------------------------
// OS : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// : // Count messages in queues.
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
// : License : MIT License https://opensource.org/licenses/MIT
// : Explain : For test. Practice of interprocess communication.
// : This program run on the background side.
// : Append daze to received message on interprocess communication.
// : Daze meaning is dialect in the suburbs of Tokyo.
// : Please settings source queue, destination queue to command line argument.
// : And standard input to /dev/null
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Explain : g++ C++ Compiler
// : -std=c++11 C++ 11
// : tamesi35a2_cpp.cpp Source file
// : -o tamesi35a2_cpp.exe Output executable file
// : -lev Used library libev
// : -lamqpcpp Used library AMQP-CPP
// : -pthread Used library thread
// : Execute : Command : // Run on the background.
// : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Explain : ./tamesi35a2_cpp.exe Executable file
// : --enqueue Destination queue settings section
// : 1113 Destination queue name (string ok)
// : durable Destination queue life span (multiple)
// : --dequeue Source queue settings section
// : 1112 Source queue name (string ok)
// : durable Source queue life span
// : autodelete Source queue life span
// : > ./tamesi35a2_cpp.out.log Standard output to ./tamesi35a2_cpp.out.log
// : 2> ./tamesi35a2_cpp.err.log Standard error to ./tamesi35a2_cpp.err.log
// : < /dev/null Standard input from nothing
// : & Process goto the background
// : Explain : command line argument of life span of queue (Compositable)
// : durable The queue exists, Even if the RabbitMQ service closed.
// : autodelete The queue delete when consumers to be nothing.
// : passive Just check what queue exists. Don't use read/write to queue.
// : exclusive The queue delete when connection closed. The queue exists is this connection only.
// : Example : Windows 10 (This program not work) default queue life span. durable
// : Ubuntu 16.04 default queue life span. durable autodelete
// : Stop : Typing : [Ctrl]+[C]
//--------------------------------------------------------------------------------
#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpan_t lifeSpan_queues[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
if ("durable"==lifeSpan) {
return AMQP::durable;
} else if ("autodelete" == lifeSpan) {
return AMQP::autodelete;
} else if ("passive" == lifeSpan) {
return AMQP::passive;
} else if ("exclusive" == lifeSpan) {
return AMQP::exclusive;
} else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(1);
}
}
std::string exchange_name = "myexchange";
std::string routing_key = "";
static std::string Dump()
{
static std::ostringstream sb;
sb << "tamesi35a2_cs.cs Dump" << std::endl
<< " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl
<< " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
// どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
std::chrono::system_clock::now().time_since_epoch()
);
long long hours = ms.count() / 1000 / 60 / 60 % 24;
long long minutes = ms.count() / 1000 / 60 % 60;
long long seconds = ms.count() / 1000 % 60;
long long milliseconds = ms.count() % 1000;
std::ostringstream oss;
oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
return oss.str();
}
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };
static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
if (nullptr== pConnection_ev[index]) {
std::cout << getTimeslipNow() << " 35a2_cpp[" << index << "] connection.open begin" << std::endl;
pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.open end" << std::endl;
}
return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close begin" << std::endl;
// チャンネルにもヌルのフラグを入れる
closeChannel(index);
pConnection_ev[index]->close();
pConnection_ev[index] = nullptr;
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close end" << std::endl;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
if (nullptr== pChannel_ev[index])
{
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open begin" << std::endl;
pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open end" << std::endl;
}
return pChannel_ev[index];
}
static void closeChannel(int index)
{
if (nullptr != pChannel_ev[index]) {
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close begin" << std::endl;
pChannel_ev[index]->close();
pChannel_ev[index] = nullptr;
std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close end" << std::endl;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 1024;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] before lock [" << message << "]" << std::endl;
std::lock_guard<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] [" << message << "]" << std::endl;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage end next=[" << bufferCursors[PUT_INDEX] << "]" << std::endl;
}
static std::string removeMessage()
{
//std::cout << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] before lock" << std::endl;
std::lock_guard<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] [" << message << "]" << std::endl;
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage end return=[" << message << "] next=[" << bufferCursors[GET_INDEX] << "]" << std::endl;
return message;
}
return "";
}
}
static void sendMessage()
{
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ begin" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue begin" << std::endl;
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
.onError([](const char* errMsg) {
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
});
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue begin" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
// ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
.onSuccess([]() {
for (;;)
{
// 回転式バッファーからメッセージを切り取る。
std::string message = rotationBuffer::removeMessage();
if ("" != message)
{
// 末尾に daze を付ける。
message += "daze";
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[" << message << "] ****important**** " << std::endl;
getChannel(ENQUEUE_INDEX)->startTransaction();
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([&message]() {
// パブリッシュ成功
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.bindQueue.publish.commited [" << message << "] **********important**********" << std::endl;
})
.onError([&message](const char *errMsg) {
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
}
else {
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
}
// 接続を切って ev_run ループから抜けよう。かつ、このハンドラーから抜けよう。パブリッシュのコミットに必要
closeConnection(ENQUEUE_INDEX);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}).onError([](const char* errMsg) {
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
});
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run begin" << std::endl;
// ここで、バインドキューのオンサクセスの処理へ。
ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ end" << std::endl;
}
// メッセージ・キューの送信を担当
static void workProduce()
{
std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP begin" << std::endl;
// おっきな無限ループ
for (;;) {
sendMessage();
}
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP outOfInfinityLoop error" << std::endl;
}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ begin" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ begin" << std::endl;
// I will go to the front of the box named "1111".
LifeSpan_t lifeSpan = lifeSpan_queues[DEQUEUE_INDEX];
getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS begin" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << getTimeslipNow() << "(>_<)35a2_cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {
std::string message(amqpMessage.body(), amqpMessage.bodySize());
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received begin [" << message << "]" << std::endl;
rotationBuffer::putMessage(message);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack begin [" << deliveryTag << "] [" << message << "]" << std::endl;
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack end [" << deliveryTag << "]" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received end [" << message << "]" << std::endl;
};
getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run begin" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev[DEQUEUE_INDEX]);
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run end" << std::endl;
std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ end" << std::endl;
// I will not come here.
return;
}
int main(int argc, char* argv[])
{
// 引数の解析
if(0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::istringstream data(cmdArg);
std::cout << getTimeslipNow() << "35a2_cpp main commandLineArguments=[" << data.str() << "]" << std::endl;
// 与件
// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
// name_queues 結果はこれらの配列に入れる
// lifeSpan_queues
// 記憶
int m0 = -1; // enqueue or dequeue index.
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
else if (-1 == m1 && 0 == m0) { // エンキュー先キュー名
name_queues[m0] = a;
lifeSpan_queues[m0] = 0;
m1++;
}
else if (-1 == m1 && 1 == m0) { // デキュー元キュー名
name_queues[m0] = a;
lifeSpan_queues[m0] = 0;
m1++;
}
else { lifeSpan_queues[m0] |= LifeSpanString_To_Int(a); }
}
if ("" == name_queues[ENQUEUE_INDEX] || "" == name_queues[DEQUEUE_INDEX]) {
goto gt_CommandlineParameterError;
}
std::cout << getTimeslipNow() << "35a2_cpp main dump begin" << std::endl;
std::cout << getTimeslipNow() << Dump();
std::cout << getTimeslipNow() << "35a2_cpp main dump end" << std::endl;
}
else
{
goto gt_CommandlineParameterError;
}
// キューの監視スレッド、メッセージの送信スレッドをスタート
{
auto th1 = std::thread([] { workConsume(); });
auto th2 = std::thread([] { workProduce(); });
for (;;) // このメイン・スレッドはずっと寝てる
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// このプログラムは、自分では接続を切らないので、ここから下には来ない
th2.join();
th1.join();
}
return 0;
gt_CommandlineParameterError:
std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
<< "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
<< "queueNames[0]=[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< "queueNames[1]=[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl;
exit(1);
}