LoginSignup
1
1

More than 5 years have passed since last update.

コンピュータ将棋ソフトとの対局サーバーを立てよう<その14>

Last updated at Posted at 2017-03-15

前回の記事 : http://qiita.com/muzudho1/items/90eec636fc1664dbfcee

これでどうか。

// メッセージ・キューの送信を担当
static void workProduce()
{
    std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
    std::string exchange_name = "myexchange";
    std::string routing_key = "";

    int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);

    getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
        .onError([&lifeSpan](const char* errMsg) {
        std::cerr << "(>_<)enqueue: error declaring queue: [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan << "]" << "\n";
    });
    std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;

    getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        .onSuccess([&exchange_name, &routing_key]() {

            std::string message = rotationBuffer::removeMessage();
            std::cout << "(^q^) workProduce : メッセージを受け取ったか? message=[" << message << "]" << std::endl;

            // 末尾に daze を付ける。
            message += "daze";

            std::cout << "(^▽^)workProduce : これからパブリッシュするぜ☆" << std::endl;
            if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
                std::cerr << "failed to publish?\n";
            }
            std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;

            // break in ev loop.
            //closeConnection(ENQUEUE_INDEX);
            //closeChannel(ENQUEUE_INDEX);
        });
    std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;

    std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
    for (;;)
    {
        if (rotationBuffer::hasMessage())
        {
            std::cout << "(^q^) workProduce : メッセージがあるぜ☆ [" << rotationBuffer.getMessage() << "]" << std::endl;

            // ここで、バインドキューのオンサクセスの処理へ。
            ev_run(pLoop_ev[ENQUEUE_INDEX]);
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    std::cout << "(>_<) workProduce : メッセージ監視ループを出てしまったぜ☆?" << std::endl;
}
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^)レシーブしたぜ☆ myString=[imo]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[imo]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]
(^q^) workProduce : メッセージがあるぜ☆ [nanban]

なんでローテーションしてないんだ?

        }).onError([](const char* errMsg) {
            std::cerr << "(>_<) バインドキュー : error declaring queue: [" << errMsg << "]" << "\n";
        });

これを付けても エラー・ログには出ていない。

(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[kame]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[kame]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[usagi]
(^◇^)回転式バッファー : putMessage : [3] 入れた☆! message=[usagi]
(^◇^)回転式バッファー : putMessage : 次回は[4] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[4]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[4]
(^◇^)回転式バッファー : ゲットメッセージ : [3] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[4] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[inu]

パブリッシュが成功するのは 接続を切ったときだったか。

(^q^)レシーブ : ackしたぜ☆ deliveryTag=[4]
(^◇^)回転式バッファー : ゲットメッセージ : [3] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[4] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[inu]
(^◇^)回転式バッファー : putMessage : [4] 入れた☆! message=[inu]
(^◇^)回転式バッファー : putMessage : 次回は[5] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[5]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[5]
(^◇^)回転式バッファー : ゲットメッセージ : [4] 出した☆! message=[inu]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[5] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[inu]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[neko]
(^◇^)回転式バッファー : putMessage : [5] 入れた☆! message=[neko]
(^◇^)回転式バッファー : putMessage : 次回は[6] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[6]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[6]
(^◇^)回転式バッファー : ゲットメッセージ : [5] 出した☆! message=[neko]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[6] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[neko]

パブリッシュできてないと思うが。

# rabbitmqctl list_queues
Listing queues ...
1112    0
1113    0
        std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
        // ここで、バインドキューのオンサクセスの処理へ。
        //ev_run(pLoop_ev[ENQUEUE_INDEX]);
        ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
        std::cout << "(^q^) workProduce : メッセージ監視ループを出たぜ☆ 再接続しようぜ☆" << std::endl;

引数 0 を付けてみるか。

変わり無し

接続が切れるのは1回?

(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[sika]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[sika]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[sika]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[sika]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆

パブリッシュのあとに接続が切れるが、2回切れることがない。ずっと切れっぱなしなのか。
そして 1回もパブリッシュされてない。

ヌルのフラグ

static void closeConnection(int index)
{
    if (nullptr != pConnection_ev[index]) {

        // チャンネルにもヌルのフラグを入れる
        closeChannel(index);

        std::cout << "(^▽^)これから接続を閉じるぜ☆ index=[" << index << "]" << std::endl;
        pConnection_ev[index]->close();
        std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
        pConnection_ev[index] = nullptr;
    }
}

チャンネルもヌルにしよう。

(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[hituzi]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[hituzi]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[hituzi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[hituzi]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆

毎回 接続を閉じたり 開けたりするようになったが、パブリッシュは行われていない。

コンシューマー変数のスコープ

        static void StartConsume()
        {
            IModel channel = GetChannel(DEQUEUE_INDEX);
            EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);

            // 受信できたときに割り込んでくる処理
            consumer.Received += GetDequeueHandler();

#if UBUNTU
            // Ubuntuでは何故か引数が 5 個のやつになっている。
            channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
            channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
            // 終了はさせない
            // consumer.Received -= GetReceiveHandler();
            // CloseChannel(DEQUEUE_INDEX);
        }

この consumer 変数は、ローカル変数でいいんだろうか?

                    std::cout << "(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆" << std::endl;
                    getChannel(ENQUEUE_INDEX)->startTransaction();
                    std::cout << "(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆" << std::endl;
                    if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
                        getChannel(ENQUEUE_INDEX)->commitTransaction()
                            .onSuccess([]() {
                            std::cout << "(^▽^)コミット成功☆!" << std::endl;
                        })
                            .onError([](const char *errMsg) {
                            std::cout << "(>_<)コミット失敗☆ errMsg=[" << errMsg << "]" << std::endl;
                        });
                        std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
                    }else{
                        std::cout << "(/_\)パブリッシュできなかったぜ☆" << std::endl;
                        getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                    }
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^)レシーブしたぜ☆ myString=[usagi]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[usagi]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[usagi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[usagi]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆
(^q^) bindQueue.onSuccess : メッセージは無いぜ☆

コミットできてないのか。

(^q^)レシーブしたぜ☆ myString=[inu]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[inu]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[inu]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[inu]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^)レシーブしたぜ☆ myString=[neko]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[neko]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[neko]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[neko]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^)レシーブしたぜ☆ myString=[saru]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[saru]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[saru]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[saru]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^q^)レシーブしたぜ☆ myString=[hituji]

これで パブリッシュできてないのか。

接続を1つにして、切断しないようにしてみるか

ev_run loop で構造体が被る。むり。

切断、チャンネル閉鎖ではなく、関数を抜けるか。

(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メインスレッドはずっと回ってるぜ☆
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)コンシュームは、デクレア・キューしたぜ☆
(^q^)コンシュームを設定したぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)workProduce : デクレア・キューしたぜ☆
(^▽^)workProduce : バインド・キューしたぜ☆
(^q^) workProduce : メッセージ監視ループに入るぜ☆
(^q^) bindQueue.onSuccess : 開始だぜ☆
(^q^)レシーブしたぜ☆ myString=[kame]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[kame]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[kame]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[kame]
(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆
(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^q^) bindQueue.onSuccess : 終了だぜ☆
(^▽^)コミット成功☆!
(^q^)レシーブしたぜ☆ myString=[usagi]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[usagi]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^q^)レシーブしたぜ☆ myString=[hituji]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[hituji]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^q^)レシーブしたぜ☆ myString=[neko]
(^◇^)回転式バッファー : putMessage : [3] 入れた☆! message=[neko]
(^◇^)回転式バッファー : putMessage : 次回は[4] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[4]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[4]

1回だけ コミットに成功している。なぜ2回目以降は成功しないのか?

接続を切って、関数から抜けようぜ

Enqueue? > <---- [interrupt!] Dequeue(^q^) inudaze
i
(^q^)Enqueue: エンキュー開始☆ message=[i]
tamesi34_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[Durable, Autodelete]
    durable_lifeSpans   [ENQUEUE_INDEX]    =[True]
    autodelete_lifeSpans[ENQUEUE_INDEX]    =[True]
    passive_lifeSpans   [ENQUEUE_INDEX]    =[False]
    exclusive_lifeSpans [ENQUEUE_INDEX]    =[False]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[Durable]
    durable_lifeSpans   [DEQUEUE_INDEX]    =[True]
    autodelete_lifeSpans[DEQUEUE_INDEX]    =[False]
    passive_lifeSpans   [DEQUEUE_INDEX]    =[False]
    exclusive_lifeSpans [DEQUEUE_INDEX]    =[False]
    ----
    name_queues         [DELETEQUEUE_INDEX]=[]
    lifeSpan_queues     [DELETEQUEUE_INDEX]=[0]
    durable_lifeSpans   [DELETEQUEUE_INDEX]=[False]
    autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
    passive_lifeSpans   [DELETEQUEUE_INDEX]=[False]
    exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
    ----

(tamesi34_cs.cs?) GetChannel index = [0]
    name_queues[index] = [1112]
    passive_lifeSpans[index] = [False]
    durable_lifeSpans[index] = [True]
    exclusive_lifeSpans[index] = [False]
    autodelete_lifeSpans[index] = [True]
    nowait は仮に false 固定
    arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ index=[0]
(^q^)Enqueue: body.Length=[1]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) idaze
hituji
(^q^)Enqueue: エンキュー開始☆ message=[hituji]
tamesi34_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[Durable, Autodelete]
    durable_lifeSpans   [ENQUEUE_INDEX]    =[True]
    autodelete_lifeSpans[ENQUEUE_INDEX]    =[True]
    passive_lifeSpans   [ENQUEUE_INDEX]    =[False]
    exclusive_lifeSpans [ENQUEUE_INDEX]    =[False]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[Durable]
    durable_lifeSpans   [DEQUEUE_INDEX]    =[True]
    autodelete_lifeSpans[DEQUEUE_INDEX]    =[False]
    passive_lifeSpans   [DEQUEUE_INDEX]    =[False]
    exclusive_lifeSpans [DEQUEUE_INDEX]    =[False]
    ----
    name_queues         [DELETEQUEUE_INDEX]=[]
    lifeSpan_queues     [DELETEQUEUE_INDEX]=[0]
    durable_lifeSpans   [DELETEQUEUE_INDEX]=[False]
    autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
    passive_lifeSpans   [DELETEQUEUE_INDEX]=[False]
    exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
    ----

(tamesi34_cs.cs?) GetChannel index = [0]
    name_queues[index] = [1112]
    passive_lifeSpans[index] = [False]
    durable_lifeSpans[index] = [True]
    exclusive_lifeSpans[index] = [False]
    autodelete_lifeSpans[index] = [True]
    nowait は仮に false 固定
    arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ index=[0]
(^q^)Enqueue: body.Length=[6]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) hitujidaze

それでOK。

tamesi34_cs.cs

// OS  : Windows 10
// IDE : Visual Studio 2015
//       Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS  : Ubuntu 16.04
//       Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
//       Execute : Command : // フォアグラウンドで実行する
//                         : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
//       Check   : Command : // キューの中身の数を調べる
//                         : rabbitmqctl list_queues
//
// Library : RabbitMQ
//           Refference : Website : RabbitMQ http://www.rabbitmq.com/
//                      : Website : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                      : Website : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//--------------------------------------------------------------------------------
// tamesi34_cs.cs

// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace UsagiMQ
{
    /// <summary>
    /// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
    /// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
    /// </summary>
    [Flags]
    enum LifeSpanType
    {
        /// <summary>
        /// RabbitMQが止まってもキューを残す
        /// </summary>
        Durable = 1,
        /// <summary>
        /// コンシューマーが1人も接続していなかったら消す
        /// </summary>
        Autodelete = 0x1<<1,
        /// <summary>
        /// キューが存在するかどうかチェックするだけ。中身見ない時これ
        /// </summary>
        Passive = 0x1 << 2,
        /// <summary>
        /// この接続でだけ使える。この接続が切れたら消す
        /// </summary>
        Exclusive = 0x1 << 3
    }

    /// <summary>
    /// メッセージを エンキューします。
    /// キューの名前は指定してください。
    /// デキューは割込みを受け付けます。
    /// 
    /// 参照 : QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
    /// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
    /// 参照 : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
    /// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
    /// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
    /// </summary>
    class Program
    {
        static string Dump()
        {
            StringBuilder sb = new StringBuilder();
            sb.AppendLine("tamesi34_cs.cs Dump");
            sb.AppendLine("    name_queues         [ENQUEUE_INDEX]    =[" + name_queues[ENQUEUE_INDEX]+"]");
            sb.AppendLine("    lifeSpan_queues     [ENQUEUE_INDEX]    =[" + lifeSpan_queues[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [ENQUEUE_INDEX]    =[" + durable_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[ENQUEUE_INDEX]    =[" + autodelete_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [ENQUEUE_INDEX]    =[" + passive_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [ENQUEUE_INDEX]    =[" + exclusive_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            sb.AppendLine("    name_queues         [DEQUEUE_INDEX]    =[" + name_queues[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    lifeSpan_queues     [DEQUEUE_INDEX]    =[" + lifeSpan_queues[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [DEQUEUE_INDEX]    =[" + durable_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[DEQUEUE_INDEX]    =[" + autodelete_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [DEQUEUE_INDEX]    =[" + passive_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [DEQUEUE_INDEX]    =[" + exclusive_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            sb.AppendLine("    name_queues         [DELETEQUEUE_INDEX]=[" + name_queues[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    lifeSpan_queues     [DELETEQUEUE_INDEX]=[" + lifeSpan_queues[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [DELETEQUEUE_INDEX]=[" + durable_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[DELETEQUEUE_INDEX]=[" + autodelete_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [DELETEQUEUE_INDEX]=[" + passive_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [DELETEQUEUE_INDEX]=[" + exclusive_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            return sb.ToString();
        }

        const int ENQUEUE_INDEX = 0;
        const int DEQUEUE_INDEX = 1;
        const int DELETEQUEUE_INDEX = 2;
        const int NUM_INDEX = 3;
        const string HOST_NAME = "localhost";
        static string[] name_queues = new string[NUM_INDEX];
        /// <summary>
        /// キューの寿命(複合可能)
        /// durable    : RabbitMQが止まってもキューを残す
        /// autodelete : コンシューマーが1人も接続していなかったら消す
        /// passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
        /// exclusive  : この接続でだけ使える。この接続が切れたら消す
        /// </summary>
        static LifeSpanType[] lifeSpan_queues = new LifeSpanType[NUM_INDEX];
        static bool[]
            durable_lifeSpans = new bool[NUM_INDEX],
            autodelete_lifeSpans = new bool[NUM_INDEX],
            passive_lifeSpans = new bool[NUM_INDEX],
            exclusive_lifeSpans = new bool[NUM_INDEX];
        static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
        {
            name_queues[index_queue] = name_queue;
            lifeSpan_queues[index_queue] = lifeSpan;

            // 一旦クリアー
            durable_lifeSpans[index_queue] = false;
            autodelete_lifeSpans[index_queue] = false;
            passive_lifeSpans[index_queue] = false;
            exclusive_lifeSpans[index_queue] = false;

            // durable
            if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
            // autodelete
            if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
            // passive
            if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
            // exclusive
            if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
        }


        public static ConnectionFactory GetFactory()
        {
            if (null == m_factory_)
            {
                m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
                Console.WriteLine("(^q^)ファクトリーを用意したぜ☆ HostName=[" + HOST_NAME + "]");
            }
            return m_factory_;
        }
        static ConnectionFactory m_factory_;

        public static IConnection GetConnection()
        {
            if (null == m_connection_)
            {
                m_connection_ = GetFactory().CreateConnection();
                Console.WriteLine("(^q^)接続したぜ☆");
            }
            return m_connection_;
        }
        static IConnection m_connection_;

        public static IModel GetChannel(int index)
        {
            if (null == m_channels_[index])
            {
                m_channels_[index] = GetConnection().CreateModel();

                // 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
                Console.Error.WriteLine(Dump());
                Console.Error.WriteLine("(tamesi34_cs.cs?) GetChannel index = [" + index + "]");
                Console.Error.WriteLine("    name_queues[index] = [" + name_queues[index] + "]");
                Console.Error.WriteLine("    passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
                Console.Error.WriteLine("    durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
                Console.Error.WriteLine("    exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
                Console.Error.WriteLine("    autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
                Console.Error.WriteLine("    nowait は仮に false 固定");
                Console.Error.WriteLine("    arguments は仮に null 固定");

#if UBUNTU
                // Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
                // RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
                // そして false になっているようだ。
                m_channels_[index].QueueDeclare(name_queues[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
                //m_channels_[index].QueueDeclare(name_queues[index], false, false, false, false, false, null);
#else
                m_channels_[index].QueueDeclare(name_queues[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
            }

            Console.WriteLine("(^q^)チャンネルを開いたぜ☆ index=[" + index + "]");
            return m_channels_[index];
        }
        static IModel[] m_channels_ = new IModel[NUM_INDEX];

        public static EventingBasicConsumer GetConsumer(int index)
        {
            if (null == m_consumers_[index])
            {
#if UBUNTU
                // Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
                m_consumers_[index] = new EventingBasicConsumer();
#else
                m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
            }
            Console.WriteLine("(^q^)コンシューマーを取得したぜ☆ index=["+ index + "]");
            return m_consumers_[index];
        }
        static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];

        /// <summary>
        /// 受信できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static BasicDeliverEventHandler GetDequeueHandler()
#else
        public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
        {
            if (null == m_dequeueHandler_)
            {
#if UBUNTU
                m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
                m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
                {
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("<---- [interrupt!] Dequeue(^q^) {0}", message);
                });
            }

            return m_dequeueHandler_;
        }

#if UBUNTU
        static BasicDeliverEventHandler m_dequeueHandler_;
#else
        static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif

        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseConnection()
        {
            if (null != m_connection_)
            {
                m_connection_.Close();
                m_connection_ = null;
                Console.WriteLine("(^q^)接続を閉じたぜ☆");
            }
        }
        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseChannel(int index)
        {
            if (null != m_channels_[index])
            {
                m_channels_[index].Close();
                m_channels_[index] = null;
                Console.WriteLine("(^q^)チャンネルを閉じたぜ☆");
            }
        }

        static void Main(string[] args)
        {
            //----------------------------------------
            // Delete
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > ");
                string queueName_delete = Console.ReadLine();
                if (""== queueName_delete.Trim())
                {
                    break;
                }


                Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
                SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
                uint result = DeleteQueue();
                Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。");
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
                string queueName_enqueue = Console.ReadLine();

                Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_enqueue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
                {
                    SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
                    break;
                }
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            EventingBasicConsumer consumer = null;
            for (;;)
            {
                Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
                string queueName_dequeue = Console.ReadLine();

                Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_dequeue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
                {
                    SetLifeSpan(DEQUEUE_INDEX, queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);

                    StartConsume(out consumer);
                    break;
                }
            }

            Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ");
            for (;;)
            {
                // "Hello World!" などを入力
                string line = Console.ReadLine();
                Enqueue(line);
                Console.Write(@"Enqueue? > ");
            }

            // ここには来ない
            // CloseConnection();
        }

        static uint DeleteQueue()
        {
            IModel channel = GetChannel(DELETEQUEUE_INDEX);

#if UBUNTU
            // Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
            // uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
            uint result = channel.QueueDelete( name_queues[DELETEQUEUE_INDEX],true,true,true);
#else
            uint result = channel.QueueDelete(name_queues[DELETEQUEUE_INDEX],true,true);
#endif

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseChannel(DELETEQUEUE_INDEX);
            return result;
        }

        static void Enqueue(string message)
        {
            Console.WriteLine("(^q^)Enqueue: エンキュー開始☆ message=[" + message + "]");

            IModel channel = GetChannel(ENQUEUE_INDEX);

            byte[] body = Encoding.UTF8.GetBytes(message);
            Console.WriteLine("(^q^)Enqueue: body.Length=[" + body.Length+"]");

            channel.BasicPublish("", name_queues[ENQUEUE_INDEX], null, body);
            Console.WriteLine("(^q^)Enqueue: パブリッシュしたぜ。キュー名=[" + name_queues[ENQUEUE_INDEX] + "]");

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseChannel(ENQUEUE_INDEX);
        }

        static void StartConsume(out EventingBasicConsumer consumer)
        {
            IModel channel = GetChannel(DEQUEUE_INDEX);
            consumer = GetConsumer(DEQUEUE_INDEX); // ローカル変数だとスコープが切れて寿命が尽きてしまうので、外側に返す

            // 受信できたときに割り込んでくる処理
            consumer.Received += GetDequeueHandler();

#if UBUNTU
            // Ubuntuでは何故か引数が 5 個のやつになっている。
            channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
            channel.BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
            // 終了はさせない
            // consumer.Received -= GetReceiveHandler();
            // CloseChannel(DEQUEUE_INDEX);
        }
    }
}

tamesi35a2_cpp.cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : // このプログラムは動かない
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : rabbitmq-server
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Compile   : Command  : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
//         : Execute   : Command  : // バックグラウンドで実行
//                                : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
//         : Stop      : Typing   : [Ctrl]+[C]
//--------------------------------------------------------------------------------
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。

#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
enum LifeSpanType
{
    // RabbitMQが止まってもキューを残す
    durable = 1,
    // コンシューマーが1人も接続していなかったら消す
    autodelete = 0x1 << 1,
    // キューが存在するかどうかチェックするだけ。中身見ない時これ
    passive = 0x1 << 2,
    // この接続でだけ使える。この接続が切れたら消す
    exclusive = 0x1 << 3
};

static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpanType lifeSpan_queues[] = { (LifeSpanType)0, (LifeSpanType)0 };
static LifeSpanType LifeSpanString_To_Enum(std::string lifeSpan)
{
    if ("durable"==lifeSpan) {
        return durable;
    } else if ("autodelete" == lifeSpan) {
        return autodelete;
    } else if ("passive" == lifeSpan) {
        return passive;
    } else if ("exclusive" == lifeSpan) {
        return exclusive;
    } else {
        std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
        exit(1);
    }
}
static int LifeSpanString_To_AmqpInt(LifeSpanType lifeSpan)
{
    int amqpInt = 0;
    if ( ((int)durable & (int)lifeSpan) == (int)durable) {
        amqpInt |= AMQP::durable;
    }

    if (((int)autodelete & (int)lifeSpan) == (int)autodelete) {
        amqpInt |= AMQP::autodelete;
    }

    if (((int)passive & (int)lifeSpan) == (int)passive) {
        amqpInt |= AMQP::passive;
    }

    if (((int)exclusive & (int)lifeSpan) == (int)exclusive) {
        amqpInt |= AMQP::exclusive;
    }

    return amqpInt;
}

static std::string Dump()
{
    static std::ostringstream sb;
    sb << "tamesi35a2_cs.cs Dump" << std::endl
        << "    name_queues         [ENQUEUE_INDEX]    =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [ENQUEUE_INDEX]    =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl
        << "    name_queues         [DEQUEUE_INDEX]    =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [DEQUEUE_INDEX]    =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl;
    return sb.str();
}

// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
    if (nullptr== pConnection_ev[index]) {
        pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
        std::cout << "(^▽^)接続したぜ☆ index=[" << index << "]" << std::endl;
    }
    return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
    if (nullptr != pConnection_ev[index]) {

        // チャンネルにもヌルのフラグを入れる
        closeChannel(index);

        std::cout << "(^▽^)これから接続を閉じるぜ☆ index=[" << index << "]" << std::endl;
        pConnection_ev[index]->close();
        std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
        pConnection_ev[index] = nullptr;
    }
}

// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
    if (nullptr== pChannel_ev[index])
    {
        pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
        std::cout << "(^▽^)チャンネル開いたぜ☆ index=[" << index << "]" << std::endl;
    }
    return pChannel_ev[index];
}
static void closeChannel(int index)
{
    if (nullptr != pChannel_ev[index]) {
        std::cout << "(^▽^)これからチャンネルを閉じるぜ☆ index=[" << index << "]" << std::endl;
        pChannel_ev[index]->close();
        std::cout << "(^▽^)チャンネルを閉じたぜ☆" << std::endl;
        pChannel_ev[index] = nullptr;
    }
}


/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 100;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;

    std::mutex _mutex;
    static void putMessage(std::string message)
    {
        //std::unique_lock<std::mutex> lock(_mutex);
        std::lock_guard<std::mutex> lock(_mutex);

        buffer[bufferCursors[PUT_INDEX]] = message;
        std::cout << "(^◇^)回転式バッファー : putMessage : [" << bufferCursors[PUT_INDEX] << "] 入れた☆! message=[" << message << "]" << std::endl;

        bufferCursors[PUT_INDEX]++;
        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }
        std::cout << "(^◇^)回転式バッファー : putMessage : 次回は[" << bufferCursors[PUT_INDEX] << "] ☆!" << std::endl;
    }
    static bool hasMessage() {
        std::lock_guard<std::mutex> lock(_mutex);
        return "" != buffer[bufferCursors[GET_INDEX]];
    }
    static std::string removeMessage()
    {
        //std::unique_lock<std::mutex> lock(_mutex);
        std::lock_guard<std::mutex> lock(_mutex);

        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];
            std::cout << "(^◇^)回転式バッファー : ゲットメッセージ : [" << bufferCursors[GET_INDEX] << "] 出した☆! message=[" << message << "]" << std::endl;

            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }
            std::cout << "(^◇^)回転式バッファー : ゲットメッセージ : 次回は[" << bufferCursors[GET_INDEX] << "] ☆!" << std::endl;

            return message;
        }
        return "";
    }
    static std::string getMessage()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return buffer[bufferCursors[GET_INDEX]];
    }
}

// メッセージ・キューの送信を担当
static void workProduce()
{
    std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
    std::string exchange_name = "myexchange";
    std::string routing_key = "";

    int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);

    // おっきな無限ループ
    for (;;) {
        getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
            .onError([&lifeSpan](const char* errMsg) {
            std::cerr << "(>_<)enqueue: error declaring queue: [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan << "]" << "\n";
        });
        std::cout << "(^▽^)workProduce : デクレア・キューしたぜ☆" << std::endl;

        getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            // ev_run で1回呼び出される。onSuccessが終わっても、ev_run は抜けない。
            .onSuccess([&exchange_name, &routing_key]() {

            std::cout << "(^q^) bindQueue.onSuccess : 開始だぜ☆" << std::endl;

            for (;;)
            {
                if (rotationBuffer::hasMessage())
                {
                    std::string message = rotationBuffer::removeMessage();
                    std::cout << "(^q^) bindQueue.onSuccess : メッセージがあるぜ☆ message=[" << message << "]" << std::endl;

                    // 末尾に daze を付ける。
                    message += "daze";

                    std::cout << "(^▽^)bindQueue.onSuccess : これからトランザクションを開始するぜ☆" << std::endl;
                    getChannel(ENQUEUE_INDEX)->startTransaction();
                    std::cout << "(^▽^)bindQueue.onSuccess : これからパブリッシュするぜ☆" << std::endl;
                    if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
                        getChannel(ENQUEUE_INDEX)->commitTransaction()
                            .onSuccess([]() {
                            std::cout << "(^▽^)コミット成功☆!" << std::endl;
                        })
                            .onError([](const char *errMsg) {
                            std::cout << "(>_<)コミット失敗☆ errMsg=[" << errMsg << "]" << std::endl;
                        });
                        std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
                    }else{
                        std::cout << "(/_\)パブリッシュできなかったぜ☆" << std::endl;
                        getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                    }

                    // 接続を切って、かつ、このハンドラーから抜けよう。
                    // break in ev_run loop.
                    closeConnection(ENQUEUE_INDEX);
                    //closeChannel(ENQUEUE_INDEX);
                    break;
                }
                //else
                //{
                //  std::cout << "(^q^) bindQueue.onSuccess : メッセージは無いぜ☆" << std::endl;
                //}
                std::this_thread::sleep_for(std::chrono::milliseconds(20));
            }

            std::cout << "(^q^) bindQueue.onSuccess : 終了だぜ☆" << std::endl;

        }).onError([](const char* errMsg) {
            std::cerr << "(>_<) バインドキュー : error declaring queue: [" << errMsg << "]" << "\n";
        });
        std::cout << "(^▽^)workProduce : バインド・キューしたぜ☆" << std::endl;

        std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
        // ここで、バインドキューのオンサクセスの処理へ。
        //ev_run(pLoop_ev[ENQUEUE_INDEX]);
        ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
        std::cout << "(^q^) workProduce : メッセージ監視ループを出たぜ☆ 再接続しようぜ☆" << std::endl;
    }
    std::cout << "(>_<) workProduce : おっきな無限ループを出てしまったぜ☆?" << std::endl;
}

// メッセージ・キューの監視を開始
static void workConsume()
{
    std::cout << "(^q^)コンシューム開始だぜ☆" << std::endl;
    // I will go to the front of the box named "1111".
    int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[DEQUEUE_INDEX]);
    getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
    std::cout << "(^q^)コンシュームは、デクレア・キューしたぜ☆" << std::endl;

    // I look inside the box.
    auto errorCb = [&lifeSpan](const char *errMsg) {
        std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
    };
    auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {

        std::string myString(message.body(), message.bodySize());
        std::cout << "(^q^)レシーブしたぜ☆ myString=[" << myString << "]" << std::endl;

        rotationBuffer::putMessage(myString);

        std::cout << "(^q^)レシーブ : これからackするぜ☆ deliveryTag=[" << deliveryTag << "]" << std::endl;
        getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
        std::cout << "(^q^)レシーブ : ackしたぜ☆ deliveryTag=[" << deliveryTag << "]" << std::endl;
    };
    getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
        .onReceived(messageCb)
        .onError(errorCb);
    std::cout << "(^q^)コンシュームを設定したぜ☆" << std::endl;

    // I will keep on forever.
    ev_run(pLoop_ev[DEQUEUE_INDEX]);

    std::cout << "(^q^)コンシュームのループから抜けたぜ☆?" << std::endl;

    // I will not come here.
    return;
}

int main(int argc, char* argv[])
{
    std::cout << "(^q^)Welcome! AMQP::durable=[" << AMQP::durable << "] AMQP::autodelete=[" << AMQP::autodelete << "] AMQP::passive=[" << AMQP::passive << "] AMQP::exclusive=[" << AMQP::exclusive << "]" << std::endl;

    // 引数の解析
    if(0<argc)
    {
        // プログラム名を省き、コマンドライン引数だけをつなげる。
        std::string cmdArg;
        for (int i = 1; i < argc; ++i)
        {
            cmdArg += std::string(argv[i]);
            if (i < argc) {
                cmdArg += " ";
            }
        }
        std::cout << "(^q^)cmdArg=[" << cmdArg << "]" << std::endl;
        std::istringstream data(cmdArg);
        //data >> std::skipws;
        std::cout << "(^q^)data.str()=[" << data.str() << "]" << std::endl;

        // 与件
        // 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
        // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

        // 受け皿
        std::string queueNames[2] = {}; // 結果はこれらの配列に入れる
        std::vector<std::string> lifespans[2] = {};

        // 記憶
        int m0 = -1; // 0:enqueue, 1:dequeue
        int m1 = -1; // lifespans index.

        // 解析器
        std::string a;
        while (data >> a) {
            std::cout << "(^_^)a0=[" << a << "]" << std::endl;

            if ("--enqueue" == a)
            {
                std::cout << "(^q^)エンキュー!" << std::endl;
                m0 = 0;
                m1 = -1;
            }
            else if ("--dequeue" == a)
            {
                std::cout << "(^q^)デキュー!" << std::endl;
                m0 = 1;
                m1 = -1;
            }
            else if (-1 == m1 && 0 == m0) {
                std::cout << "(^q^)エンキューのキュー名!" << std::endl;
                queueNames[m0] = a; // キュー名
                lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
                m1++;
            }
            else if (-1 == m1 && 1 == m0) {
                std::cout << "(^q^)デキューのキュー名!" << std::endl;
                queueNames[m0] = a; // キュー名
                lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
                m1++;
            }
            else
            {
                std::cout << "(^q^)寿命! m0=[" << m0 << "] m1=[" << m1 << "]" << std::endl;
                lifespans[m0].push_back(a);
                std::cout << "(^q^)zyumyo-!" << std::endl;
            } // 要素数-1 で最後の要素
        }
        std::cout << "(^-^)Loop end." << std::endl;

        if ("" == queueNames[0] || "" == queueNames[1]) {
            std::cerr << "コマンドライン引数の「--enqueue キュー名 寿命」「--dequeue キュー名 寿命」を漏れなく指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl;
            std::cerr << "queueNames[0]=[" << queueNames[0] << "]" << std::endl;
            //std::cerr << "queueNames[1]=[" << queueNames[1] << "]" << std::endl;
            //std::cerr << Dump();
            exit(1);
        }
        std::cout << "(^q^)エンキューのキュー名=[" << queueNames[0] << "]" << std::endl;
        std::cout << "(^q^)デキューのキュー名=[" << queueNames[1] << "]" << std::endl;
        std::cout << "(^q^)エンキューの寿命の数=[" << lifespans[0].size() << "]" << std::endl;
        std::cout << "(^q^)デキューの寿命の数=[" << lifespans[1].size() << "]" << std::endl;

        // エンキュー
        {
            m0 = 0;
            name_queues[ENQUEUE_INDEX] = queueNames[m0];
            lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)0;
            {
                for (std::string lifespan : lifespans[m0])
                {
                    lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[ENQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
                }
            }
        }
        std::cout << "(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_queues[ENQUEUE_INDEX]=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl;

        // デキュー
        {
            m0 = 1;
            name_queues[DEQUEUE_INDEX] = queueNames[m0];
            lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)0;
            {
                for (std::string lifespan : lifespans[m0])
                {
                    lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[DEQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
                }
            }
        }
        std::cout << "(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_queues[DEQUEUE_INDEX]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;

        std::cout << Dump();
        std::cout << "(^q^)ダンプおわり" << std::endl;
    }
    else
    {
        std::cerr << "コマンドライン引数を指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete";
        exit(1);
    }

    // 常時監視をスタート
    auto th1 = std::thread([] { workConsume(); });
    auto th2 = std::thread([] { workProduce(); });

    std::cout << "(^q^)メインスレッドはずっと回ってるぜ☆" << std::endl;
    for (;;) // このメイン・スレッドはずっと寝てる
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    std::cout << "(^q^)メインスレッドのループを抜けた☆?" << std::endl;
    // このプログラムは、自分では接続を切らない。

    // スレッドの終了を待つ
    th2.join();
    th1.join();

    std::cout << "(^q^)Finished!" << std::endl;
    return 0;
}

これを浮かむ瀬に組み込むには 長ったらしい。もっと短く書けんのか。

最初の1つ

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > saru
Enqueue? > suzume
Enqueue? > <---- [interrupt!] Dequeue(^q^) suzumedaze
penguin
Enqueue? > <---- [interrupt!] Dequeue(^q^) penguindaze
sika
Enqueue? > <---- [interrupt!] Dequeue(^q^) sikadaze
nezumi
Enqueue? > <---- [interrupt!] Dequeue(^q^) nezumidaze

こんどは最初の1つが返ってこないぞ。

最初の方

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
34_cs consumer.create [1] begin
34_cs consumer.create [1] end
34_cs channel.open [1] begin
34_cs connection.open begin
34_cs connection.open end
34_cs channel.open [1] end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usagi
34_cs publish begin [usagi]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > kame
34_cs publish begin [kame]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > rakko
34_cs publish begin [rakko]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) rakkodaze
surume
34_cs publish begin [surume]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) surumedaze

途中から反応しだすのは何でだろう?

じゃあ、しばらく待ってから通信したらいいのか?

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
34_cs consumer.create [1] begin
34_cs consumer.create [1] end
34_cs channel.open [1] begin
34_cs connection.open begin
34_cs connection.open end
34_cs channel.open [1] end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > medaka
34_cs publish begin [medaka]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs <-------- [interrupt!] Dequeue(^q^) medakadaze
34_cs channel.close [0] end
Enqueue? > batta
34_cs publish begin [batta]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) battadaze
mebosi
34_cs publish begin [mebosi]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) mebosidaze

たしかに しばらく待つ と 最初の通信から応答が返ってくるようだが、プロセス間通信をするために最初に1秒待たないと行けないのだとしたら本末転倒だ。

tamesi35a2_cpp.out.log

35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
35a2_cpp main dump begin
tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
35a2_cpp main dump end
35a2_cpp[1] channel.open begin
35a2_cpp[1] connection.open begin
35a2_cpp[0] channel.open begin
35a2_cpp[0] connection.open begin
35a2_cpp[35a2_cpp[1] connection.open end
0] connection.open end35a2_cpp[1] channel.open end

35a2_cpp[0] channel.open end
35a2_cpp rotationBuffer.putMessage begin [0] [kame]
35a2_cpp rotationBuffer.putMessage end next=[1]
35a2_cpp consumer.consume.ack begin [1] [kame]
35a2_cpp consumer.consume.ack end [1]
35a2_cpp rotationBuffer.removeMessage begin [0] [kame]
35a2_cpp rotationBuffer.removeMessage end next=[1]
35a2_cpp[0] connection.close begin
35a2_cpp[0] channel.close begin
35a2_cpp[0] channel.close end
35a2_cpp[0] connection.close end
35a2_cpp[0] channel.open begin
35a2_cpp[0] connection.open begin
35a2_cpp[0] connection.open end
35a2_cpp[0] channel.open end
35a2_cpp rotationBuffer.putMessage begin [1] [surume]
35a2_cpp rotationBuffer.putMessage end next=[2]
35a2_cpp consumer.consume.ack begin [2] [surume]
35a2_cpp consumer.consume.ack end [2]
35a2_cpp rotationBuffer.removeMessage begin [1] [surume]
35a2_cpp rotationBuffer.removeMessage end next=[2]
35a2_cpp[0] connection.close begin
35a2_cpp[0] channel.close begin
35a2_cpp[0] channel.close end
35a2_cpp[0] connection.close end
35a2_cpp producer.publish.commited [ F�x^A^?^@^@B�nx^A^?^@^@$^@^@^@^@^@^@^@B�nx^A^?^@^@ F�x^A^?^@^@�F�w^A^?^@^@B�nx^A^?^@^@^A^@^@^@^@^@^@^@�G�w^A^?^@^@��mx^A^?^@^@�%c^@^@^@^@^@^PH�w^A^?^@^@�%c^@^@^@^@^@^@^@^@^@^@^@^@^@B�nx^A^$

ログに kame と surume しかいないのも分からない。 usagi や rakko はどこに行ったのか?

時刻はずれているが、時刻を出してみた。

03:20:17 '254 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
03:20:17 '254 35a2_cpp main dump begin
03:20:17 '254 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
03:20:17 '254 35a2_cpp main dump end
03:20:17 '255 35a2_cpp producer.sendMessage begin
03:20:17 '255 35a2_cpp producer.sendMessage.declareQueue begin
03:20:17 '255 35a2_cpp[0] channel.open begin
03:20:17 '255 35a2_cpp consumer begin
03:20:17 '255 35a2_cpp consumer.declareQueue begin
03:20:17 '255  35a2_cpp[0] connection.open begin
03:20:17 '255 35a2_cpp[1] channel.open begin
03:20:17 '255  35a2_cpp[1] connection.open begin
03:20:17 '256 35a2_cpp[1] connection.open end
03:20:17 '257 35a2_cpp[1] channel.open end
03:20:17 '256 35a2_cpp[0] connection.open end
03:20:17 '257 35a2_cpp[0] channel.open end
03:20:17 '257 35a2_cpp producer.sendMessage.declareQueue end
03:20:17 '257 35a2_cpp producer.sendMessage.bindQueue begin
03:20:17 '257 35a2_cpp producer.sendMessage.bindQueue end
03:20:17 '257 35a2_cpp producer.sendMessage.ev_run begin
03:20:17 '257 35a2_cpp consumer.declareQueue end
03:20:17 '257 35a2_cpp consumer.consume begin
03:20:17 '257 35a2_cpp consumer.consume end
03:20:17 '257 35a2_cpp consumer.ev_run begin
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] before lock [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage end next=[1]
03:20:56 '695 35a2_cpp consumer.consume.received.ack begin [1] [neko]
03:20:56 '695 35a2_cpp consumer.consume.received.ack end [1]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage begin [0] [neko]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage end return=[neko] next=[1]
03:20:56 '700 35a2_cpp producer.sendMessage.bindQueue.onSuccess [nekodaze]
03:20:56 '700 35a2_cpp[0] connection.close begin
03:20:56 '700 35a2_cpp[0] channel.close begin
03:20:56 '700 35a2_cpp[0] channel.close end
03:20:56 '700 35a2_cpp[0] connection.close end
03:20:56 '709 35a2_cpp producer.sendMessage.publish.commited []
03:20:56 '711 35a2_cpp producer.sendMessage.ev_run end
03:20:56 '711 35a2_cpp producer.sendMessage end
03:20:56 '711 35a2_cpp producer.sendMessage begin
03:20:56 '711 35a2_cpp producer.sendMessage.declareQueue begin
03:20:56 '711 35a2_cpp[0] channel.open begin
03:20:56 '711  35a2_cpp[0] connection.open begin
03:20:56 '712 35a2_cpp[0] connection.open end
03:20:56 '712 35a2_cpp[0] channel.open end
03:20:56 '712 35a2_cpp producer.sendMessage.declareQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue begin
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.ev_run begin

ev_run は2つとも待機状態で、突然 neko から始まっているが、やはり inu は飛ばされている。このあたり調べてみたい。

03:20:17 '257 35a2_cpp producer.sendMessage.ev_run begin
03:20:17 '257 35a2_cpp consumer.declareQueue end
03:20:17 '257 35a2_cpp consumer.consume begin
03:20:17 '257 35a2_cpp consumer.consume end
03:20:17 '257 35a2_cpp consumer.ev_run begin
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] before lock [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage begin [0] [neko]
03:20:56 '695 35a2_cpp rotationBuffer.putMessage end next=[1]
03:20:56 '695 35a2_cpp consumer.consume.received.ack begin [1] [neko]
03:20:56 '695 35a2_cpp consumer.consume.received.ack end [1]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage begin [0] [neko]
03:20:56 '699 35a2_cpp rotationBuffer.removeMessage end return=[neko] next=[1]
03:20:56 '700 35a2_cpp producer.sendMessage.bindQueue.onSuccess [nekodaze]
03:20:56 '700 35a2_cpp[0] connection.close begin
03:20:56 '700 35a2_cpp[0] channel.close begin
03:20:56 '700 35a2_cpp[0] channel.close end
03:20:56 '700 35a2_cpp[0] connection.close end
03:20:56 '709 35a2_cpp producer.sendMessage.publish.commited []
03:20:56 '711 35a2_cpp producer.sendMessage.ev_run end
03:20:56 '711 35a2_cpp producer.sendMessage end
03:20:56 '711 35a2_cpp producer.sendMessage begin
03:20:56 '711 35a2_cpp producer.sendMessage.declareQueue begin
03:20:56 '711 35a2_cpp[0] channel.open begin
03:20:56 '711  35a2_cpp[0] connection.open begin
03:20:56 '712 35a2_cpp[0] connection.open end
03:20:56 '712 35a2_cpp[0] channel.open end
03:20:56 '712 35a2_cpp producer.sendMessage.declareQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue begin
03:20:56 '712 35a2_cpp producer.sendMessage.bindQueue end
03:20:56 '712 35a2_cpp producer.sendMessage.ev_run begin
03:26:07 '706 35a2_cpp rotationBuffer.putMessage begin [1] before lock [saru]
03:26:07 '706 35a2_cpp rotationBuffer.putMessage begin [1] [saru]
03:26:07 '706 35a2_cpp rotationBuffer.putMessage end next=[2]
03:26:07 '706 35a2_cpp consumer.consume.received.ack begin [2] [saru]
03:26:07 '707 35a2_cpp consumer.consume.received.ack end [2]
03:26:07 '708 35a2_cpp rotationBuffer.removeMessage begin [1] [saru]
03:26:07 '708 35a2_cpp rotationBuffer.removeMessage end return=[saru] next=[2]
03:26:07 '708 35a2_cpp producer.sendMessage.bindQueue.onSuccess [sarudaze]
03:26:07 '708 35a2_cpp[0] connection.close begin
03:26:07 '708 35a2_cpp[0] channel.close begin
03:26:07 '708 35a2_cpp[0] channel.close end
03:26:07 '708 35a2_cpp[0] connection.close end
03:26:07 '710 35a2_cpp producer.sendMessage.ev_run end
03:26:07 '710 35a2_cpp producer.sendMessage end
03:26:07 '710 35a2_cpp producer.sendMessage begin
03:26:07 '710 35a2_cpp producer.sendMessage.declareQueue begin
03:26:07 '710 35a2_cpp[0] channel.open begin
03:26:07 '710  35a2_cpp[0] connection.open begin
03:26:07 '710 35a2_cpp[0] connection.open end
03:26:07 '711 35a2_cpp[0] channel.open end
03:26:07 '711 35a2_cpp producer.sendMessage.declareQueue end
03:26:07 '711 35a2_cpp producer.sendMessage.bindQueue begin
03:26:07 '711 35a2_cpp producer.sendMessage.bindQueue end
03:26:07 '711 35a2_cpp producer.sendMessage.ev_run begin
03:26:10 '447 35a2_cpp rotationBuffer.putMessage begin [2] before lock [inu]
03:26:10 '447 35a2_cpp rotationBuffer.putMessage begin [2] [inu]
03:26:10 '447 35a2_cpp rotationBuffer.putMessage end next=[3]
03:26:10 '447 35a2_cpp consumer.consume.received.ack begin [3] [inu]
03:26:10 '448 35a2_cpp consumer.consume.received.ack end [3]
03:26:10 '466 35a2_cpp rotationBuffer.removeMessage begin [2] [inu]
03:26:10 '466 35a2_cpp rotationBuffer.removeMessage end return=[inu] next=[3]
03:26:10 '466 35a2_cpp producer.sendMessage.bindQueue.onSuccess [inudaze]
03:26:10 '467 35a2_cpp[0] connection.close begin
03:26:10 '467 35a2_cpp[0] channel.close begin
03:26:10 '467 35a2_cpp[0] channel.close end
03:26:10 '467 35a2_cpp[0] connection.close end
03:26:10 '469 35a2_cpp producer.sendMessage.publish.commited []
03:26:10 '477 35a2_cpp producer.sendMessage.ev_run end
03:26:10 '477 35a2_cpp producer.sendMessage end
03:26:10 '477 35a2_cpp producer.sendMessage begin
03:26:10 '477 35a2_cpp producer.sendMessage.declareQueue begin
03:26:10 '477 35a2_cpp[0] channel.open begin
03:26:10 '477  35a2_cpp[0] connection.open begin
03:26:10 '477 35a2_cpp[0] connection.open end
03:26:10 '478 35a2_cpp[0] channel.open end
03:26:10 '478 35a2_cpp producer.sendMessage.declareQueue end
03:26:10 '478 35a2_cpp producer.sendMessage.bindQueue begin
03:26:10 '478 35a2_cpp producer.sendMessage.bindQueue end
03:26:10 '478 35a2_cpp producer.sendMessage.ev_run begin
03:26:15 '916 35a2_cpp rotationBuffer.putMessage begin [3] before lock [nezumi]
03:26:15 '916 35a2_cpp rotationBuffer.putMessage begin [3] [nezumi]
03:26:15 '916 35a2_cpp rotationBuffer.putMessage end next=[4]
03:26:15 '916 35a2_cpp consumer.consume.received.ack begin [4] [nezumi]

saru tori inu i nezumi で、ひとつ飛ばしになっている気もする。

手順を確認

RabbitMQ サービスを再起動。

rabbitmqctl stop
rabbitmq-server

[Ctrl] + [Z]

bg 1

1 はジョブ番号

./tamesi34_cs.exe 2> ./tamesi34_cs.err.log

送信先キュー 1112、durable + autodelete
受信元キュー 1113、durable + autodelete

6件送信

# rabbitmqctl list_queues
Listing queues ...
1112    6
# ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
[2] 16393
# jobs
[1]-  Running                 rabbitmq-server &  (wd: /home/★user/shogi/csharp_service)
[2]+  Running                 ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
# rabbitmqctl list_queues
Listing queues ...
1112    0
1113    6

6件の返信はできているようだ。

つまり うまく行っているときは

03:56:43 '147 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
03:56:43 '148 35a2_cpp main dump begin
03:56:43 '148 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
03:56:43 '148 35a2_cpp main dump end
03:56:43 '148 35a2_cpp ___(C) ___ begin
03:56:43 '148 03:56:43 '148 35a2_cpp ___(C) DQ_ begin
35a2_cpp (P)___ WKP begin03:56:43 '149 35a2_cpp[
1] channel.open begin
03:56:43 '149  35a2_cpp[1] connection.open begin
03:56:43 '149 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '149 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '149 35a2_cpp[0] channel.open begin
03:56:43 '149  35a2_cpp[0] connection.open begin
03:56:43 '150 35a2_cpp[0] connection.open end
03:56:43 '150 35a2_cpp[1] connection.open end
03:56:43 '151 35a2_cpp[1] channel.open end
03:56:43 '151 35a2_cpp ___(C) DQ_ end
03:56:43 '151 35a2_cpp ___(C) CNS begin
03:56:43 '151 35a2_cpp ___(C) CNS end
03:56:43 '151 35a2_cpp ___(C) ___ ev_run begin
03:56:43 '151 35a2_cpp[0] channel.open end
03:56:43 '151 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '151 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '151 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '151 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '167 35a2_cpp ___(C) CNS.received begin [s1]
03:56:43 '167 35a2_cpp rotationBuffer.putMessage begin [0] before lock [s1]
03:56:43 '167 35a2_cpp rotationBuffer.putMessage begin [0] [s1]
03:56:43 '167 35a2_cpp rotationBuffer.putMessage end next=[1]
03:56:43 '167 35a2_cpp ___(C) CNS.received.ack begin [1] [s1]
03:56:43 '167 35a2_cpp ___(C) CNS.received.ack end [1]
03:56:43 '167 35a2_cpp ___(C) CNS.received end [s1]
03:56:43 '168 35a2_cpp ___(C) CNS.received begin [s2]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [1] before lock [s2]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [1] [s2]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage end next=[2]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack begin [2] [s2]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack end [2]
03:56:43 '168 35a2_cpp ___(C) CNS.received end [s2]
03:56:43 '168 35a2_cpp ___(C) CNS.received begin [s3]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [2] before lock [s3]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [2] [s3]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage end next=[3]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack begin [3] [s3]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack end [3]
03:56:43 '168 35a2_cpp ___(C) CNS.received end [s3]
03:56:43 '168 35a2_cpp ___(C) CNS.received begin [s4]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [3] before lock [s4]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage begin [3] [s4]
03:56:43 '168 35a2_cpp rotationBuffer.putMessage end next=[4]
03:56:43 '168 35a2_cpp ___(C) CNS.received.ack begin [4] [s4]
03:56:43 '169 35a2_cpp ___(C) CNS.received.ack end [4]
03:56:43 '169 35a2_cpp ___(C) CNS.received end [s4]
03:56:43 '169 35a2_cpp ___(C) CNS.received begin [s5]
03:56:43 '169 35a2_cpp rotationBuffer.putMessage begin [4] before lock [s5]
03:56:43 '169 35a2_cpp rotationBuffer.putMessage begin [4] [s5]
03:56:43 '169 35a2_cpp rotationBuffer.putMessage end next=[5]
03:56:43 '169 35a2_cpp ___(C) CNS.received.ack begin [5] [s5]
03:56:43 '169 35a2_cpp ___(C) CNS.received.ack end [5]
03:56:43 '169 35a2_cpp ___(C) CNS.received end [s5]
03:56:43 '170 35a2_cpp ___(C) CNS.received begin [s6]
03:56:43 '170 35a2_cpp rotationBuffer.putMessage begin [5] before lock [s6]
03:56:43 '170 35a2_cpp rotationBuffer.putMessage begin [5] [s6]
03:56:43 '170 35a2_cpp rotationBuffer.putMessage end next=[6]
03:56:43 '170 35a2_cpp ___(C) CNS.received.ack begin [6] [s6]
03:56:43 '170 35a2_cpp ___(C) CNS.received.ack end [6]
03:56:43 '170 35a2_cpp ___(C) CNS.received end [s6]
03:56:43 '176 35a2_cpp rotationBuffer.removeMessage begin [0] [s1]
03:56:43 '176 35a2_cpp rotationBuffer.removeMessage end return=[s1] next=[1]
03:56:43 '176 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s1daze]
03:56:43 '176 35a2_cpp[0] connection.close begin
03:56:43 '176 35a2_cpp[0] channel.close begin
03:56:43 '176 35a2_cpp[0] channel.close end
03:56:43 '176 35a2_cpp[0] connection.close end
03:56:43 '182 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '183 35a2_cpp[0] channel.open begin
03:56:43 '183  35a2_cpp[0] connection.open begin
03:56:43 '183 35a2_cpp[0] connection.open end
03:56:43 '183 35a2_cpp[0] channel.open end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '183 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '183 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '187 35a2_cpp rotationBuffer.removeMessage begin [1] [s2]
03:56:43 '187 35a2_cpp rotationBuffer.removeMessage end return=[s2] next=[2]
03:56:43 '187 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s2daze]
03:56:43 '187 35a2_cpp[0] connection.close begin
03:56:43 '187 35a2_cpp[0] channel.close begin
03:56:43 '187 35a2_cpp[0] channel.close end
03:56:43 '187 35a2_cpp[0] connection.close end
03:56:43 '193 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '193 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '193 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '193 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '193 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '193 35a2_cpp[0] channel.open begin
03:56:43 '194  35a2_cpp[0] connection.open begin
03:56:43 '194 35a2_cpp[0] connection.open end
03:56:43 '194 35a2_cpp[0] channel.open end
03:56:43 '194 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '194 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '194 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '194 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '198 35a2_cpp rotationBuffer.removeMessage begin [2] [s3]
03:56:43 '198 35a2_cpp rotationBuffer.removeMessage end return=[s3] next=[3]
03:56:43 '198 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s3daze]
03:56:43 '198 35a2_cpp[0] connection.close begin
03:56:43 '198 35a2_cpp[0] channel.close begin
03:56:43 '198 35a2_cpp[0] channel.close end
03:56:43 '198 35a2_cpp[0] connection.close end
03:56:43 '199 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '200 35a2_cpp[0] channel.open begin
03:56:43 '200  35a2_cpp[0] connection.open begin
03:56:43 '200 35a2_cpp[0] connection.open end
03:56:43 '200 35a2_cpp[0] channel.open end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '200 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '200 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '204 35a2_cpp rotationBuffer.removeMessage begin [3] [s4]
03:56:43 '204 35a2_cpp rotationBuffer.removeMessage end return=[s4] next=[4]
03:56:43 '204 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s4daze]
03:56:43 '204 35a2_cpp[0] connection.close begin
03:56:43 '204 35a2_cpp[0] channel.close begin
03:56:43 '204 35a2_cpp[0] channel.close end
03:56:43 '204 35a2_cpp[0] connection.close end
03:56:43 '205 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '206 35a2_cpp[0] channel.open begin
03:56:43 '206  35a2_cpp[0] connection.open begin
03:56:43 '206 35a2_cpp[0] connection.open end
03:56:43 '206 35a2_cpp[0] channel.open end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '206 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '206 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '218 35a2_cpp rotationBuffer.removeMessage begin [4] [s5]
03:56:43 '218 35a2_cpp rotationBuffer.removeMessage end return=[s5] next=[5]
03:56:43 '218 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s5daze]
03:56:43 '218 35a2_cpp[0] connection.close begin
03:56:43 '218 35a2_cpp[0] channel.close begin
03:56:43 '218 35a2_cpp[0] channel.close end
03:56:43 '218 35a2_cpp[0] connection.close end
03:56:43 '219 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '220 35a2_cpp[0] channel.open begin
03:56:43 '220  35a2_cpp[0] connection.open begin
03:56:43 '220 35a2_cpp[0] connection.open end
03:56:43 '220 35a2_cpp[0] channel.open end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '220 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '220 35a2_cpp (P)___ WKP.SM_ ev_run begin
03:56:43 '225 35a2_cpp rotationBuffer.removeMessage begin [5] [s6]
03:56:43 '225 35a2_cpp rotationBuffer.removeMessage end return=[s6] next=[6]
03:56:43 '225 35a2_cpp (P)___ WKP.bindQueue.onSuccess [s6daze]
03:56:43 '225 35a2_cpp[0] connection.close begin
03:56:43 '225 35a2_cpp[0] channel.close begin
03:56:43 '225 35a2_cpp[0] channel.close end
03:56:43 '225 35a2_cpp[0] connection.close end
03:56:43 '226 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ ev_run end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ begin
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.declareQueue begin
03:56:43 '227 35a2_cpp[0] channel.open begin
03:56:43 '227  35a2_cpp[0] connection.open begin
03:56:43 '227 35a2_cpp[0] connection.open end
03:56:43 '227 35a2_cpp[0] channel.open end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.declareQueue end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.bindQueue begin
03:56:43 '227 35a2_cpp (P)___ WKP.SM_.bindQueue end
03:56:43 '227 35a2_cpp (P)___ WKP.SM_ ev_run begin
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) s1daze
34_cs <-------- [interrupt!] Dequeue(^q^) s2daze
34_cs <-------- [interrupt!] Dequeue(^q^) s3daze
34_cs <-------- [interrupt!] Dequeue(^q^) s4daze
34_cs <-------- [interrupt!] Dequeue(^q^) s5daze
34_cs <-------- [interrupt!] Dequeue(^q^) s6daze

まあ、6件溜まっていれば、初回に6連発が飛んでくるわな。

ne
34_cs publish begin [ne]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > usi
34_cs publish begin [usi]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) usidaze
tora
34_cs publish begin [tora]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) toradaze
u
34_cs publish begin [u]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) udaze
tatu
34_cs publish begin [tatu]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) tatudaze

それでも 最初の1件が 返ってこない。

./tamesi34_cs.exe

の起動直後に 問題があるのだろうか?

C#の方、接続を2つ用意しては

Windows 10 だと接続1つを使いまわしても動いているが、Ubuntu 16.04 だと分からない。
センド用、レシーブ用に接続を2つ用意するか。

関係なかった。

10秒待つと

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
34_cs publish begin [a1]
34_cs channel.open [0] begin
34_cs channel.open [0] end
34_cs publish end
34_cs channel.close [0] begin
34_cs channel.close [0] end
Enqueue? > 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze

うまいこといく。

じゃあ、プログラムは問題なくて、何秒待てばいいのか、という話し か。

デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
14:01:55 104 34_cs ___(C) start begin
14:01:55 110 34_cs consumer.create [1] begin
14:01:55 110 34_cs consumer.create [1] end
14:01:55 111 34_cs channel.open [1] begin
14:01:55 111 34_cs connection.open begin
14:01:55 289 34_cs connection.open end
14:01:55 294 34_cs channel.open [1] end
14:01:55 296 34_cs ___(C) start end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:01:57 351 34_cs (P)___ enqueue begin [a1]
14:01:57 352 34_cs channel.open [0] begin
14:01:57 352 34_cs connection.open begin
14:01:57 370 34_cs connection.open end
14:01:57 372 34_cs channel.open [0] end
14:01:57 376 34_cs (P)___ enqueue end
14:01:57 379 34_cs connection.close [0] begin
14:01:57 380 34_cs channel.close [0] begin
14:01:57 386 34_cs channel.close [0] end
14:01:57 392 34_cs connection.close [0] end
Enqueue? > b1
14:02:00 710 34_cs (P)___ enqueue begin [b1]
14:02:00 711 34_cs channel.open [0] begin
14:02:00 711 34_cs connection.open begin
14:02:00 720 34_cs connection.open end
14:02:00 722 34_cs channel.open [0] end
14:02:00 722 34_cs (P)___ enqueue end
14:02:00 722 34_cs connection.close [0] begin
14:02:00 722 34_cs channel.close [0] begin
14:02:00 724 34_cs channel.close [0] end
14:02:00 726 34_cs connection.close [0] end
Enqueue? > 14:02:00 749 34_cs <-------- [interrupt!] Dequeue(^q^) b1daze

これが失敗するとき。

デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
14:03:06 830 34_cs ___(C) start begin
14:03:06 837 34_cs consumer.create [1] begin
14:03:06 837 34_cs consumer.create [1] end
14:03:06 838 34_cs channel.open [1] begin
14:03:06 838 34_cs connection.open begin
14:03:07 031 34_cs connection.open end
14:03:07 037 34_cs channel.open [1] end
14:03:07 040 34_cs ___(C) start end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:03:14 278 34_cs (P)___ enqueue begin [a1]
14:03:14 278 34_cs channel.open [0] begin
14:03:14 278 34_cs connection.open begin
14:03:14 293 34_cs connection.open end
14:03:14 295 34_cs channel.open [0] end
14:03:14 301 34_cs (P)___ enqueue end
14:03:14 301 34_cs connection.close [0] begin
14:03:14 302 34_cs channel.close [0] begin
14:03:14 308 34_cs channel.close [0] end
14:03:14 316 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
14:03:14 319 34_cs connection.close [0] end

これが成功するとき。

3秒だと失敗して、7秒だと成功する。
成功するタイミングがあるんだったら、タイマーで教えてほしいぜ。

Windows 10 では

            consumer.Registered += GetRegisteredHandler();

こういうイベント・ハンドラーが使えた。 Ubuntu 16.04 にはあるのか?

Dequeue ? > 1
14:23:11 570 34_cs ___(C) start begin
14:23:11 578 34_cs consumer.create [1] begin
14:23:11 579 34_cs consumer.create [1] end
14:23:11 580 34_cs channel.open [1] begin
14:23:11 580 34_cs connection.open begin
14:23:11 765 34_cs connection.open end
14:23:11 770 34_cs channel.open [1] end
14:23:11 772 34_cs <-------- [interrupt!] Registered(^q^)
14:23:11 773 34_cs ___(C) start end
14:23:11 773 34_cs (P)___ waitRegistered begin
14:23:11 773 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:23:16 984 34_cs (P)___ enqueue begin [a1]
14:23:16 984 34_cs channel.open [0] begin
14:23:16 984 34_cs connection.open begin
14:23:16 992 34_cs connection.open end
14:23:16 994 34_cs channel.open [0] end
14:23:16 998 34_cs (P)___ enqueue end
14:23:16 998 34_cs connection.close [0] begin
14:23:16 999 34_cs channel.close [0] begin
14:23:17 004 34_cs channel.close [0] end
14:23:17 009 34_cs connection.close [0] end
Enqueue? > a1
14:23:19 431 34_cs (P)___ enqueue begin [a1]
14:23:19 432 34_cs channel.open [0] begin
14:23:19 433 34_cs connection.open begin
14:23:19 443 34_cs connection.open end
14:23:19 445 34_cs channel.open [0] end
14:23:19 445 34_cs (P)___ enqueue end
14:23:19 445 34_cs connection.close [0] begin
14:23:19 445 34_cs channel.close [0] begin
14:23:19 448 34_cs channel.close [0] end
14:23:19 448 34_cs connection.close [0] end
Enqueue? > 14:23:19 469 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze

関係なかった。

じゃあ逆にやっぱ C++ 側の問題なのか?

05:23:17 '000 35a2_cpp rotationBuffer.putMessage end next=[49]
05:23:17 '000 35a2_cpp ___(C) CNS.received.ack begin [49] [a1]
05:23:17 '000 35a2_cpp ___(C) CNS.received.ack end [49]
05:23:17 '000 35a2_cpp ___(C) CNS.received end [a1]
05:23:17 '001 35a2_cpp rotationBuffer.removeMessage begin [48] [a1]
05:23:17 '001 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[49]
05:23:17 '001 35a2_cpp (P)___ WKP.bindQueue.onSuccess [a1daze]
05:23:17 '001 35a2_cpp[0] connection.close begin
05:23:17 '001 35a2_cpp[0] channel.close begin
05:23:17 '001 35a2_cpp[0] channel.close end
05:23:17 '001 35a2_cpp[0] connection.close end
05:23:17 '001 35a2_cpp (P)___ WKP.SM_ ev_run end
05:23:17 '001 35a2_cpp (P)___ WKP.SM_ end
05:23:17 '001 35a2_cpp (P)___ WKP.SM_ begin
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:23:17 '002 35a2_cpp[0] channel.open begin
05:23:17 '002  35a2_cpp[0] connection.open begin
05:23:17 '002 35a2_cpp[0] connection.open end
05:23:17 '002 35a2_cpp[0] channel.open end
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:23:17 '002 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:23:17 '002 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:23:19 '447 35a2_cpp ___(C) CNS.received begin [a1]
05:23:19 '447 35a2_cpp rotationBuffer.putMessage begin [49] before lock [a1]
05:23:19 '447 35a2_cpp rotationBuffer.putMessage begin [49] [a1]
05:23:19 '447 35a2_cpp rotationBuffer.putMessage end next=[50]
05:23:19 '447 35a2_cpp ___(C) CNS.received.ack begin [50] [a1]
05:23:19 '447 35a2_cpp ___(C) CNS.received.ack end [50]
05:23:19 '447 35a2_cpp ___(C) CNS.received end [a1]
05:23:19 '457 35a2_cpp rotationBuffer.removeMessage begin [49] [a1]
05:23:19 '457 35a2_cpp rotationBuffer.removeMessage end return=[a1] next=[50]
05:23:19 '457 35a2_cpp (P)___ WKP.bindQueue.onSuccess [a1daze]
05:23:19 '457 35a2_cpp[0] connection.close begin
05:23:19 '457 35a2_cpp[0] channel.close begin
05:23:19 '458 35a2_cpp[0] channel.close end
05:23:19 '458 35a2_cpp[0] connection.close end
05:23:19 '459 35a2_cpp (P)___ WKP.bindQueue.publish.commited []
05:23:19 '466 35a2_cpp (P)___ WKP.SM_ ev_run end
05:23:19 '466 35a2_cpp (P)___ WKP.SM_ end
05:23:19 '466 35a2_cpp (P)___ WKP.SM_ begin
05:23:19 '466 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:23:19 '466 35a2_cpp[0] channel.open begin
05:23:19 '466  35a2_cpp[0] connection.open begin
05:23:19 '467 35a2_cpp[0] connection.open end
05:23:19 '467 35a2_cpp[0] channel.open end
05:23:19 '467 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:23:19 '467 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:23:19 '467 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:23:19 '467 35a2_cpp (P)___ WKP.SM_ ev_run begin

コミットできてないんじゃないか?

C++側でコミットに失敗するケースとは?

デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
14:46:47 424 34_cs ___(C) start begin
14:46:47 431 34_cs consumer.create [1] begin
14:46:47 431 34_cs consumer.create [1] end
14:46:47 433 34_cs channel.open [1] begin
14:46:47 433 34_cs connection.open begin
14:46:47 600 34_cs connection.open end
14:46:47 605 34_cs channel.open [1] end
14:46:47 607 34_cs <-------- [interrupt!] Registered(^q^)
14:46:47 608 34_cs ___(C) start end
14:46:47 608 34_cs (P)___ waitRegistered begin
14:46:47 608 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > a1
14:46:49 703 34_cs (P)___ enqueue begin [a1]
14:46:49 703 34_cs channel.open [0] begin
14:46:49 704 34_cs connection.open begin
14:46:49 708 34_cs connection.open end
14:46:49 710 34_cs channel.open [0] end
14:46:49 713 34_cs (P)___ enqueue end
14:46:49 714 34_cs connection.close [0] begin
14:46:49 714 34_cs channel.close [0] begin
14:46:49 719 34_cs channel.close [0] end
14:46:49 724 34_cs connection.close [0] end
Enqueue? > 14:46:49 744 34_cs <-------- [interrupt!] Dequeue(^q^) a1daze
a2
14:46:51 199 34_cs (P)___ enqueue begin [a2]
14:46:51 199 34_cs channel.open [0] begin
14:46:51 199 34_cs connection.open begin
14:46:51 204 34_cs connection.open end
14:46:51 206 34_cs channel.open [0] end
14:46:51 207 34_cs (P)___ enqueue end
14:46:51 207 34_cs connection.close [0] begin
14:46:51 207 34_cs channel.close [0] begin
14:46:51 209 34_cs channel.close [0] end
14:46:51 209 34_cs connection.close [0] end
Enqueue? > 14:46:51 224 34_cs <-------- [interrupt!] Dequeue(^q^) a2daze

今度は すぐに応答が返ってきた。コメントを書き直してコンパイルして実行したが、何が違うのだろう?

05:48:09'921 35a2_cpp[0] connection.open end
05:48:09'921 35a2_cpp[0] channel.open end
05:48:09'921 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:48:09'921 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:48:09'921 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:48:09'921 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:48:11'733 35a2_cpp ___(C) CNS.received begin [a4]
05:48:11'733 35a2_cpp rotationBuffer.putMessage begin [3] before lock [a4]
05:48:11'733 35a2_cpp rotationBuffer.putMessage begin [3] [a4]
05:48:11'733 35a2_cpp rotationBuffer.putMessage end next=[4]
05:48:11'733 35a2_cpp ___(C) CNS.received.ack begin [4] [a4]
05:48:11'733 35a2_cpp ___(C) CNS.received.ack end [4]
05:48:11'733 35a2_cpp ___(C) CNS.received end [a4]
05:48:11'747 35a2_cpp rotationBuffer.removeMessage begin [3] [a4]
05:48:11'748 35a2_cpp rotationBuffer.removeMessage end return=[a4] next=[4]
05:48:11'748 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a4daze] ****important****
05:48:11'748 35a2_cpp[0] connection.close begin
05:48:11'748 35a2_cpp[0] channel.close begin
05:48:11'748 35a2_cpp[0] channel.close end
05:48:11'748 35a2_cpp[0] connection.close end
05:48:11'749 35a2_cpp (P)___ WKP.bindQueue.publish.commited [] **********important**********
05:48:11'751 35a2_cpp (P)___ WKP.SM_ ev_run end
05:48:11'751 35a2_cpp (P)___ WKP.SM_ end
05:48:11'751 35a2_cpp (P)___ WKP.SM_ begin
05:48:11'751 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:48:11'751 35a2_cpp[0] channel.open begin
05:48:11'751  35a2_cpp[0] connection.open begin
05:48:11'751 35a2_cpp[0] connection.open end
05:48:11'752 35a2_cpp[0] channel.open end
05:48:11'752 35a2_cpp (P)___ WKP.SM_.declareQueue end
05:48:11'752 35a2_cpp (P)___ WKP.SM_.bindQueue begin
05:48:11'752 35a2_cpp (P)___ WKP.SM_.bindQueue end
05:48:11'752 35a2_cpp (P)___ WKP.SM_ ev_run begin
05:48:12'999 35a2_cpp ___(C) CNS.received begin [a5]
05:48:12'999 35a2_cpp rotationBuffer.putMessage begin [4] before lock [a5]
05:48:12'999 35a2_cpp rotationBuffer.putMessage begin [4] [a5]
05:48:12'999 35a2_cpp rotationBuffer.putMessage end next=[5]
05:48:12'999 35a2_cpp ___(C) CNS.received.ack begin [5] [a5]
05:48:12'999 35a2_cpp ___(C) CNS.received.ack end [5]
05:48:12'999 35a2_cpp ___(C) CNS.received end [a5]
05:48:13'013 35a2_cpp rotationBuffer.removeMessage begin [4] [a5]
05:48:13'013 35a2_cpp rotationBuffer.removeMessage end return=[a5] next=[5]
05:48:13'013 35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[a5daze] ****important****
05:48:13'013 35a2_cpp[0] connection.close begin
05:48:13'013 35a2_cpp[0] channel.close begin
05:48:13'013 35a2_cpp[0] channel.close end
05:48:13'013 35a2_cpp[0] connection.close end
05:48:13'014 35a2_cpp (P)___ WKP.bindQueue.publish.commited [] **********important**********
05:48:13'015 35a2_cpp (P)___ WKP.SM_ ev_run end
05:48:13'015 35a2_cpp (P)___ WKP.SM_ end
05:48:13'015 35a2_cpp (P)___ WKP.SM_ begin
05:48:13'015 35a2_cpp (P)___ WKP.SM_.declareQueue begin
05:48:13'015 35a2_cpp[0] channel.open begin
05:48:13'015  35a2_cpp[0] connection.open begin
05:48:13'015 35a2_cpp[0] connection.open end

important という目に見えやすいコメントを付けたが、
ここで commit のときに [] と空文字列になっているのが問題だ。ポインターの挙動をわたしが理解していないということだろう。何を指しているのか。

ソースコード

tamesi34_cs.cs

// OS      : Windows 10
// IDE     : Visual Studio 2015
//           Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS      : Ubuntu 16.04

// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//                     : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : For test. Practice of interprocess communication.
//                                          : This program run on the background side.
//                                          : Delete queue,
//                                          :   Or Send message to message queue.
//                                          :      Interrupt received message from queue.
//                                          : Please settings source queue, destination queue to command line argument.
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//         : Compile   : Command            : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
//                                : Explain :     mcs                     Mono c# compiler
//                                          :     /r:RabbitMQ.Client.dll  Append reference RabbitMQ
//                                          :     -define:UBUNTU          #define UBUNTU
//                                          :     tamesi34_cs.cs          Source file
//         : Execute   : Command            : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
//                                          :     // Run on the foreground.
//                                : Explain :     ./tamesi34_cs.exe         Executable file
//                                          :     2> ./tamesi34_cs.err.log  Standard error to log file
//         : Reference            : Web site: QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//                                          : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//                                          : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
//                                          : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//                                          : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//
//--------------------------------------------------------------------------------
// tamesi34_cs.cs

// Ubuntu version. Source code is old. API is difference.
// #define UBUNTU

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace UsagiMQ
{
    /// <summary>
    /// This number is this program only. AMQP-CPP implements is difference.
    /// AMQP-CPP implements(C++) : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
    /// </summary>
    [Flags]
    enum LifeSpanType
    {
        Durable = 1,
        Autodelete = 0x1<<1,
        Passive = 0x1 << 2,
        Exclusive = 0x1 << 3
    }

    class Program
    {
        static string Dump()
        {
            StringBuilder sb = new StringBuilder();
            sb.AppendLine("tamesi34_cs.cs Dump");
            sb.AppendLine("    name_queues         [ENQUEUE_INDEX]    =[" + name_queues[ENQUEUE_INDEX]+"]");
            sb.AppendLine("    lifeSpan_queues     [ENQUEUE_INDEX]    =[" + lifeSpan_queues[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [ENQUEUE_INDEX]    =[" + durable_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[ENQUEUE_INDEX]    =[" + autodelete_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [ENQUEUE_INDEX]    =[" + passive_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [ENQUEUE_INDEX]    =[" + exclusive_lifeSpans[ENQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            sb.AppendLine("    name_queues         [DEQUEUE_INDEX]    =[" + name_queues[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    lifeSpan_queues     [DEQUEUE_INDEX]    =[" + lifeSpan_queues[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [DEQUEUE_INDEX]    =[" + durable_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[DEQUEUE_INDEX]    =[" + autodelete_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [DEQUEUE_INDEX]    =[" + passive_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [DEQUEUE_INDEX]    =[" + exclusive_lifeSpans[DEQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            sb.AppendLine("    name_queues         [DELETEQUEUE_INDEX]=[" + name_queues[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    lifeSpan_queues     [DELETEQUEUE_INDEX]=[" + lifeSpan_queues[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    durable_lifeSpans   [DELETEQUEUE_INDEX]=[" + durable_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    autodelete_lifeSpans[DELETEQUEUE_INDEX]=[" + autodelete_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    passive_lifeSpans   [DELETEQUEUE_INDEX]=[" + passive_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    exclusive_lifeSpans [DELETEQUEUE_INDEX]=[" + exclusive_lifeSpans[DELETEQUEUE_INDEX] + "]");
            sb.AppendLine("    ----");
            return sb.ToString();
        }
        static string GetNow()
        {
            return DateTime.Now.ToString("HH:mm:ss fff ");
        }

        const int ENQUEUE_INDEX = 0;
        const int DEQUEUE_INDEX = 1;
        const int DELETEQUEUE_INDEX = 2;
        const int NUM_INDEX = 3;
        const string HOST_NAME = "localhost";
        static string[] name_queues = new string[NUM_INDEX];
        static LifeSpanType[] lifeSpan_queues = new LifeSpanType[NUM_INDEX];
        static bool[]
            durable_lifeSpans = new bool[NUM_INDEX],
            autodelete_lifeSpans = new bool[NUM_INDEX],
            passive_lifeSpans = new bool[NUM_INDEX],
            exclusive_lifeSpans = new bool[NUM_INDEX];
        static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
        {
            name_queues[index_queue] = name_queue;
            lifeSpan_queues[index_queue] = lifeSpan;

            // 一旦クリアー
            durable_lifeSpans[index_queue] = false;
            autodelete_lifeSpans[index_queue] = false;
            passive_lifeSpans[index_queue] = false;
            exclusive_lifeSpans[index_queue] = false;

            // durable
            if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
            // autodelete
            if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
            // passive
            if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
            // exclusive
            if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
        }

        public static ConnectionFactory GetFactory()
        {
            if (null == m_factory_)
            {
                m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
            }
            return m_factory_;
        }
        static ConnectionFactory m_factory_;

        public static IConnection GetConnection(int index)
        {
            if (null == m_connection_[index])
            {
                Console.WriteLine(GetNow()+"34_cs connection.open begin");
                m_connection_[index] = GetFactory().CreateConnection();
                Console.WriteLine(GetNow() + "34_cs connection.open end");
            }
            return m_connection_[index];
        }
        static IConnection[] m_connection_ = new IConnection[NUM_INDEX];

        public static IModel GetChannel(int index)
        {
            if (null == m_channels_[index])
            {
                Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] begin");
                m_channels_[index] = GetConnection(index).CreateModel();

                // 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
                //Console.Error.WriteLine(Dump());
                //Console.Error.WriteLine("(tamesi34_cs.cs?) GetChannel index = [" + index + "]");
                //Console.Error.WriteLine("    name_queues[index] = [" + name_queues[index] + "]");
                //Console.Error.WriteLine("    passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
                //Console.Error.WriteLine("    nowait は仮に false 固定");
                //Console.Error.WriteLine("    arguments は仮に null 固定");

#if UBUNTU
                // Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
                // RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
                // そして false になっているようだ。
                m_channels_[index].QueueDeclare(name_queues[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
                //m_channels_[index].QueueDeclare(name_queues[index], false, false, false, false, false, null);
#else
                m_channels_[index].QueueDeclare(name_queues[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
                Console.WriteLine(GetNow() + "34_cs channel.open [" + index + "] end");
            }
            return m_channels_[index];
        }
        static IModel[] m_channels_ = new IModel[NUM_INDEX];

        public static EventingBasicConsumer GetConsumer(int index)
        {
            if (null == m_consumers_[index])
            {
                Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] begin");
#if UBUNTU
                // Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
                m_consumers_[index] = new EventingBasicConsumer();
#else
                m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
                Console.WriteLine(GetNow() + "34_cs consumer.create [" + index + "] end");
            }
            return m_consumers_[index];
        }
        static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];

        /// <summary>
        /// コンシューマーが準備できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static ConsumerEventHandler GetRegisteredHandler()
#else
        public static EventHandler<ConsumerEventArgs> GetRegisteredHandler()
#endif
        {
            if (null == m_registeredHandler_)
            {
#if UBUNTU
                m_registeredHandler_ = new ConsumerEventHandler((model, ea) =>
#else
                m_registeredHandler_ = new EventHandler<ConsumerEventArgs>((model, ea) =>
#endif
                {
                    Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Registered(^q^)");
                    flag_registered = true;
                });
            }

            return m_registeredHandler_;
        }

#if UBUNTU
        static ConsumerEventHandler m_registeredHandler_;
#else
        static EventHandler<ConsumerEventArgs> m_registeredHandler_;
#endif
        static bool flag_registered;

        /// <summary>
        /// 受信できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static BasicDeliverEventHandler GetDequeueHandler()
#else
        public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
        {
            if (null == m_dequeueHandler_)
            {
#if UBUNTU
                m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
                m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
                {
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(GetNow() + "34_cs <-------- [interrupt!] Dequeue(^q^) {0}", message);
                });
            }

            return m_dequeueHandler_;
        }

#if UBUNTU
        static BasicDeliverEventHandler m_dequeueHandler_;
#else
        static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif

        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseConnection(int index)
        {
            if (null != m_connection_[index])
            {
                Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] begin");
                // フラグとして、ヌルを入れる
                CloseChannel(index);

                m_connection_[index].Close();
                m_connection_[index] = null;
                Console.WriteLine(GetNow() + "34_cs connection.close [" + index + "] end");
            }
        }
        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseChannel(int index)
        {
            if (null != m_channels_[index])
            {
                Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] begin");
                m_channels_[index].Close();
                m_channels_[index] = null;
                Console.WriteLine(GetNow() + "34_cs channel.close [" + index + "] end");
            }
        }

        static void Main(string[] args)
        {
            //----------------------------------------
            // Delete
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"""" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? > ");
                string queueName_delete = Console.ReadLine();
                if (""== queueName_delete.Trim())
                {
                    break;
                }


                Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Delete ? > ");
                LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
                SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
                uint result = DeleteQueue();
                Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。");
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > ");
                string queueName_enqueue = Console.ReadLine();

                Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > ");
                int lifeSpan_enqueue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
                {
                    SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
                    break;
                }
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            EventingBasicConsumer consumer = null;
            for (;;)
            {
                Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > ");
                string queueName_dequeue = Console.ReadLine();

                Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + ((int)LifeSpanType.Durable) + @") durable    : RabbitMQが止まってもキューを残す
(" + ((int)LifeSpanType.Autodelete) + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + ((int)LifeSpanType.Passive) + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + ((int)LifeSpanType.Exclusive) + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > ");
                int lifeSpan_dequeue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
                {
                    SetLifeSpan(DEQUEUE_INDEX, queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
                    break;
                }
            }

            StartConsume(out consumer);

            Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered begin");
            while (!flag_registered)
            {
                Thread.Sleep(20);
            }
            Console.WriteLine(GetNow() + "34_cs (P)___ waitRegistered end");

            Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > ");
            for (;;)
            {
                // "Hello World!" などを入力
                string line = Console.ReadLine();
                Enqueue(line);
                Console.Write(@"Enqueue? > ");
            }

            // ここには来ない
            // CloseConnection();
        }

        static uint DeleteQueue()
        {
            IModel channel = GetChannel(DELETEQUEUE_INDEX);

#if UBUNTU
            // Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
            // uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
            uint result = channel.QueueDelete( name_queues[DELETEQUEUE_INDEX],true,true,true);
#else
            uint result = channel.QueueDelete(name_queues[DELETEQUEUE_INDEX],true,true);
#endif

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseConnection(DELETEQUEUE_INDEX);
            //CloseChannel(DELETEQUEUE_INDEX);
            return result;
        }

        static void Enqueue(string message)
        {
            byte[] body = Encoding.UTF8.GetBytes(message);

            Console.WriteLine(GetNow() + "34_cs (P)___ enqueue begin [" + message + "]");
            GetChannel(ENQUEUE_INDEX).BasicPublish("", name_queues[ENQUEUE_INDEX], null, body);
            Console.WriteLine(GetNow() + "34_cs (P)___ enqueue end");

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseConnection(ENQUEUE_INDEX);
            //CloseChannel(ENQUEUE_INDEX);
        }

        static void StartConsume(out EventingBasicConsumer consumer)
        {
            Console.WriteLine(GetNow() + "34_cs ___(C) start begin");

            consumer = GetConsumer(DEQUEUE_INDEX); // ローカル変数だとスコープが切れて寿命が尽きてしまうので、外側に返す

            // 受信できたときに割り込んでくる処理
            consumer.Received += GetDequeueHandler();
            consumer.Registered += GetRegisteredHandler();

#if UBUNTU
            // Ubuntuでは何故か引数が 6 個のやつに似ている。
            // string BasicConsume(ushort ticket, string queue, bool noAck, string consumerTag, IDictionary filter, IBasicConsumer consumer)
            GetChannel(DEQUEUE_INDEX).BasicConsume( name_queues[DEQUEUE_INDEX], true, "", null, consumer);
#else
            GetChannel(DEQUEUE_INDEX).BasicConsume( name_queues[DEQUEUE_INDEX], true, consumer);
#endif
            // 終了はさせない
            // consumer.Received -= GetReceiveHandler();
            // CloseChannel(DEQUEUE_INDEX);

            Console.WriteLine(GetNow() + "34_cs ___(C) start end");
        }
    }
}

tamesi35a2_cpp.cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : For test. Practice of interprocess communication.
//                                          : This program run on the background side.
//                                          : Append daze to received message on interprocess communication.
//                                          : Daze meaning is dialect in the suburbs of Tokyo.
//                                          : Please settings source queue, destination queue to command line argument.
//                                          :   And standard input to /dev/null
//         : Compile   : Command            : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
//                                : Explain :     g++                    C++ Compiler
//                                          :     -std=c++11             C++ 11
//                                          :     tamesi35a2_cpp.cpp     Source file
//                                          :     -o tamesi35a2_cpp.exe  Output executable file
//                                          :     -lev                   Used library libev
//                                          :     -lamqpcpp              Used library AMQP-CPP
//                                          :     -pthread               Used library thread
//         : Execute   : Command            : // Run on the background.
//                                          : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
//                                : Explain :     ./tamesi35a2_cpp.exe         Executable file
//                                          :     --enqueue                    Destination queue settings section
//                                          :     1113                         Destination queue name             (string ok)
//                                          :     durable                      Destination queue life span        (multiple)
//                                          :     --dequeue                    Source queue settings section
//                                          :     1112                         Source queue name                  (string ok)
//                                          :     durable                      Source queue life span
//                                          :     autodelete                   Source queue life span
//                                          :     > ./tamesi35a2_cpp.out.log   Standard output to   ./tamesi35a2_cpp.out.log
//                                          :     2> ./tamesi35a2_cpp.err.log  Standard error  to   ./tamesi35a2_cpp.err.log
//                                          :     < /dev/null                  Standard input  from nothing
//                                          :     &                            Process goto the background
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
//--------------------------------------------------------------------------------

#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;

static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpan_t lifeSpan_queues[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
    if ("durable"==lifeSpan) {
        return AMQP::durable;
    } else if ("autodelete" == lifeSpan) {
        return AMQP::autodelete;
    } else if ("passive" == lifeSpan) {
        return AMQP::passive;
    } else if ("exclusive" == lifeSpan) {
        return AMQP::exclusive;
    } else {
        std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
        exit(1);
    }
}
std::string exchange_name = "myexchange";
std::string routing_key = "";

static std::string Dump()
{
    static std::ostringstream sb;
    sb << "tamesi35a2_cs.cs Dump" << std::endl
        << "    name_queues         [ENQUEUE_INDEX]    =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [ENQUEUE_INDEX]    =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl
        << "    name_queues         [DEQUEUE_INDEX]    =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [DEQUEUE_INDEX]    =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl;
    return sb.str();
}
// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
    // どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
    std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
        std::chrono::system_clock::now().time_since_epoch()
        );

    long long hours = ms.count() / 1000 / 60 / 60 % 24;
    long long minutes = ms.count() / 1000 / 60 % 60;
    long long seconds = ms.count() / 1000 % 60;
    long long milliseconds = ms.count() % 1000;

    std::ostringstream oss;
    oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
    return oss.str();
}


// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
    if (nullptr== pConnection_ev[index]) {
        std::cout << getTimeslipNow() << " 35a2_cpp[" << index << "] connection.open begin" << std::endl;
        pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.open end" << std::endl;
    }
    return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
    if (nullptr != pConnection_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close begin" << std::endl;
        // チャンネルにもヌルのフラグを入れる
        closeChannel(index);

        pConnection_ev[index]->close();
        pConnection_ev[index] = nullptr;
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] connection.close end" << std::endl;
    }
}

// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
    if (nullptr== pChannel_ev[index])
    {
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open begin" << std::endl;
        pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.open end" << std::endl;
    }
    return pChannel_ev[index];
}
static void closeChannel(int index)
{
    if (nullptr != pChannel_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close begin" << std::endl;
        pChannel_ev[index]->close();
        pChannel_ev[index] = nullptr;
        std::cout << getTimeslipNow() << "35a2_cpp[" << index << "] channel.close end" << std::endl;
    }
}


/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 1024;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;

    std::mutex _mutex;
    static void putMessage(std::string message)
    {
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] before lock [" << message << "]" << std::endl;
        std::lock_guard<std::mutex> lock(_mutex);

        buffer[bufferCursors[PUT_INDEX]] = message;
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage begin [" << bufferCursors[PUT_INDEX] << "] [" << message << "]" << std::endl;
        bufferCursors[PUT_INDEX]++;
        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }
        std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.putMessage end next=[" << bufferCursors[PUT_INDEX] << "]" << std::endl;
    }
    static std::string removeMessage()
    {
        //std::cout << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] before lock" << std::endl;
        std::lock_guard<std::mutex> lock(_mutex);

        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];
            std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage begin [" << bufferCursors[GET_INDEX] << "] [" << message << "]" << std::endl;

            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }

            std::cout << getTimeslipNow() << "35a2_cpp rotationBuffer.removeMessage end return=[" << message << "] next=[" << bufferCursors[GET_INDEX] << "]" << std::endl;
            return message;
        }
        return "";
    }
}

static void sendMessage()
{
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ begin" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue begin" << std::endl;
    getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
        .onError([](const char* errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
    });
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.declareQueue end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue begin" << std::endl;
    getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        // ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
        .onSuccess([]() {

        for (;;)
        {
            // 回転式バッファーからメッセージを切り取る。
            std::string message = rotationBuffer::removeMessage();

            if ("" != message)
            {
                // 末尾に daze を付ける。
                message += "daze";
                std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.bindQueue.onSuccess next,publish message=[" << message << "] ****important**** " << std::endl;

                getChannel(ENQUEUE_INDEX)->startTransaction();
                if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
                    getChannel(ENQUEUE_INDEX)->commitTransaction()
                        .onSuccess([&message]() {
                        // パブリッシュ成功
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.bindQueue.publish.commited [" << message << "] **********important**********" << std::endl;
                    })
                        .onError([&message](const char *errMsg) {
                        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                    });
                }
                else {
                    std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.publish.failuer message=[" << message << "]" << std::endl;
                    getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                }

                // 接続を切って ev_run ループから抜けよう。かつ、このハンドラーから抜けよう。パブリッシュのコミットに必要
                closeConnection(ENQUEUE_INDEX);
                break;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }
    }).onError([](const char* errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
    });
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_.bindQueue end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run begin" << std::endl;
    // ここで、バインドキューのオンサクセスの処理へ。
    ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ ev_run end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.SM_ end" << std::endl;
}
// メッセージ・キューの送信を担当
static void workProduce()
{
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP begin" << std::endl;

    // おっきな無限ループ
    for (;;) {
        sendMessage();
    }
    std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP outOfInfinityLoop error" << std::endl;
}

// メッセージ・キューの監視を開始
static void workConsume()
{
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ begin" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ begin" << std::endl;
    // I will go to the front of the box named "1111".
    LifeSpan_t lifeSpan = lifeSpan_queues[DEQUEUE_INDEX];
    getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS begin" << std::endl;
    // I look inside the box.
    auto errorCb = [&lifeSpan](const char *errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
    };
    auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {

        std::string message(amqpMessage.body(), amqpMessage.bodySize());
        std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received begin [" << message << "]" << std::endl;

        rotationBuffer::putMessage(message);

        std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack begin [" << deliveryTag << "] [" << message << "]" << std::endl;
        getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
        std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received.ack end [" << deliveryTag << "]" << std::endl;
        std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received end [" << message << "]" << std::endl;
    };
    getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
        .onReceived(messageCb)
        .onError(errorCb);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run begin" << std::endl;
    // I will keep on forever.
    ev_run(pLoop_ev[DEQUEUE_INDEX]);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ end" << std::endl;

    // I will not come here.
    return;
}

int main(int argc, char* argv[])
{
    // 引数の解析
    if(0<argc)
    {
        // プログラム名を省き、コマンドライン引数だけをつなげる。
        std::string cmdArg;
        for (int i = 1; i < argc; ++i)
        {
            cmdArg += std::string(argv[i]);
            if (i < argc) {
                cmdArg += " ";
            }
        }
        std::istringstream data(cmdArg);
        std::cout << getTimeslipNow() << "35a2_cpp main commandLineArguments=[" << data.str() << "]" << std::endl;

        // 与件
        // 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
        // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

        // 受け皿
        // name_queues 結果はこれらの配列に入れる
        // lifeSpan_queues

        // 記憶
        int m0 = -1; // enqueue or dequeue index.
        int m1 = -1; // lifespans index.

        // 解析器
        std::string a;
        while (data >> a) {

            if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
            else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
            else if (-1 == m1 && 0 == m0) { // エンキュー先キュー名
                name_queues[m0] = a;
                lifeSpan_queues[m0] = 0;
                m1++;
            }
            else if (-1 == m1 && 1 == m0) { // デキュー元キュー名
                name_queues[m0] = a;
                lifeSpan_queues[m0] = 0;
                m1++;
            }
            else { lifeSpan_queues[m0] |= LifeSpanString_To_Int(a); }
        }

        if ("" == name_queues[ENQUEUE_INDEX] || "" == name_queues[DEQUEUE_INDEX]) {
            goto gt_CommandlineParameterError;
        }

        std::cout << getTimeslipNow() << "35a2_cpp main dump begin" << std::endl;
        std::cout << getTimeslipNow() << Dump();
        std::cout << getTimeslipNow() << "35a2_cpp main dump end" << std::endl;
    }
    else
    {
        goto gt_CommandlineParameterError;
    }

    // キューの監視スレッド、メッセージの送信スレッドをスタート
    {
        auto th1 = std::thread([] { workConsume(); });
        auto th2 = std::thread([] { workProduce(); });

        for (;;) // このメイン・スレッドはずっと寝てる
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        // このプログラムは、自分では接続を切らないので、ここから下には来ない
        th2.join();
        th1.join();
    }
    return 0;

gt_CommandlineParameterError:
    std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
        << "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
        << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
        << "queueNames[0]=[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "queueNames[1]=[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl;
    exit(1);
}

記事が長くなったので次へ

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1