LoginSignup
1
1

More than 5 years have passed since last update.

コンピュータ将棋ソフトとの対局サーバーを立てよう<その17>

Last updated at Posted at 2017-03-20

前回の記事 : http://qiita.com/muzudho1/items/bd9d477361def1c687d9

Linux には もともと プロセス間通信が備わっており、パイプ+SSHを使うと 楽できる、とか ツイートで飛んできたり 電車で言われたりするんだが、

それは置いておいて ミューテックスについて調べようぜ。

テスト方法(第1段階)

スレッドを2つ立てる。

  • プロデューサー
  • コンシューマー

キューが1つある。

  • プロデューサーは、キューにプッシュする
  • コンシューマーは、キューからポップする

キューは、フラグを持つ

  • ビジー・フラグ busy

このフラグは、

  • キューの空きが 1つ、または キューの空きが 1%以下になったとき 真
  • それ以外のとき 偽

とする。

データは "0" ~ "99" の文字列とし、1ずつカウントアップする

  • プロデューサーは、キューが busy でないときは データをどんどんプッシュする
  • プロデューサーは、キューが busy の時は、空ループを回りながら待機する
  • コンシューマーは何も考えず データをどんどん処理していく

また、コンシューマーはデータの整合性を検証する

  • コンシューマーは "0" ~ "99" が飛び番で送られてきたときにエラーを吐きだす

サンプル・プログラム

tamesi36a1_cpp.cpp (Windows10)

// Cpp_tamesi36a1_cpp.cpp : コンソール アプリケーションのエントリ ポイントを定義します。
//

#include "stdafx.h"

#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread


// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
    // どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
    std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
        std::chrono::system_clock::now().time_since_epoch()
        );

    long long hours = ms.count() / 1000 / 60 / 60 % 24;
    long long minutes = ms.count() / 1000 / 60 % 60;
    long long seconds = ms.count() / 1000 % 60;
    long long milliseconds = ms.count() % 1000;

    std::ostringstream oss;
    oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
    return oss.str();
}

/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 1024;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    static int busy = 0;
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;

    static std::mutex _mutex;
    // 返り値: 追加しなかったとき 偽、それ以外のとき 真。
    static bool putMessage(std::string message)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        if (busy+1== bufferSize || bufferSize*99/100 < busy) {
            return false;
        }

        buffer[bufferCursors[PUT_INDEX]] = message;
        bufferCursors[PUT_INDEX]++;
        busy++;

        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }

        return true;
    }
    static std::string removeMessage()
    {
        std::lock_guard<std::mutex> lock(_mutex);

        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];

            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            busy--;

            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }

            return message;
        }
        return "";
    }
}


static int value_produce = 0;

// メッセージ・キューの監視を開始
static void workProduce()
{
    std::cout << getTimeslipNow() << "36a1_cpp (P)___ begin" << std::endl;
    char buffer[100];

    // 無限ループ
    for (;;) {
        _itoa_s(value_produce, buffer, 10);
        std::string message(buffer);

        while (!rotationBuffer::putMessage(message))
        {
            std::cout << getTimeslipNow() << "(P)___ busy." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }

        value_produce++;
        if (99<value_produce) {
            value_produce = 0;
        }
    }
}


static int value_consume = 0;

// メッセージ・キューの監視を開始
static void workConsume()
{
    std::cout << getTimeslipNow() << "36a1_cpp ___(C) begin" << std::endl;
    char buffer[100];

    // 無限ループ
    for (;;) {
        _itoa_s(value_consume, buffer, 10);
        std::string message(buffer);

        while ("" == (message = rotationBuffer::removeMessage()))
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }

        int value = _atoi64(message.c_str());
        if (value_consume==value) {
            std::cout << getTimeslipNow() << "___(C) [" << value << "] ok." << std::endl;

            value_consume++;
            if (99<value_consume) {
                value_consume = 0;
            }
        }
        else {
            std::cerr << "予想しない値 " << value << " 欲しかった値" << value_consume << std::endl;
            exit(1);
        }
    }
}

int main()
{
    // キューの監視スレッド、メッセージの送信スレッドをスタート
    {
        auto th1 = std::thread([] { workConsume(); });
        auto th2 = std::thread([] { workProduce(); });

        for (;;) // このメイン・スレッドはずっと寝てる
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        // このプログラムは、自分では接続を切らないので、ここから下には来ない
        th2.join();
        th1.join();
    }

    std::cout << "Please, push any key." << std::endl;
    std::string unuse;
    std::getline(std::cin, unuse);
    return 0;
}

けっこう busy を起こしながらも少なくとも10秒ぐらい落ちずに回っている。
1024 のバッファーは そんなに簡単に埋まってしまうものなのだろうか?

これを本番用のプログラムに組み込むと

-rw-r--r-- 1 root root 117602383 Mar 20 12:28 tamesi35a2_cpp.out.log

ログ出過ぎ。

03:34:15'237 35a2_cpp main commandLineArguments=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
03:34:15'237 35a2_cpp main dump begin
03:34:15'237 tamesi35a2_cs.cs Dump
    name_queues         [ENQUEUE_INDEX]    =[1113]
    lifeSpan_queues     [ENQUEUE_INDEX]    =[1]
    ----
    name_queues         [DEQUEUE_INDEX]    =[1112]
    lifeSpan_queues     [DEQUEUE_INDEX]    =[3]
    ----
03:34:15'237 35a2_cpp main dump end
03:34:15'238 35a2_cpp (P)___ WKP begin
03:34:15'238 35a2_cpp (P)___ channel.open begin
03:34:15'238 35a2_cpp (P)___ connection.open begin
03:34:15'238 35a2_cpp (P)___ connection.open end
03:34:15'238 35a2_cpp (P)___ channel.open end
03:34:15'239 35a2_cpp ___(C) ___ begin
03:34:15'239 35a2_cpp ___(C) DQ_ begin
03:34:15'239 35a2_cpp ___(C) channel.open begin
03:34:15'239 35a2_cpp ___(C) connection.open begin
03:34:15'239 35a2_cpp ___(C) connection.open end
03:34:15'239 35a2_cpp ___(C) channel.open end
03:34:15'239 35a2_cpp ___(C) DQ_ end
03:34:15'239 35a2_cpp ___(C) CNS begin
03:34:15'239 35a2_cpp ___(C) CNS end
03:34:15'239 35a2_cpp ___(C) ___ ev_run begin
03:34:15'253 35a2_cpp (P)___ WKP.BQ_.OS_ begin name_enqueue=[1113] lifeSpan_enqueue=[1] name_dequeue=[1112] lifeSpan_dequeue=[3]
03:35:16'838 35a2_cpp ___(C) CNS.received [isready]
03:35:16'850 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[isreadydaze] ****important****
03:35:16'852 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [isreadydaze] **********important**********
03:35:16'852 35a2_cpp (P)___ connection.close begin
03:35:16'852 35a2_cpp (P)___ channel.close begin
03:35:16'852 35a2_cpp (P)___ channel.close end
03:35:16'852 35a2_cpp (P)___ connection.close end
03:35:16'858 35a2_cpp (P)___ channel.open begin
03:35:16'858 35a2_cpp (P)___ connection.open begin
03:35:16'859 35a2_cpp (P)___ connection.open end
03:35:16'859 35a2_cpp (P)___ channel.open end
03:35:16'865 35a2_cpp (P)___ WKP.BQ_.OS_ begin name_enqueue=[1113] lifeSpan_enqueue=[1] name_dequeue=[1112] lifeSpan_dequeue=[3]
03:35:26'250 35a2_cpp ___(C) CNS.received [position]
03:35:26'264 35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[positiondaze] ****important****
03:35:26'266 35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [positiondaze] **********important**********
03:35:26'266 35a2_cpp (P)___ connection.close begin
03:35:26'266 35a2_cpp (P)___ channel.close begin
03:35:26'266 35a2_cpp (P)___ channel.close end
03:35:26'266 35a2_cpp (P)___ connection.close end
03:35:26'268 35a2_cpp (P)___ channel.open begin
03:35:26'268 35a2_cpp (P)___ connection.open begin
03:35:26'268 35a2_cpp (P)___ connection.open end
03:35:26'268 35a2_cpp (P)___ channel.open end
03:35:26'274 35a2_cpp (P)___ WKP.BQ_.OS_ begin name_enqueue=[1113] lifeSpan_enqueue=[1] name_dequeue=[1112] lifeSpan_dequeue=[3]

「usi」と「go」も届いてないし。

バックグラウンドで 別のプロセスが動いてたりしないのか?

ps aux

1つ1つ調べて切っていく。

root     32186  0.4  0.0 318244    36 ?        Sl   Mar15  27:20 ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete
root     27821 99.6 14.9 1482768 152104 ?      Rl   Mar17 4929:44 ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete
root     14613  0.2  0.3 252848  3056 pts/0    Sl   12:34   0:00 ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete
root     20615  0.0  0.2 230188  2080 ?        S    Mar10   1:45 php loop28.php
root     22468  0.0  0.0  11472   688 ?        S    Mar07   0:29 /bin/bash ./loop8.sh
root     21556  0.0  0.0   4216    28 ?        S    Mar10   1:22 ./a.out

「【 pstree 】 プロセスの親子関係をツリー表示する」(ITPro)
http://itpro.nikkeibp.co.jp/article/COLUMN/20071204/288730/?rt=nocnt

プロセス間通信の飛びが無くなった気がする。

じゃあ、本番用に取込みで。

浮かむ瀬とプロセス間通信してみた

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
13:21:04 547 34_cs ___(C) start begin
13:21:04 556 34_cs ___(C) consumer.create begin
13:21:04 557 34_cs ___(C) consumer.create end
13:21:04 558 34_cs ___(C) channel.open begin
13:21:04 559 34_cs ___(C) connection.open begin
13:21:04 750 34_cs ___(C) connection.open end
13:21:04 757 34_cs ___(C) channel.open end
13:21:04 761 34_cs <-------- [interrupt!] Registered(^q^)
13:21:04 762 34_cs ___(C) start end
13:21:04 762 34_cs (P)___ waitRegistered begin
13:21:04 762 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > usi
13:21:08 580 34_cs (P)___ enqueue begin [usi]
13:21:08 580 34_cs (P)___ channel.open begin
13:21:08 580 34_cs (P)___ connection.open begin
13:21:08 589 34_cs (P)___ connection.open end
13:21:08 592 34_cs (P)___ channel.open end
13:21:08 598 34_cs (P)___ enqueue end
13:21:08 598 34_cs (P)___ connection.close begin
13:21:08 599 34_cs (P)___ channel.close begin
13:21:08 607 34_cs (P)___ channel.close end
13:21:08 612 34_cs (P)___ connection.close end
Enqueue? > 13:21:08 628 34_cs <-------- [interrupt!] Dequeue(^q^) id name ukamuse_SDT4
id author Hiraoka Takuya

option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true
usiok

isready
13:21:14 169 34_cs (P)___ enqueue begin [isready]
13:21:14 169 34_cs (P)___ channel.open begin
13:21:14 170 34_cs (P)___ connection.open begin
13:21:14 177 34_cs (P)___ connection.open end
13:21:14 179 34_cs (P)___ channel.open end
13:21:14 179 34_cs (P)___ enqueue end
13:21:14 179 34_cs (P)___ connection.close begin
13:21:14 179 34_cs (P)___ channel.close begin
13:21:14 181 34_cs (P)___ channel.close end
13:21:14 182 34_cs (P)___ connection.close end
Enqueue? > 13:21:18 293 34_cs <-------- [interrupt!] Dequeue(^q^) id name ukamuse_SDT4
id author Hiraoka Takuya

option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true
usiok
readyok

usinewgame
13:21:38 307 34_cs (P)___ enqueue begin [usinewgame]
13:21:38 308 34_cs (P)___ channel.open begin
13:21:38 308 34_cs (P)___ connection.open begin
13:21:38 354 34_cs (P)___ connection.open end
13:21:38 356 34_cs (P)___ channel.open end
13:21:38 357 34_cs (P)___ enqueue end
13:21:38 357 34_cs (P)___ connection.close begin
13:21:38 357 34_cs (P)___ channel.close begin
13:21:38 361 34_cs (P)___ channel.close end
13:21:38 363 34_cs (P)___ connection.close end
Enqueue? > position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
13:21:52 825 34_cs (P)___ enqueue begin [position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves]
13:21:52 826 34_cs (P)___ channel.open begin
13:21:52 826 34_cs (P)___ connection.open begin
13:21:52 835 34_cs (P)___ connection.open end
13:21:52 837 34_cs (P)___ channel.open end
13:21:52 838 34_cs (P)___ enqueue end
13:21:52 838 34_cs (P)___ connection.close begin
13:21:52 838 34_cs (P)___ channel.close begin
13:21:52 840 34_cs (P)___ channel.close end
13:21:52 840 34_cs (P)___ connection.close end
Enqueue? > go
13:21:58 577 34_cs (P)___ enqueue begin [go]
13:21:58 578 34_cs (P)___ channel.open begin
13:21:58 578 34_cs (P)___ connection.open begin
13:21:58 591 34_cs (P)___ connection.open end
13:21:58 596 34_cs (P)___ channel.open end
13:21:58 596 34_cs (P)___ enqueue end
13:21:58 596 34_cs (P)___ connection.close begin
13:21:58 596 34_cs (P)___ channel.close begin
13:21:58 598 34_cs (P)___ channel.close end
13:21:58 599 34_cs (P)___ connection.close end
Enqueue? > 13:21:58 641 34_cs <-------- [interrupt!] Dequeue(^q^) info string optimum_time = 20

13:21:58 650 34_cs <-------- [interrupt!] Dequeue(^q^) info string optimum_time = 20
info string maximum_time = 20

13:21:58 663 34_cs <-------- [interrupt!] Dequeue(^q^) info string book_ply 32767

13:21:58 708 34_cs <-------- [interrupt!] Dequeue(^q^) info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp 63 nodes 66 nps 767 time 86 pv 2g2f

13:21:58 721 34_cs <-------- [interrupt!] Dequeue(^q^) info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp 63 nodes 66 nps 767 time 86 pv 2g2f
bestmove 2g2f


微妙に想定外の動きをしてるよな。

isready のあとで、なんで usi のときのメッセージが再び送られてくるのか?

一番単純に考えると、

isready のあとで、usi のときのメッセージを再び送っているケースだ。
調べよう。

練習用のソースと、本番用のソースが違うじゃないか。

テストしたコード。

static void sendMessage()
{
    getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
        .onError([](const char* errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
        });

    getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        // ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
        .onSuccess([]() {
            std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ begin name_enqueue=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_enqueue=[" << lifeSpan_queues[ENQUEUE_INDEX] << "] name_dequeue=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_dequeue=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;

            bool infinityLoop = true;
            bool enable = true;
            while (infinityLoop)
            {
                if (enable) {
                    // 回転式バッファーからメッセージを切り取る。
                    std::string message = rotationBuffer::pop();

                    if ("" != message)
                    {
                        // 末尾に daze を付ける。
                        message += "daze";
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ next,publish message=[" << message << "] ****important**** " << std::endl;

                        // トランザクション開始
                        getChannel(ENQUEUE_INDEX)->startTransaction();
                        // 以後、このループは空回りさせる
                        enable = false;
                        // パブリッシュ開始
                        if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

                            // コミットを開始
                            getChannel(ENQUEUE_INDEX)->commitTransaction()
                                .onSuccess([=]() {
                                    // コミット成功
                                    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

                                    // 接続を切ったあと、トランザクションのコミットが実行されるようだ。
                                    closeConnection(ENQUEUE_INDEX);
                                })
                                .onError([&message](const char *errMsg) {
                                    std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                                });
                        }
                        else {
                            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                            getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                        }

                        // 無限ループから出て、このハンドラから抜けよう。そのあと ev_run が処理の続き(コミットの続き等)を行う
                        infinityLoop = false;
                    }
                    else 
                    {
                        std::this_thread::sleep_for(std::chrono::milliseconds(20));
                    }
                }
                else
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(20));
                }
            }

        }).onError([](const char* errMsg) {
            std::cout << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
        });

    // ここで、バインドキューのオンサクセスの処理へ。
    ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
}

動いてないコード

    // メッセージキューにエンキュー
    // 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
    static int sendMessage(std::string message)
    {
        getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
            .onError([](const char* errMsg) {
            });

        getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            // ev_run から1回だけ呼び出される。接続を切るとev_runから抜ける。コミットするにはこのハンドラから抜けること。
            .onSuccess([=]() {

                if ("" != message)
                {
                    // トランザクション開始
                    getChannel(ENQUEUE_INDEX)->startTransaction();
                    // パブリッシュ開始
                    if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

                        // コミットを開始
                        getChannel(ENQUEUE_INDEX)->commitTransaction()
                            .onSuccess([=]() {
                            // コミット成功

                            // 接続を切ったあと、トランザクションのコミットが実行されるようだ。
                            closeConnection(ENQUEUE_INDEX);
                        })
                        .onError([&message](const char *errMsg) {
                            std::cerr << getTimeslipNow() << "(>_<)cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                        });
                    }
                    else {
                        std::cerr << getTimeslipNow() << "(>_<)cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                        getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                    }

                    // このハンドラから抜けよう。そのあと ev_run が処理の続き(コミットの続き等)を行う
                }
        }).onError([](const char* errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
        });

        // ここで、バインドキューのオンサクセスの処理へ。
        ev_run(pLoop_ev[ENQUEUE_INDEX], 0);

        return 0;
    }

sendMessage( ) のタイミングでメッセージを送信するという発想が そもそも無理なのか。

一旦 バッファーに投げておいて、busy じゃないタイミングで送信するようにするか。

ev_run で待機状態に入る、という仕様を活かすと そうなる。
プログラム中の好きなところから sendMessage( ) でメッセージを送るのではなく、ev_run の中からメッセージを送る。

もっと簡単に書けるんじゃないか?

tamesi35a2.cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : For test. Practice of interprocess communication.
//                                          : This program run on the background side.
//                                          : Append daze to received message on interprocess communication.
//                                          : Daze meaning is dialect in the suburbs of Tokyo.
//                                          : Please settings source queue, destination queue to command line argument.
//                                          :   And standard input to /dev/null
//         : Compile   : Command            : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
//                                : Explain :     g++                    C++ Compiler
//                                          :     -std=c++11             C++ 11
//                                          :     tamesi35a2_cpp.cpp     Source file
//                                          :     -o tamesi35a2_cpp.exe  Output executable file
//                                          :     -lev                   Used library libev
//                                          :     -lamqpcpp              Used library AMQP-CPP
//                                          :     -pthread               Used library thread
//         : Execute   : Command            : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
//                                          : Run on the background.
//                                : FAQ     : Failures case: Message conflict when starting multiple processes.
//                                : Explain :     ./tamesi35a2_cpp.exe         Executable file
//                                          :     --enqueue                    Destination queue settings section
//                                          :     1113                         Destination queue name             (string ok)
//                                          :     durable                      Destination queue life span        (multiple)
//                                          :     --dequeue                    Source queue settings section
//                                          :     1112                         Source queue name                  (string ok)
//                                          :     durable                      Source queue life span
//                                          :     autodelete                   Source queue life span
//                                          :     > ./tamesi35a2_cpp.out.log   Standard output to   ./tamesi35a2_cpp.out.log
//                                          :     2> ./tamesi35a2_cpp.err.log  Standard error  to   ./tamesi35a2_cpp.err.log
//                                          :     < /dev/null                  Standard input  from nothing
//                                          :     &                            Process goto the background
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
//         : Referrence: ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//--------------------------------------------------------------------------------

#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
    // どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
    std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
        std::chrono::system_clock::now().time_since_epoch()
        );

    long long hours = ms.count() / 1000 / 60 / 60 % 24;
    long long minutes = ms.count() / 1000 / 60 % 60;
    long long seconds = ms.count() / 1000 % 60;
    long long milliseconds = ms.count() % 1000;

    std::ostringstream oss;
    oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
    return oss.str();
}

// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;

static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpan_t lifeSpan_queues[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
    if ("durable"==lifeSpan) {
        return AMQP::durable;
    } else if ("autodelete" == lifeSpan) {
        return AMQP::autodelete;
    } else if ("passive" == lifeSpan) {
        return AMQP::passive;
    } else if ("exclusive" == lifeSpan) {
        return AMQP::exclusive;
    } else {
        std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
        exit(11);
    }
}
static std::string exchange_name = "myexchange";
static std::string routing_key = "";

static std::string Dump()
{
    static std::ostringstream sb;
    sb << "tamesi35a2_cs.cs Dump" << std::endl
        << "    name_queues         [ENQUEUE_INDEX]    =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [ENQUEUE_INDEX]    =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl
        << "    name_queues         [DEQUEUE_INDEX]    =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [DEQUEUE_INDEX]    =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl;
    return sb.str();
}


// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
    if (nullptr== pConnection_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index== ENQUEUE_INDEX?"(P)___":"___(C)") << " connection.open begin" << std::endl;
        pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " connection.open end" << std::endl;
    }
    return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
    if (nullptr != pConnection_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " connection.close begin" << std::endl;
        // チャンネルにもヌルのフラグを入れる
        closeChannel(index);

        pConnection_ev[index]->close();
        pConnection_ev[index] = nullptr;
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " connection.close end" << std::endl;
    }
}

// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
    if (nullptr== pChannel_ev[index])
    {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.open begin" << std::endl;
        pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.open end" << std::endl;
    }
    return pChannel_ev[index];
}
static void closeChannel(int index)
{
    if (nullptr != pChannel_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.close begin" << std::endl;
        pChannel_ev[index]->close();
        pChannel_ev[index] = nullptr;
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.close end" << std::endl;
    }
}


/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 1024;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    static int busy = 0;
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;

    static std::mutex _mutex;
    // 返り値: 追加しなかったとき 偽、それ以外のとき 真。
    static bool push_inner(std::string message)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        if (busy + 1 == bufferSize || bufferSize * 99 / 100 < busy) {
            return false;
        }

        buffer[bufferCursors[PUT_INDEX]] = message;
        bufferCursors[PUT_INDEX]++;
        busy++;

        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }

        return true;
    }
    static std::string pop_inner()
    {
        std::lock_guard<std::mutex> lock(_mutex);

        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];

            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            busy--;

            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }

            return message;
        }
        return "";
    }

    // 追加に成功するまで、ブロックする
    static void push_block(std::string message)
    {
        while (!rotationBuffer::push_inner(message))
        {
            // busy
            std::this_thread::sleep_for(std::chrono::milliseconds(40));
        }
    }
    // 何か取れるまで、ブロックする
    static std::string pop_block()
    {
        std::string message;
        while ("" == (message = rotationBuffer::pop_inner()))
        {
            // buffer is empty
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }
        return message;
    }
}

static void sendMessage(std::string message)
{
    // チャンネルを宣言
    getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
        .onError([](const char* errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
    });

    // チャンネルを束縛
    getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        // ev_run から1回だけ呼び出される。コミットを開始してこのハンドラから抜けること。コミットの成功時に接続を切ることでev_runから抜ける。
        .onSuccess([&message]() {
            std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ message=[" << message << "] ****important**** name_enqueue=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_enqueue=[" << lifeSpan_queues[ENQUEUE_INDEX] << "] name_dequeue=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_dequeue=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;

            // トランザクション開始
            getChannel(ENQUEUE_INDEX)->startTransaction();
            // パブリッシュ開始
            if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

                // コミットを開始
                getChannel(ENQUEUE_INDEX)->commitTransaction()
                    .onSuccess([=]() {
                        // コミット成功
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

                        // 接続を切ったあと、トランザクションのコミットが実行されるようだ。
                        closeConnection(ENQUEUE_INDEX);
                    })
                    .onError([&message](const char *errMsg) {
                        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                    });
            }
            else {
                std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                getChannel(ENQUEUE_INDEX)->rollbackTransaction();
            }

        }).onError([](const char* errMsg) {
            std::cout << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
        });

    // ここで、バインドキューのオンサクセスの処理へ。
    ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
}

// メッセージ・キューの送信を担当
static void workProduce()
{
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP begin" << std::endl;

    // おっきな無限ループ
    for (;;) {

        // 回転式バッファーからメッセージを切り取る。
        std::string message = rotationBuffer::pop_block();

        // 末尾に daze を付ける。
        message += "daze";

        // さあ、送信だ。
        sendMessage(message);
    }
    std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP outOfInfinityLoop error" << std::endl;
}

// メッセージ・キューの監視を開始
static void workConsume()
{
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ begin" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ begin" << std::endl;
    // I will go to the front of the box named "1111".
    LifeSpan_t lifeSpan = lifeSpan_queues[DEQUEUE_INDEX];
    getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS begin" << std::endl;
    // I look inside the box.
    auto errorCb = [&lifeSpan](const char *errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
    };
    auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {

        std::string message(amqpMessage.body(), amqpMessage.bodySize());
        std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received [" << message << "]" << std::endl;

        rotationBuffer::push_block(message);

        getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
    };
    getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
        .onReceived(messageCb)
        .onError(errorCb);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run begin" << std::endl;
    // I will keep on forever.
    ev_run(pLoop_ev[DEQUEUE_INDEX]);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ end" << std::endl;

    // I will not come here.
    return;
}

bool parseArgs(int argc, char* argv[], std::string& name_enqueue, LifeSpan_t& lifeSpan_enqueue, std::string& name_dequeue, LifeSpan_t& lifeSpan_dequeue)
{
    if (0<argc)
    {
        // プログラム名を省き、コマンドライン引数だけをつなげる。
        std::string cmdArg;
        for (int i = 1; i < argc; ++i)
        {
            cmdArg += std::string(argv[i]);
            if (i < argc) {
                cmdArg += " ";
            }
        }
        std::istringstream data(cmdArg);
        std::cout << getTimeslipNow() << "35a2_cpp main commandLineArguments=[" << data.str() << "]" << std::endl;

        // 与件
        // 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
        // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

        // 受け皿
        // name_queues 結果はこれらの配列に入れる
        // lifeSpan_queues

        // 記憶
        int m0 = -1; // enqueue or dequeue index.
        int m1 = -1; // lifespans index.

                     // 解析器
        std::string a;
        while (data >> a) {

            if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
                if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
                else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
                else { break; }
            }
            else if (
                ">" == a.substr(0, 1)//先頭の1文字が「>」の場合
                || "<" == a.substr(0, 1)//「<」
                || ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
                ) {
                // 標準入力、標準出力、標準エラーを無視
                break;
            } else if (0 == m0) { // エンキュー先キュー名
                if (-1 == m1) {
                    name_enqueue = a;
                    lifeSpan_enqueue = 0;
                    m1++;
                }
                else {
                    lifeSpan_enqueue |= LifeSpanString_To_Int(a);
                }
            } else if ( 1 == m0) { // デキュー元キュー名
                if (-1 == m1) {
                    name_dequeue = a;
                    lifeSpan_dequeue = 0;
                    m1++;
                }
                else {
                    lifeSpan_dequeue |= LifeSpanString_To_Int(a);
                }
            }
            else
            {
                // 欲しい内容がくるまでスルー
            }
        }

        if ("" == name_enqueue || "" == name_dequeue) {
            goto gt_CommandlineParameterError;
        }

        std::cout << getTimeslipNow() << "35a2_cpp main dump begin" << std::endl;
        std::cout << getTimeslipNow() << Dump();
        std::cout << getTimeslipNow() << "35a2_cpp main dump end" << std::endl;
    }
    else
    {
        goto gt_CommandlineParameterError;
    }
    return true;

gt_CommandlineParameterError:
    std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
        << "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
        << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
        << "name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl;
    return false;
}

int main(int argc, char* argv[])
{
    // 引数の解析
    if (!parseArgs(argc, argv, name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX], name_queues[DEQUEUE_INDEX], lifeSpan_queues[DEQUEUE_INDEX]))
    {
        exit(12);
    }

    // キューの監視スレッド、メッセージの送信スレッドをスタート
    {
        auto th1 = std::thread([] { workConsume(); });
        auto th2 = std::thread([] { workProduce(); });

        for (;;) // このメイン・スレッドはずっと寝てる
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        // このプログラムは、自分では接続を切らないので、ここから下には来ない
        th2.join();
        th1.join();
    }
    return 0;
}

これで いけている。

浮かむ瀬に組み込むと

option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true
usiok
id name ukamuse_SDT4
id author Hiraoka Takuya

option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true
usiok
id name ukamuse_SDT4
id author Hiraoka Takuya

option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000

無限ループしてるな。

# rabbitmqctl list_queues
Error: rabbit application is not running on node rabbit@tk2-217-18401.
 * Suggestion: start it with "rabbitmqctl start_app" and try again

あれっ? RabbitMQ 止まってんの?

# rabbitmqctl start_app
Starting node 'rabbit@tk2-217-18401' ...
^Z
[3]+  Stopped                 rabbitmqctl start_app
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/src# bg 3
[3]+ rabbitmqctl start_app &
Unhandled Exception:
None of the specified endpoints were reachable
Endpoints attempted:
------------------------------------------------
endpoint=amqp-0-8://localhost:5672, attempts=1
System.Net.Sockets.SocketException: Connection refused
  at System.Net.Sockets.TcpClient.Connect (System.Net.IPAddress[] ipAddresses, Int32 port) <0x4025c990 + 0x0030b> in <filename unknown>:0
  at System.Net.Sockets.TcpClient.Connect (System.String hostname, Int32 port) <0x4025c130 + 0x0002f> in <filename unknown>:0
  at RabbitMQ.Client.Impl.SocketFrameHandler_0_9..ctor (RabbitMQ.Client.AmqpTcpEndpoint endpoint) <0x40256730 + 0x000e7> in <filename unknown>:0
  at RabbitMQ.Client.Framing.Impl.v0_8.ProtocolBase.CreateFrameHandler (RabbitMQ.Client.AmqpTcpEndpoint endpoint) <0x402566e0 + 0x00033> in <filename unknown>:0
  at RabbitMQ.Client.ConnectionFactory.FollowRedirectChain (Int32 maxRedirects, IDictionary connectionAttempts, IDictionary connectionErrors, RabbitMQ.Client.AmqpTcpEndpoint[]& mostRecentKnownHosts, RabbitMQ.Client.AmqpTcpEndpoint endpoint) <0x402$
================================================
Stack trace:
  at RabbitMQ.Client.ConnectionFactory.CreateConnection (Int32 maxRedirects) <0x40255d60 + 0x0012b> in <filename unknown>:0
  at RabbitMQ.Client.ConnectionFactory.CreateConnection () <0x40255d40 + 0x00012> in <filename unknown>:0
  at UsagiMQ.Program.GetConnection (Int32 index) <0x4022e600 + 0x000a4> in <filename unknown>:0
  at UsagiMQ.Program.GetChannel (Int32 index) <0x4022d970 + 0x0009f> in <filename unknown>:0
  at UsagiMQ.Program.DeleteQueue () <0x4022d8e0 + 0x0000f> in <filename unknown>:0
  at UsagiMQ.Program.Main (System.String[] args) <0x402290d0 + 0x00333> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: None of the specified endpoints were reachable
Endpoints attempted:
------------------------------------------------
endpoint=amqp-0-8://localhost:5672, attempts=1
System.Net.Sockets.SocketException: Connection refused
  at System.Net.Sockets.TcpClient.Connect (System.Net.IPAddress[] ipAddresses, Int32 port) <0x4025c990 + 0x0030b> in <filename unknown>:0
  at System.Net.Sockets.TcpClient.Connect (System.String hostname, Int32 port) <0x4025c130 + 0x0002f> in <filename unknown>:0
  at RabbitMQ.Client.Impl.SocketFrameHandler_0_9..ctor (RabbitMQ.Client.AmqpTcpEndpoint endpoint) <0x40256730 + 0x000e7> in <filename unknown>:0
  at RabbitMQ.Client.Framing.Impl.v0_8.ProtocolBase.CreateFrameHandler (RabbitMQ.Client.AmqpTcpEndpoint endpoint) <0x402566e0 + 0x00033> in <filename unknown>:0
  at RabbitMQ.Client.ConnectionFactory.FollowRedirectChain (Int32 maxRedirects, IDictionary connectionAttempts, IDictionary connectionErrors, RabbitMQ.Client.AmqpTcpEndpoint[]& mostRecentKnownHosts, RabbitMQ.Client.AmqpTcpEndpoint endpoint) <0x402$
================================================
Stack trace:
  at RabbitMQ.Client.ConnectionFactory.CreateConnection (Int32 maxRedirects) <0x40255d60 + 0x0012b> in <filename unknown>:0
  at RabbitMQ.Client.ConnectionFactory.CreateConnection () <0x40255d40 + 0x00012> in <filename unknown>:0
  at UsagiMQ.Program.GetConnection (Int32 index) <0x4022e600 + 0x000a4> in <filename unknown>:0
  at UsagiMQ.Program.GetChannel (Int32 index) <0x4022d970 + 0x0009f> in <filename unknown>:0
  at UsagiMQ.Program.DeleteQueue () <0x4022d8e0 + 0x0000f> in <filename unknown>:0
  at UsagiMQ.Program.Main (System.String[] args) <0x402290d0 + 0x00333> in <filename unknown>:0

なんだろう、このエラー。
キューをデリートできないんだが。

        ├─rabbitmq-server───su───sh───rabbitmq-server───beam.smp─┬─erl_child_setup───inet_gethost───inet_gethost
        │                                                        ├─{1_scheduler}
        │                                                        ├─{2_scheduler}
        │                                                        ├─{async_10}
        │                                                        ├─{async_11}
        │                                                        ├─{async_12}

いったん、このプロセスを止めたいが。

rabbitmqctl stop
Stopping and halting node 'rabbit@tk2-217-18401' ...

でも止まらないし。

ログアウトしても止まらないし。漫画でも読むか。

漫画を読み終わってもストップしていない。
プロセスを直接落としていいものか?

# rabbitmq-server
sh: echo: I/O error
Failed to write pid file: /var/lib/rabbitmq/mnesia/rabbit@tk2-217-18401.pid

なんだこれ。

# rabbitmqctl reset
Resetting node 'rabbit@tk2-217-18401' ...
Error:  Mnesia is still running on node 'rabbit@tk2-217-18401'.
        Please stop the node with rabbitmqctl stop_app first.
# rabbitmqctl stop_app
Stopping node 'rabbit@tk2-217-18401' ...

エラーだらけだぜ。

e# ps aux | grep rabbitmq
root     15848  0.0  0.0   4500     0 ?        S    Mar16   0:00 /bin/sh /usr/sbin/rabbitmq-server
root     15855  0.0  0.0  55128     0 ?        S    Mar16   0:00 su rabbitmq -s /bin/sh -c /usr/lib rabbitmq/bin/rabbitmq-server
rabbitmq 15856  0.0  0.0   4500     8 ?        Ss   Mar16   0:00 sh -c /usr/lib/rabbitmq/bin/rabbitmq-server
rabbitmq 15857  0.0  0.0   4500     0 ?        S    Mar16   0:00 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server
rabbitmq 15961  0.4  4.6 2710208 46964 ?       Sl   Mar16  27:39 /usr/lib/erlang/erts-8.1/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 32000 -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.6/ebin -noshell -noinput -s rabbit boot -sname rabbit@tk2-217-18401 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/var/log/rabbitmq/rabbit@tk2-217-18401.log"} -rabbit sasl_error_logger {file,"/var/log/rabbitmq/rabbit@tk2-217-18401-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.6/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@tk2-217-18401-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@tk2-217-18401" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672
rabbitmq 16057  0.0  0.0   4356     4 ?        Ss   Mar16   0:02 erl_child_setup 1024
rabbitmq 16067  0.0  0.0  13676    20 ?        Ss   Mar16   0:01 inet_gethost 4
rabbitmq 16068  0.0  0.0  15796    84 ?        S    Mar16   0:02 inet_gethost 4
root     23218  0.0  0.0  12936   984 pts/0    S+   15:33   0:00 grep --color=auto rabbitmq
rabbitmq 27670  0.0  0.0  14488   936 ?        S    Mar06   0:55 /usr/lib/erlang/erts-8.1/bin/epmd -daemon
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 27670
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 16068
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 16067
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 16057
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 15961
-su: kill: (15961) - No such process
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 15857
-su: kill: (15857) - No such process
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 15856
-su: kill: (15856) - No such process
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 15855
-su: kill: (15855) - No such process
root@tk2-217-18401:/home/★user/shogi/csharp_service# kill 15848
-su: kill: (15848) - No such process
# rabbitmqctl stop
Stopping and halting node 'rabbit@tk2-217-18401' ...
Error: unable to connect to node 'rabbit@tk2-217-18401': nodedown

DIAGNOSTICS
===========

attempted to contact: ['rabbit@tk2-217-18401']

rabbit@tk2-217-18401:
  * connected to epmd (port 4369) on tk2-217-18401
  * epmd reports: node 'rabbit' not running at all
                  no other nodes on tk2-217-18401
  * suggestion: start the node

current node details:
- node name: 'rabbitmq-cli-88@tk2-217-18401'
- home dir: /var/lib/rabbitmq
- cookie hash: L1kt6KDPXO6/Lfg/VH+mqA==

# rabbitmq-server
sh: echo: I/O error
Failed to write pid file: /var/lib/rabbitmq/mnesia/rabbit@tk2-217-18401.pid

ハードディスク壊れてないか?

# rm -rf ukamuse_sdt4_child2/
# rm -rf ukamuse_sdt4_child3/
# rabbitmq-server

              RabbitMQ 3.6.6. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /var/log/rabbitmq/rabbit@tk2-217-18401.log
  ######  ##        /var/log/rabbitmq/rabbit@tk2-217-18401-sasl.log
  ##########
              Starting broker...
dets: file "/var/lib/rabbitmq/mnesia/rabbit@tk2-217-18401/recovery.dets" not properly closed, repairing ...
 completed with 0 plugins.

[Ctrl] + [Z]

# bg 1
[1]+ rabbitmq-server &
# rabbitmqctl list_queues
Listing queues ...
1113    0

さっきまで 1113 だったのが、1113 0 になったな。

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? > 1113
削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Delete ? > 1
15:59:09 741 34_cs ___(C) channel.open begin
15:59:09 748 34_cs ___(C) connection.open begin
15:59:09 973 34_cs ___(C) connection.open end
15:59:09 978 34_cs ___(C) channel.open end
15:59:09 979 34_cs ___(C) connection.close begin
15:59:09 979 34_cs ___(C) channel.close begin
15:59:09 987 34_cs ___(C) channel.close end
15:59:09 992 34_cs ___(C) connection.close end
[1113]キューを削除したはずだぜ☆(^~^) result=[4294967295] ただし、中にメッセージが残っていると消すのに失敗するみたいだが。
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >

動いたか?

# rabbitmqctl list_queues
Listing queues ...

デリートも機能したようだ。

readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok
readyok

なんか 無限ループしてるなあ。

tamesi35a2_cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : For test. Practice of interprocess communication.
//                                          : This program run on the background side.
//                                          : Append daze to received message on interprocess communication.
//                                          : Daze meaning is dialect in the suburbs of Tokyo.
//                                          : Please settings source queue, destination queue to command line argument.
//                                          :   And standard input to /dev/null
//         : Compile   : Command            : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
//                                : Explain :     g++                    C++ Compiler
//                                          :     -std=c++11             C++ 11
//                                          :     tamesi35a2_cpp.cpp     Source file
//                                          :     -o tamesi35a2_cpp.exe  Output executable file
//                                          :     -lev                   Used library libev
//                                          :     -lamqpcpp              Used library AMQP-CPP
//                                          :     -pthread               Used library thread
//         : Execute   : Command            : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
//                                          : Run on the background.
//                                : FAQ     : Failures case: Message conflict when starting multiple processes.
//                                : Explain :     ./tamesi35a2_cpp.exe         Executable file
//                                          :     --enqueue                    Destination queue settings section
//                                          :     1113                         Destination queue name             (string ok)
//                                          :     durable                      Destination queue life span        (multiple)
//                                          :     --dequeue                    Source queue settings section
//                                          :     1112                         Source queue name                  (string ok)
//                                          :     durable                      Source queue life span
//                                          :     autodelete                   Source queue life span
//                                          :     > ./tamesi35a2_cpp.out.log   Standard output to   ./tamesi35a2_cpp.out.log
//                                          :     2> ./tamesi35a2_cpp.err.log  Standard error  to   ./tamesi35a2_cpp.err.log
//                                          :     < /dev/null                  Standard input  from nothing
//                                          :     &                            Process goto the background
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
//         : Referrence: ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//--------------------------------------------------------------------------------

#include <string> // std::string
#include <iostream> // std::cout
#include <iomanip> // std::setfill(...)
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
    // どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
    std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
        std::chrono::system_clock::now().time_since_epoch()
        );

    long long hours = ms.count() / 1000 / 60 / 60 % 24;
    long long minutes = ms.count() / 1000 / 60 % 60;
    long long seconds = ms.count() / 1000 % 60;
    long long milliseconds = ms.count() % 1000;

    std::ostringstream oss;
    oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
    return oss.str();
}

// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;

static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpan_t lifeSpan_queues[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
    if ("durable"==lifeSpan) {
        return AMQP::durable;
    } else if ("autodelete" == lifeSpan) {
        return AMQP::autodelete;
    } else if ("passive" == lifeSpan) {
        return AMQP::passive;
    } else if ("exclusive" == lifeSpan) {
        return AMQP::exclusive;
    } else {
        std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
        exit(11);
    }
}
static std::string exchange_name = "myexchange";
static std::string routing_key = "";

static std::string Dump()
{
    static std::ostringstream sb;
    sb << "tamesi35a2_cs.cs Dump" << std::endl
        << "    name_queues         [ENQUEUE_INDEX]    =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [ENQUEUE_INDEX]    =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl
        << "    name_queues         [DEQUEUE_INDEX]    =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    lifeSpan_queues     [DEQUEUE_INDEX]    =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
        << "    ----" << std::endl;
    return sb.str();
}


// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
    if (nullptr== pConnection_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index== ENQUEUE_INDEX?"(P)___":"___(C)") << " connection.open begin" << std::endl;
        pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " connection.open end" << std::endl;
    }
    return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
    if (nullptr != pConnection_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " connection.close begin" << std::endl;
        // チャンネルにもヌルのフラグを入れる
        closeChannel(index);

        pConnection_ev[index]->close();
        pConnection_ev[index] = nullptr;
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " connection.close end" << std::endl;
    }
}

// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
    if (nullptr== pChannel_ev[index])
    {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.open begin" << std::endl;
        pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.open end" << std::endl;
    }
    return pChannel_ev[index];
}
static void closeChannel(int index)
{
    if (nullptr != pChannel_ev[index]) {
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.close begin" << std::endl;
        pChannel_ev[index]->close();
        pChannel_ev[index] = nullptr;
        std::cout << getTimeslipNow() << "35a2_cpp " << (index == ENQUEUE_INDEX ? "(P)___" : "___(C)") << " channel.close end" << std::endl;
    }
}


/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 1024;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    static int busy = 0;
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;

    static std::mutex _mutex;
    // 返り値: 追加しなかったとき 偽、それ以外のとき 真。
    static bool push_inner(std::string message)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        if (busy + 1 == bufferSize || bufferSize * 99 / 100 < busy) {
            // busy
            return false;
        }

        buffer[bufferCursors[PUT_INDEX]] = message;
        bufferCursors[PUT_INDEX]++;
        busy++;

        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }

        return true;
    }
    static std::string pop_inner()
    {
        std::lock_guard<std::mutex> lock(_mutex);

        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];

            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            busy--;

            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }

            return message;
        }
        // buffer is empty
        return "";
    }

    // 追加に成功するまで、ブロックする
    static void push_block(std::string message)
    {
        while (!rotationBuffer::push_inner(message))
        {
            // busy
            std::this_thread::sleep_for(std::chrono::milliseconds(40));
        }
    }
    // 何か取れるまで、ブロックする
    static std::string pop_block()
    {
        std::string message;
        while ("" == (message = rotationBuffer::pop_inner()))
        {
            // buffer is empty
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }
        return message;
    }
}

static void sendMessage(std::string message)
{
    // チャンネルを宣言
    getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX])
        .onError([](const char* errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << "\n";
    });

    // チャンネルを束縛
    getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        // ev_run から1回だけ呼び出される。コミットを開始してこのハンドラから抜けること。コミットの成功時に接続を切ることでev_runから抜ける。
        .onSuccess([&message]() {
            std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ message=[" << message << "] ****important**** name_enqueue=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_enqueue=[" << lifeSpan_queues[ENQUEUE_INDEX] << "] name_dequeue=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_dequeue=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;

            // トランザクション開始
            getChannel(ENQUEUE_INDEX)->startTransaction();
            // パブリッシュ開始
            if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

                // コミットを開始
                getChannel(ENQUEUE_INDEX)->commitTransaction()
                    .onSuccess([=]() {
                        // コミット成功
                        std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP.BQ_.OS_ CT_.onSuccess Commited [" << message << "] **********important**********" << std::endl;

                        // 接続を切ったあと、トランザクションのコミットが実行されるようだ。
                        closeConnection(ENQUEUE_INDEX);
                    })
                    .onError([&message](const char *errMsg) {
                        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                    });
            }
            else {
                std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                getChannel(ENQUEUE_INDEX)->rollbackTransaction();
            }

        }).onError([](const char* errMsg) {
            std::cout << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
            std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
        });

    // ここで、バインドキューのオンサクセスの処理へ。
    ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
}

// メッセージ・キューの送信を担当
static void workProduce()
{
    std::cout << getTimeslipNow() << "35a2_cpp (P)___ WKP begin" << std::endl;

    // おっきな無限ループ
    for (;;) {

        // 回転式バッファーからメッセージを切り取る。
        std::string message = rotationBuffer::pop_block();

        // 末尾に daze を付ける。
        message += "daze";

        // さあ、送信だ。
        sendMessage(message);
    }
    std::cerr << getTimeslipNow() << "(>_<)35a2_cpp (P)___ WKP outOfInfinityLoop error" << std::endl;
}

// メッセージ・キューの監視を開始
static void workConsume()
{
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ begin" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ begin" << std::endl;
    // I will go to the front of the box named "1111".
    LifeSpan_t lifeSpan = lifeSpan_queues[DEQUEUE_INDEX];
    getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) DQ_ end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS begin" << std::endl;
    // I look inside the box.
    auto errorCb = [&lifeSpan](const char *errMsg) {
        std::cerr << getTimeslipNow() << "(>_<)35a2_cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
    };
    auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {

        std::string message(amqpMessage.body(), amqpMessage.bodySize());
        std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS.received [" << message << "]" << std::endl;

        rotationBuffer::push_block(message);

        getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
    };
    getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
        .onReceived(messageCb)
        .onError(errorCb);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) CNS end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run begin" << std::endl;
    // I will keep on forever.
    ev_run(pLoop_ev[DEQUEUE_INDEX]);
    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ ev_run end" << std::endl;

    std::cout << getTimeslipNow() << "35a2_cpp ___(C) ___ end" << std::endl;

    // I will not come here.
    return;
}

bool parseArgs(int argc, char* argv[], std::string& name_enqueue, LifeSpan_t& lifeSpan_enqueue, std::string& name_dequeue, LifeSpan_t& lifeSpan_dequeue)
{
    if (0<argc)
    {
        // プログラム名を省き、コマンドライン引数だけをつなげる。
        std::string cmdArg;
        for (int i = 1; i < argc; ++i)
        {
            cmdArg += std::string(argv[i]);
            if (i < argc) {
                cmdArg += " ";
            }
        }
        std::istringstream data(cmdArg);
        std::cout << getTimeslipNow() << "35a2_cpp main commandLineArguments=[" << data.str() << "]" << std::endl;

        // 与件
        // 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
        // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

        // 受け皿
        // name_queues 結果はこれらの配列に入れる
        // lifeSpan_queues

        // 記憶
        int m0 = -1; // enqueue or dequeue index.
        int m1 = -1; // lifespans index.

                     // 解析器
        std::string a;
        while (data >> a) {

            if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
                if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
                else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
                else { break; }
            }
            else if (
                ">" == a.substr(0, 1)//先頭の1文字が「>」の場合
                || "<" == a.substr(0, 1)//「<」
                || ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
                ) {
                // 標準入力、標準出力、標準エラーを無視
                break;
            } else if (0 == m0) { // エンキュー先キュー名
                if (-1 == m1) {
                    name_enqueue = a;
                    lifeSpan_enqueue = 0;
                    m1++;
                }
                else {
                    lifeSpan_enqueue |= LifeSpanString_To_Int(a);
                }
            } else if ( 1 == m0) { // デキュー元キュー名
                if (-1 == m1) {
                    name_dequeue = a;
                    lifeSpan_dequeue = 0;
                    m1++;
                }
                else {
                    lifeSpan_dequeue |= LifeSpanString_To_Int(a);
                }
            }
            else
            {
                // 欲しい内容がくるまでスルー
            }
        }

        if ("" == name_enqueue || "" == name_dequeue) {
            goto gt_CommandlineParameterError;
        }

        std::cout << getTimeslipNow() << "35a2_cpp main dump begin" << std::endl;
        std::cout << getTimeslipNow() << Dump();
        std::cout << getTimeslipNow() << "35a2_cpp main dump end" << std::endl;
    }
    else
    {
        goto gt_CommandlineParameterError;
    }
    return true;

gt_CommandlineParameterError:
    std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
        << "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
        << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
        << "name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
        << "name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl;
    return false;
}

int main(int argc, char* argv[])
{
    // 引数の解析
    if (!parseArgs(argc, argv, name_queues[ENQUEUE_INDEX], lifeSpan_queues[ENQUEUE_INDEX], name_queues[DEQUEUE_INDEX], lifeSpan_queues[DEQUEUE_INDEX]))
    {
        exit(12);
    }

    // キューの監視スレッド、メッセージの送信スレッドをスタート
    {
        auto th1 = std::thread([] { workConsume(); });
        auto th2 = std::thread([] { workProduce(); });

        for (;;) // このメイン・スレッドはずっと寝てる
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        // このプログラムは、自分では接続を切らないので、ここから下には来ない
        th2.join();
        th1.join();
    }
    return 0;
}
# rabbitmqctl list_queues
Listing queues ...
1112    0
1113    1310

無限ループのあと、メッセージが 1310 件も送信されてるのか。

# rabbitmqctl list_queues
Listing queues ...
1112    0
1113    1310
# rabbitmqctl list_queues
Listing queues ...
1112    0
1113    2790
# rabbitmqctl list_queues
Listing queues ...
1112    0
1113    2904

改造浮かむ瀬が 出力出し続けてるようだ。なんでだろう。

なんか メッセージ・キューに 30行ぐらいのデータが 2700件ぐらい入っていて、
データが入っているので キューを削除することができない。

そこで 2700回ぐらいデータを取り出すループを回すが なかなか終わらない。

3時間ぐらいで 全部取り出せたろうか。

# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-ZGOwZSjW0llDIVW8fH4n5g  0
1113    0
amq.gen-qSnAlH4nPL1W7tjunKBIjg  0
amq.gen-msQiRvA2FLc__frBIDzepw  0
amq.gen-q9TQuMGUX8An84ZSCLmBPA  0
amq.gen-uThSmHkcfnA2gAkzT65tOg  0

プロセス間通信をテストし直すと、変な名前のキューが増えている。

08:55:41'745 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[1113] lifeSpan=[1]
08:55:49'290 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[1113] lifeSpan=[1]
08:57:46'599 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[] lifeSpan=[0]
08:57:46'622 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[] lifeSpan=[0]
08:57:46'641 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[] lifeSpan=[0]
08:57:46'695 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[] lifeSpan=[0]
08:57:46'723 (^q^)ukamuse_sdt4_child4 (P)___ WKP.SM_ begin queue name=[] lifeSpan=[0]

ログを見ると、名無しのキューに書き込んでいる。そんなことができるのか。

ログが無限に増えっていって サーバーの容量を食らいつくしてしまう。
ローテしないものか。

# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-D_n3IdNQaiWwVs11qDnqwg  0
amq.gen-ZGOwZSjW0llDIVW8fH4n5g  0
amq.gen-Z6lAb8pxD_cgeBkTxJXQ5A  0
amq.gen-AxDCUlJNHq5Q63NCHIYQkw  0
amq.gen-KA3LtO8nFf6_jej4XZUnIg  0
1113    0
amq.gen-bXxV9rS5yTiER1b7J32ZaA  0
amq.gen-Fguq25GvWHN9HEJ_xOk35g  0
amq.gen-dU7dictb3OhNLBNqsjqSjw  0
amq.gen-nnRNh30EXrZNLp9AKI-7aw  0
amq.gen-QgrFEoptxW2TpXbfJZQ_vg  0
amq.gen-x8ReXdSEWvcF7lnOPJ9d5A  0
amq.gen-qSnAlH4nPL1W7tjunKBIjg  0
amq.gen-msQiRvA2FLc__frBIDzepw  0
amq.gen-q9TQuMGUX8An84ZSCLmBPA  0
amq.gen-uThSmHkcfnA2gAkzT65tOg  0

また、ゴミのキューが増えてくるのも なぜだろうか?

キューを作る前に メッセージを送信してるんじゃないか?

18:37:00 260 34_cs <-------- [interrupt!] Dequeue(^q^) info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp 63 nodes 67 nps 848 time 79 pv 2g2f
bestmove 2g2f
info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp 63 nodes 66 nps 2000 time 33 pv 2g2f
bestmove 2g2f
info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp 155 nodes 68 nps 1619 time 42 pv 2g2f 3c3d 2f2e
bestmove 2g2f ponder 3c3d
info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp -35 nodes 91 nps 1137 time 80 pv 6i7h
info depth 2 seldepth 2 multipv 1 score cp 16 nodes 174 nps 1775 time 98 pv 7i7h 3c3d
bestmove 7i7h ponder 3c3d
info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp 155 nodes 32 nps 800 time 40 pv 2g2f 3c3d 2f2e
bestmove 2g2f ponder 3c3d
info string book_ply 32767
info depth 1 seldepth 1 multipv 1 score cp -95 nodes 103 nps 2019 time 51 pv 6i7h 3c3d 2g2f
bestmove 6i7h ponder 3c3d

この、蓄積されていくのも不気味だ。

# rabbitmqctl list_queues
Listing queues ...
amq.gen-VYXcKVIrk-n36mcaHqUjJw  0
amq.gen-iNqu_wbZjJIdbAiXtKi6dQ  0
1112    0
amq.gen-8q07JsdlQAPt55LaH8As6A  0
amq.gen-IsvAj1hZUQ43oD4ri7-LGg  0
amq.gen--ErF10SWg7mOrPm6bzTDAQ  0
amq.gen-KnapyxkFnkj83kgV9TGumQ  0
amq.gen-eDhHF3Ncn1BUam9RVLvBMQ  0
1113    0
amq.gen-bqdiv9okYqsoPJv441zbsw  0
amq.gen-yH0aKaFXd9JDZgUI8_Yk0g  0
amq.gen-yzkwsMf-fhLr1VkpzbSzSg  0
amq.gen-v81vG9UOfV1_FzJvomC4Dw  0
amq.gen--ncng_0P0ZQa7uniYa3xYg  0
amq.gen-aPmRvVw9ju7O_Rr4il6JpQ  0
amq.gen-_cp5CtG47Z7_Rl_0xoHEZQ  0
amq.gen-4oAT1RkNPcT5PnAr-qzwTg  0
amq.gen-xE_yX_F6SPj55p9hQBUEKg  0
amq.gen-o-rXWc_PBYkXPX-33zSjWg  0
amq.gen-uoMOZvsQMabaAMMc2uzXTQ  0
amq.gen-agzj98-wGbXy__IYRrhjQw  0
amq.gen-63hgk-HwUejBaRH9TfzvCQ  0
amq.gen-LkOt2O0B58ED1ce8c0Krcg  0
amq.gen-l6Czel9XapmuA1g4yh-rFg  0
amq.gen-KAgW_MO08eIhFoOLf0LODA  0
amq.gen-RT4PGw2DNzS7raMGk23cxQ  0
amq.gen-bdJspsJUMRckMjDZKVbp6Q  0
amq.gen-KE5pJ0DIHltZeEwam1xc1w  0
amq.gen-6boAKkKUDuQ6bK0XquVACQ  0
amq.gen-uuPVLVLBlMGmN1CVf4HtFw  0
amq.gen-KSpmIlk_uDqgw_oFmQdcYg  0
amq.gen-AHtrb3pkjwUOR7TNExYrrg  0
amq.gen-rm_m2xtVeeSdpDhC7_ZofA  0

なんで キューが増えるんだろう?

改行してないところでは、クリアーが呼び出されていない気もする。

    static int clearMessage() {
        oss.str(std::string());// 空文字列を入れてクリアーする
        return 0;
    }
    static int sendMessage(std::string message) {
        std::cout << IOLock << message << std::endl << IOUnlock;
        oss.str(std::string());// 空文字列を入れてクリアーする
        return 0;
    }
    // メッセージキューにエンキュー
    // 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
    static int flushMessage()
    {
        std::string message = service_io::oss.str();
        oss.str(std::string());// 空文字列を入れてクリアーする

クリアーを書いてなかった。

# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-sCFwK6y8RTyG_ATHjrF8zg  0
1113    0

まだどこかで、名無しのキューに書き込んでいるのだろうか。

# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-sCFwK6y8RTyG_ATHjrF8zg  0
1113    0
amq.gen-GrrhJyCjaAlwb-ukR6wQJg  0
root@tk2-217-18401:/home/csg10/shogi/csharp_service# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
20:17:28 899 34_cs ___(C) start begin
20:17:28 906 34_cs ___(C) consumer.create begin
20:17:28 907 34_cs ___(C) consumer.create end
20:17:28 908 34_cs ___(C) channel.open begin
20:17:28 908 34_cs ___(C) connection.open begin
20:17:29 080 34_cs ___(C) connection.open end
20:17:29 084 34_cs ___(C) channel.open end
20:17:29 088 34_cs <-------- [interrupt!] Registered(^q^)
20:17:29 089 34_cs ___(C) start end
20:17:29 089 34_cs (P)___ waitRegistered begin
20:17:29 089 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
20:17:35 290 34_cs (P)___ enqueue begin [position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves]
20:17:35 291 34_cs (P)___ channel.open begin
20:17:35 291 34_cs (P)___ connection.open begin
20:17:35 308 34_cs (P)___ connection.open end
20:17:35 310 34_cs (P)___ channel.open end
20:17:35 314 34_cs (P)___ enqueue end
20:17:35 315 34_cs (P)___ connection.close begin
20:17:35 315 34_cs (P)___ channel.close begin
20:17:35 320 34_cs (P)___ channel.close end
20:17:35 326 34_cs (P)___ connection.close end
Enqueue? > go
20:17:37 681 34_cs (P)___ enqueue begin [go]
20:17:37 681 34_cs (P)___ channel.open begin
20:17:37 681 34_cs (P)___ connection.open begin
20:17:37 687 34_cs (P)___ connection.open end
20:17:37 689 34_cs (P)___ channel.open end
20:17:37 690 34_cs (P)___ enqueue end
20:17:37 690 34_cs (P)___ connection.close begin
20:17:37 690 34_cs (P)___ channel.close begin
20:17:37 691 34_cs (P)___ channel.close end
20:17:37 694 34_cs (P)___ connection.close end
Enqueue? > 20:17:37 760 34_cs <-------- [interrupt!] Dequeue(^q^) bestmove 2g2f ponder 3c3d

root@tk2-217-18401:/home/csg10/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-sCFwK6y8RTyG_ATHjrF8zg  0
1113    0
amq.gen-pqbejGq7x3zfo2zBeS65Eg  0
amq.gen-GrrhJyCjaAlwb-ukR6wQJg  0

どのタイミングで増えるのか?

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
20:19:09 602 34_cs ___(C) start begin
20:19:09 610 34_cs ___(C) consumer.create begin
20:19:09 610 34_cs ___(C) consumer.create end
20:19:09 611 34_cs ___(C) channel.open begin
20:19:09 611 34_cs ___(C) connection.open begin
20:19:09 766 34_cs ___(C) connection.open end
20:19:09 771 34_cs ___(C) channel.open end
20:19:09 773 34_cs <-------- [interrupt!] Registered(^q^)
20:19:09 774 34_cs ___(C) start end
20:19:09 774 34_cs (P)___ waitRegistered begin
20:19:09 774 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > position sfen lnsgkgsnl/1r5b1/4ppppp/pppp5/9/PPPP5/4PPPPP/1B5R1/LNSGKGSNL b - 1 moves
20:19:17 673 34_cs (P)___ enqueue begin [position sfen lnsgkgsnl/1r5b1/4ppppp/pppp5/9/PPPP5/4PPPPP/1B5R1/LNSGKGSNL b - 1 moves]
20:19:17 674 34_cs (P)___ channel.open begin
20:19:17 674 34_cs (P)___ connection.open begin
20:19:17 681 34_cs (P)___ connection.open end
20:19:17 683 34_cs (P)___ channel.open end
20:19:17 687 34_cs (P)___ enqueue end
20:19:17 687 34_cs (P)___ connection.close begin
20:19:17 687 34_cs (P)___ channel.close begin
20:19:17 693 34_cs (P)___ channel.close end
20:19:17 700 34_cs (P)___ connection.close end
Enqueue? > root@tk2-217-18401:/home/csg10/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-sCFwK6y8RTyG_ATHjrF8zg  0
1113    0
amq.gen-pqbejGq7x3zfo2zBeS65Eg  0
amq.gen-GrrhJyCjaAlwb-ukR6wQJg  0
root@tk2-217-18401:/home/csg10/shogi/csharp_service# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
Name         : キューを削除します
"" + [Enter] : 次のステップへ進む
ex.) BrokenBox
Delete ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) SunnyBox
Enqueue ? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 3
Enqueue ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
ex.) RainyBox
Dequeue ? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable    : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive  : この接続でだけ使える。この接続が切れたら消す
ex.) 1
Dequeue ? > 1
20:19:37 576 34_cs ___(C) start begin
20:19:37 584 34_cs ___(C) consumer.create begin
20:19:37 585 34_cs ___(C) consumer.create end
20:19:37 587 34_cs ___(C) channel.open begin
20:19:37 587 34_cs ___(C) connection.open begin
20:19:37 759 34_cs ___(C) connection.open end
20:19:37 763 34_cs ___(C) channel.open end
20:19:37 766 34_cs <-------- [interrupt!] Registered(^q^)
20:19:37 766 34_cs ___(C) start end
20:19:37 766 34_cs (P)___ waitRegistered begin
20:19:37 766 34_cs (P)___ waitRegistered end
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > go
20:19:39 137 34_cs (P)___ enqueue begin [go]
20:19:39 138 34_cs (P)___ channel.open begin
20:19:39 138 34_cs (P)___ connection.open begin
20:19:39 144 34_cs (P)___ connection.open end
20:19:39 146 34_cs (P)___ channel.open end
20:19:39 150 34_cs (P)___ enqueue end
20:19:39 151 34_cs (P)___ connection.close begin
20:19:39 151 34_cs (P)___ channel.close begin
20:19:39 156 34_cs (P)___ channel.close end
20:19:39 160 34_cs (P)___ connection.close end
Enqueue? > 20:19:39 313 34_cs <-------- [interrupt!] Dequeue(^q^) bestmove 6i7h ponder 8d8e

root@tk2-217-18401:/home/csg10/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-sCFwK6y8RTyG_ATHjrF8zg  0
1113    0
amq.gen-pqbejGq7x3zfo2zBeS65Eg  0
amq.gen-M21Y28aFcY52grzn4QzbQQ  0
amq.gen-GrrhJyCjaAlwb-ukR6wQJg  0

go をすると増えるのか?

ponder の後ろに空行が1個あるのも怪しい。が、関係なさそうだ。

            .onSuccess([&message]() {

このラムダのキャプチャを、コピー渡しにしてみようか。

# rabbitmqctl list_queues
Listing queues ...
1112    0
amq.gen-sCFwK6y8RTyG_ATHjrF8zg  0
1113    0
amq.gen-pqbejGq7x3zfo2zBeS65Eg  0
amq.gen-M21Y28aFcY52grzn4QzbQQ  0
amq.gen-148snlqYYUeQYzG3e_OwHg  0
amq.gen-GrrhJyCjaAlwb-ukR6wQJg  0

でも増えるし。

# jobs
[1]-  Running                 rabbitmq-server &
[2]+  Running                 ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null &  (wd: /home/csg10/shogi/ukamuse_sdt4_child4/bin)

うしろのサーバー、落としてみるか?

落とすとブローカーが落ちた。

改造浮かむ瀬 でないものと プロセス間通信しても、名前がおかしなキューは増えないし。

usinewgame 移行のどこかで

12:00:40'813 (>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[] lifeSpan=[0]

キュー名がなぜか消えていて、通信しようとするらしい。

12:28:52'170 (>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[] lifeSpan=[0] hint=[BestmoveAndPonder]

go のあとで落ちた。なんで このとき グローバル変数に入れてあるはずのキュー名が空になっているのか?

    for (Thread* th : pos.searcher()->threads) {
        th->rootPos = Position(pos, th);
        th->maxPly = 0;
        th->rootDepth = Depth0;
        th->rootMoves = rootMoves;
    }

pos変数のサーチャーが所有するスレッドのリストは、グローバル変数の中身を見てくれないんだろうか?

13:21:08'490 (>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[] ENQUEUE_INDEX=[0] lifeSpan=[0] hint=[Bestmove]

ミューテックスを使ってみたものの、スレッドからグローバル変数は見えないんだろうか?

    Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);

じゃあ ポジションに持たせたらいいのか?

#ifdef UBUNTU
    Position pos(DefaultStartPositionSFEN, threads.main(), thisptr, name_enqueue, lifeSpan_enqueue);
#else
    Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);
#endif
    // (2017-03-20 Add begin)
#ifdef UBUNTU
    Position(const std::string& sfen, Thread* th, Searcher* s, std::string name_enqueue, service_io::LifeSpan_t lifeSpan_enqueue) {
        set(sfen, th);
        setSearcher(s);
        this->name_enqueue = name_enqueue;
        this->lifeSpan_enqueue = lifeSpan_enqueue;
    }
    std::string name_enqueue;
    service_io::LifeSpan_t lifeSpan_enqueue;
#endif
    // (2017-03-20 Add end)
template <NodeType NT>
Score Searcher::search(Position& pos, SearchStack* ss, Score alpha, Score beta, const Depth depth, const bool cutNode) {

    // (2017-03-20 Add begin)
#ifdef  UBUNTU
    // ここは別スレッドのはず。
    service_io::SetQueueName(service_io::ENQUEUE_INDEX, pos.name_enqueue);
    service_io::SetQueueLifeSpan(service_io::ENQUEUE_INDEX, pos.lifeSpan_enqueue);
#endif //  UBUNTU
    // (2017-03-20 Add end)

こんなんでいいのだろうか?

13:53:50'186 (>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[] ENQUEUE_INDEX=[0] lifeSpan=[0] hint=[BestmoveAndPonder]

変わり無し。

Thread::Thread(Searcher* s) {
    searcher = s;
    resetCalls = exit = false;
    maxPly = callsCnt = 0;
    history.clear();
    counterMoves.clear();
    idx = s->threads.size();

    std::unique_lock<Mutex> lock(mutex);
    searching = true;
    nativeThread = std::thread(&Thread::idleLoop, this);
    sleepCondition.wait(lock, [&] { return !searching; });
}

スレッドが実行されているのは、std::thread のコンストラクタではないだろうか。
this を実行するというのは どのメソッドが実行されるのか?

void Thread::idleLoop() {
    while (!exit) {
        std::unique_lock<Mutex> lock(mutex);
        searching = false;
        while (!searching && !exit) {
            sleepCondition.notify_one();
            sleepCondition.wait(lock);
        }
        lock.unlock();
        if (!exit)
            search();
    }
}

この関数は別スレッドだろうか? アイドル・ループだが。

グローバル変数のように使っている search

search.hpp

// (2017-03-20 Add begin)
#ifdef UBUNTU
namespace service_io
{
    // AMQP-CPPでの実装 :
    // AMQP::durable=[1] RabbitMQが止まってもキューを残す
    // AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
    // AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
    // AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
    typedef int LifeSpan_t;

    // 0 : enqueue用設定
    // 1 : dequeue用設定
    static const int ENQUEUE_INDEX = 0;
    static const int DEQUEUE_INDEX = 1;
    static const int NUM_INDEX = 2;
    static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
    {
        if ("durable" == lifeSpan) {
            return AMQP::durable;
        }
        else if ("autodelete" == lifeSpan) {
            return AMQP::autodelete;
        }
        else if ("passive" == lifeSpan) {
            return AMQP::passive;
        }
        else if ("exclusive" == lifeSpan) {
            return AMQP::exclusive;
        }
        else {
            std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
            exit(11);
        }
    }
}
#endif
// (2017-03-20 Add end)

search.hpp

    // (2017-03-20 Add begin)
#ifdef UBUNTU
    STATIC std::string name_queues_[] = { "", "" };
    STATIC std::mutex _mutex_queueName;
    STATIC void SetQueueName(int index, std::string value) {
        std::lock_guard<std::mutex> lock(_mutex_queueName);
        name_queues_[index] = value;
    }
    STATIC std::string GetQueueName(int index) {
        std::lock_guard<std::mutex> lock(_mutex_queueName);
        return name_queues_[index];
    }
    STATIC LifeSpan_t lifeSpan_queues_[] = { (LifeSpan_t)0, (LifeSpan_t)0 };
    STATIC std::mutex _mutex_lifeSpan;
    STATIC void SetQueueLifeSpan(int index, LifeSpan_t value) {
        std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
        lifeSpan_queues_[index] = value;
    }
    STATIC LifeSpan_t GetQueueLifeSpan(int index) {
        std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
        return lifeSpan_queues_[index];
    }
#endif
    // (2017-03-20 Add end)

移動。

usi.cpp

void Searcher::doUSICommandLoop(int argc, char* argv[]) {
#if UBUNTU
    // 引数の解析
    std::string             name_enqueue = "";
    service_io::LifeSpan_t  lifeSpan_enqueue = 0;
    std::string             name_dequeue = "";
    service_io::LifeSpan_t  lifeSpan_dequeue = 0;
    {
        if (!service_io::parseArgs(argc, argv, name_enqueue, lifeSpan_enqueue, name_dequeue, lifeSpan_dequeue))
        {
            exit(12);
        }
        this->SetQueueName    (service_io::ENQUEUE_INDEX, name_enqueue);
        this->SetQueueLifeSpan(service_io::ENQUEUE_INDEX, lifeSpan_enqueue);
        this->SetQueueName    (service_io::DEQUEUE_INDEX, name_dequeue);
        this->SetQueueLifeSpan(service_io::DEQUEUE_INDEX, lifeSpan_dequeue);
    }

    // デキューの常時監視をスタート
    auto th1 = std::thread([] { service_io::workConsume(this); });
#endif

こうか?

service_io.hpp

    // メッセージ・キューの監視を開始
    static void workConsume(Searcher& searcher)
    {
        std::string name_dequeue = searcher.GetQueueName(DEQUEUE_INDEX);
        LifeSpan_t lifeSpan_dequeue = searcher.GetQueueLifeSpan(DEQUEUE_INDEX);

        // I will go to the front of the box named "1111".
        getChannel(DEQUEUE_INDEX)->declareQueue(name_dequeue, lifeSpan_dequeue);

        // I look inside the box.
        auto errorCb = [&lifeSpan_dequeue](const char *errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan_dequeue=[" << lifeSpan_dequeue << "]" << std::endl;
        };
        auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {
            std::string message(amqpMessage.body(), amqpMessage.bodySize());

            rotationBuffer::push_block(message);

            getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
        };
        getChannel(DEQUEUE_INDEX)->consume(name_dequeue)
            .onReceived(messageCb)
            .onError(errorCb);

        // I will keep on forever.
        ev_run(pLoop_ev[DEQUEUE_INDEX]);

        // I will not come here.
        return;
    }

こうか?

#define SERVIO_END_FLUSH(searcher,hint) std::endl, service_io::flushMessage(searcher,hint)

こうか?

    // メッセージキューにエンキュー
    // 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
    static int flushMessage(Searcher& searcher, std::string hint)
    {
        std::string message = service_io::oss.str();
        oss.str(std::string());// 空文字列を入れてクリアーする

        std::string name_enqueue = searcher->GetQueueName(ENQUEUE_INDEX);
        LifeSpan_t lifeSpan_enqueue = searcher->GetQueueLifeSpan(ENQUEUE_INDEX);
        if ("" == name_enqueue)
        {
            std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[" << name_enqueue << "] ENQUEUE_INDEX=[" << ENQUEUE_INDEX << "] lifeSpan=[" << lifeSpan_enqueue << "] hint=[" << hint << "]" << "\n";
            exit(1);
        }

        // チャンネルを宣言
        getChannel(ENQUEUE_INDEX)->declareQueue(name_enqueue, lifeSpan_enqueue)
            .onError([=](const char* errMsg) {
                std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_enqueue << "] lifeSpan=[" << lifeSpan_enqueue << "]" << "\n";
            });

        // チャンネルを束縛
        getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_enqueue, routing_key)
            // ev_run から1回だけ呼び出される。コミットを開始してこのハンドラから抜けること。コミットの成功時に接続を切ることでev_runから抜ける。
            .onSuccess([=]() {//&message

                // トランザクション開始
                getChannel(ENQUEUE_INDEX)->startTransaction();
                // パブリッシュ開始
                if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

                    // コミットを開始
                    getChannel(ENQUEUE_INDEX)->commitTransaction()
                        .onSuccess([=]() {
                            // コミット成功

                            // 接続を切ったあと、トランザクションのコミットが実行されるようだ。
                            closeConnection(ENQUEUE_INDEX);
                        })
                        .onError([=](const char *errMsg) {//&message
                            std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
                        });
                }
                else {
                    std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
                    getChannel(ENQUEUE_INDEX)->rollbackTransaction();
                }

            }).onError([](const char* errMsg) {
                std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
            });

        // ここで、バインドキューのオンサクセスの処理へ。
        ev_run(pLoop_ev[ENQUEUE_INDEX], 0);

        return 0;
    }

こうか?

「今日はまったエラー」(0と1の欠片)
http://blog.livedoor.jp/cielo_cielo/archives/64881104.html

search.cpp の上の方に

#if defined USE_GLOBAL
SignalsType Searcher::signals;
LimitsType Searcher::limits;
StateListPtr Searcher::states;
TimeManager Searcher::timeManager;
TranspositionTable Searcher::tt;
#if defined INANIWA_SHIFT
InaniwaFlag Searcher::inaniwaFlag;
#endif
ThreadPool Searcher::threads;
OptionsMap Searcher::options;
EasyMoveManager Searcher::easyMove;
Searcher* Searcher::thisptr;
// (2017-03-20 Add begin)
#ifdef UBUNTU
ServiceIo2 Searcher::serviceIo2;
#endif
// (2017-03-20 Add end)
#endif

という塊があるんだが、こういう宣言してないと使えないのな。

# jobs
[3]-  Running                 rabbitmq-server &  (wd: /home/csg10/shogi/csharp_service)
[4]+  Segmentation fault      ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null

セグメンテーション・フォールトって何だっけ? メモリ・リークか何かか?

search.cpp

void Searcher::init() {
#if defined USE_GLOBAL
#else
    thisptr = this;
#endif
    options.init(thisptr);
    threads.init(thisptr);
    tt.resize(options["USI_Hash"]);
// (2017-03-20 Add begin)
#ifdef UBUNTU
    name_queues_[] = { "", "" };
    lifeSpan_queues_[] = { (service_io::LifeSpan_t)0, (service_io::LifeSpan_t)0 };
#endif
// (2017-03-20 Add end)
}

初期化とか書いたらいいのだろうか?
文法エラー。

// (2017-03-20 Add begin)
#ifdef UBUNTU
    thisptr->serviceIo2.name_queues_[service_io::ENQUEUE_INDEX] = "";
    thisptr->serviceIo2.name_queues_[service_io::DEQUEUE_INDEX] = "";
    thisptr->serviceIo2.lifeSpan_queues_[service_io::ENQUEUE_INDEX] = (service_io::LifeSpan_t)0;
    thisptr->serviceIo2.lifeSpan_queues_[service_io::DEQUEUE_INDEX] = (service_io::LifeSpan_t)0;
#endif
// (2017-03-20 Add end)

こうか?

# jobs
[3]-  Running                 rabbitmq-server &  (wd: /home/csg10/shogi/csharp_service)
[4]+  Segmentation fault      ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null

原因が全く分からん。

// (2017-03-20 Add begin)
#ifdef UBUNTU
    serviceIo2.name_queues_[service_io::ENQUEUE_INDEX] = "";
    serviceIo2.name_queues_[service_io::DEQUEUE_INDEX] = "";
    serviceIo2.lifeSpan_queues_[service_io::ENQUEUE_INDEX] = (service_io::LifeSpan_t)0;
    serviceIo2.lifeSpan_queues_[service_io::DEQUEUE_INDEX] = (service_io::LifeSpan_t)0;
#endif
// (2017-03-20 Add end)

これでどうか?

// (2017-03-20 Add begin)
#ifdef UBUNTU
class ServiceIo2
{
public:
        std::string name_queues_[];// = { "", "" };
        std::mutex _mutex_queueName;
        service_io::LifeSpan_t lifeSpan_queues_[];// = { (service_io::LifeSpan_t)0, (service_io::LifeSpan_t)0 };
        std::mutex _mutex_lifeSpan;

        void SetQueueName(int index, std::string value);
        std::string GetQueueName(int index);
        void SetQueueLifeSpan(int index, service_io::LifeSpan_t value);
        service_io::LifeSpan_t GetQueueLifeSpan(int index);
};
#endif
// (2017-03-20 Add end)

配列のサイズとか 決めなくていいのか?

// (2017-03-20 Add begin)
#ifdef UBUNTU
class ServiceIo2
{
public:
    std::string name_queues_[2];// = { "", "" };
    std::mutex _mutex_queueName;
    service_io::LifeSpan_t lifeSpan_queues_[2];// = { (service_io::LifeSpan_t)0, (service_io::LifeSpan_t)0 };
    std::mutex _mutex_lifeSpan;

    void SetQueueName(int index, std::string value);
    std::string GetQueueName(int index);
    void SetQueueLifeSpan(int index, service_io::LifeSpan_t value);
    service_io::LifeSpan_t GetQueueLifeSpan(int index);
};
#endif
// (2017-03-20 Add end)

これでどうか?

# jobs
[3]-  Running                 rabbitmq-server &  (wd: /home/csg10/shogi/csharp_service)
[4]+  Running                 ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null &

セグメンテーション・フォールトは取れたか?

tamesi26a4.php

長くなったので次の記事へ

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1