LoginSignup
0
1

More than 5 years have passed since last update.

コンピュータ将棋ソフトとの対局サーバーを立てよう<その10>

Last updated at Posted at 2017-03-13

前回の記事 : http://qiita.com/muzudho1/items/473fc0fda3e38300b820

バックグラウンドで割込み待ち停止しているプロセスと、プロセス間通信はできるのだろうか?

前に実験した気がするんだが どの記事だったろうか。

前回の記事だった。

「コンピュータ将棋ソフトとの対局サーバーを立てよう<その8>」(Qiita)
http://qiita.com/muzudho1/items/798dfbf267340f3801e1#_reference-1a747e743b7ff0dbb050

このときは C# / RabbitQM だったが、今度は C++ / AMQP-CPP でやりたい。

じゃあ

tamesi33a1_cpp.cpp、tamesi33a2_cpp.cpp のテストを作ろう。

テストケース tamesi33 C++版

a1 : (バックグラウンドに回さない)キーボード入力で、メッセージキュー「1112」にメッセージを送れるものとする。常時 メッセージキュー「1113」を監視し、メッセージが入っていれば 画面に出すものとする。

このプログラムは 必要になったら実行し、ログアウトしたら消えているものと想定する。

a2 : (バックグラウンドに回す)常時 メッセージキュー「1112」を監視し、メッセージが入っていれば その文字列に「daze」と付けて メッセージキュー「1113」にエンキューする。

このプログラムは ログアウトしても稼働しているものとする。

これで a2 が常時 稼働していて、a1 でアクセスできるのなら よしとする。

a2

a2 だけ作ればいいや……。

ソースを見ていたんだが

// I will keep on forever.
ev_run(loop, 0);

デキューを監視している処理があって、ここでブロックされている。
だからエンキューのプログラムまで行ってない。

どうするか。

C++ でスレッドを走らせるには?

デキューの監視は別スレッドでやればいいのだろうか?

「C++で簡単非同期処理(std::thread,std::async)」(Qiita)
http://qiita.com/termoshtt/items/d3cb7fe226cdd498d2ef

Visual Studio 2015 設定

Release x64 にして プロパティ画面からプリコンパイル・ヘッダーの使用を無しにする。

#include "stdafx.h"

といった記述も削除する。

  • stdafx.h
  • targetver.h
  • stdafx.cpp

の3ファイルは削除。

コンパイルは通らないが、Ubuntu の方へソースを移動させる。

# g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
tamesi33a2_cpp.cpp: In lambda function:
tamesi33a2_cpp.cpp:95:46: error: ‘bgp_workConsume’ was not declared in this scope
  auto th1 = std::thread([] { bgp_workConsume(); });
                                              ^
tamesi33a2_cpp.cpp: In function ‘int main()’:
tamesi33a2_cpp.cpp:104:20: error: ‘class AMQP::TcpConnection’ has no member named ‘isConnected’
  while (connection.isConnected())
                    ^
tamesi33a2_cpp.cpp:113:40: error: ‘enqueue’ was not declared in this scope
    enqueue(connection, channel, message);
                                        ^
tamesi33a2_cpp.cpp: In function ‘void enqueue(AMQP::TcpConnection&, AMQP::TcpChannel&, std::__cxx11::string)’:
tamesi33a2_cpp.cpp:166:9: error: ‘loop’ was not declared in this scope
  ev_run(loop);
         ^
tamesi33a2_cpp.cpp: In function ‘void bgp_workConsume()’:
tamesi33a2_cpp.cpp:222:18: error: ‘bgp_queueName_dequeue’ was not declared in this scope
  channel.consume(bgp_queueName_dequeue)
                  ^

エラーが出た。

//auto* loop = EV_DEFAULT;
struct ev_loop* loop = EV_DEFAULT;

auto を使わない場合は EV_DEFAULT の型は こう書くのか。

「class-TcpConnection」(docsforge.com)
http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/

ここでソースが読める。

# g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
tamesi33a2_cpp.cpp: In lambda function:
tamesi33a2_cpp.cpp:95:46: error: ‘bgp_workConsume’ was not declared in this scope
  auto th1 = std::thread([] { bgp_workConsume(); });
                                              ^
tamesi33a2_cpp.cpp: In function ‘int main()’:
tamesi33a2_cpp.cpp:114:46: error: ‘enqueue’ was not declared in this scope
    enqueue(loop, connection, channel, message);
                                              ^

残りのエラーは何をしたらいいのか。 scope とか書いてるな。

関数のプロトタイプ宣言を試してみるか。
おっけ。

サンプル・プログラム

// OS      : Windows10 : // このプログラムは動かない
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : rabbitmq-server
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Compile   : Command  : g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
//         : Execute   : Command  : // バックグラウンドで実行
//                                : ./tamesi33a2_cpp.exe &
//         : Stop      : Typing   : [Ctrl]+[C]
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。

#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
// 2 : deletequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "1113", "1112" };
static std::string lifeSpan_queues[] = { "duration", "duration" };

static void workConsume();
static void enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message);

/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 100;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;
    static void putMessage(std::string message)
    {
        buffer[bufferCursors[PUT_INDEX]] = message;
        bufferCursors[PUT_INDEX]++;
        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }
    }
    static std::string getMessage()
    {
        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];
            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }
            return message;
        }
        return "";
    }
}

int main()
{
    // デキューの常時監視をスタート
    auto th1 = std::thread([] { workConsume(); });

    // エンキュー用の接続
    //auto* loop = EV_DEFAULT;
    struct ev_loop* loop = EV_DEFAULT;
    AMQP::LibEvHandler handler{ loop };
    AMQP::TcpConnection connection{ &handler, ADDRESS };
    AMQP::TcpChannel channel{ &connection };

    // 無限ループ
    for (;;)
    {
        std::string message = rotationBuffer::getMessage();
        if ("" != message)
        {
            // 末尾に daze を付ける。
            message += "daze";

            // エンキューする
            enqueue(loop, connection, channel, message);
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    // このプログラムは、自分では接続を切らない。

    // th1 スレッドの終了を待つ
    th1.join();
    return 0;
}

// メッセージキューにエンキュー
static void enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message)
{
    std::string exchange_name = "myexchange";
    std::string routing_key = "";

    int lifeSpan = 0;
    if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::durable;
    }
    else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::autodelete;
    }
    else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::passive;
    }
    else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::exclusive;
    }

    channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
        .onError([](const char* errMsg) {
        std::cerr << "error declaring queue: " << errMsg << "\n";
    });

    channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
        .onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {

        if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
            std::cerr << "failed to publish?\n";
        }

        // break in ev loop.
        connection.close();
    });

    // このループは、パブリッシュ後に抜ける
    ev_run(loop);
}

// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
    std::string message;

    while ("" == message)
    {
        message = rotationBuffer::getMessage();
    }

    return message;
}

// メッセージ・キューの監視を開始
static void workConsume()
{
    // Connect to the AMQP service.
    auto *loop = EV_DEFAULT;
    AMQP::LibEvHandler handler(loop);
    AMQP::TcpConnection connection(&handler, ADDRESS);
    AMQP::TcpChannel channel(&connection);

    // I will go to the front of the box named "1111".
    int lifeSpan = 0;
    if ("durable" == lifeSpan_queues[DEQUEUE_INDEX])
    {
        lifeSpan = AMQP::durable;
    }
    else if ("autodelete" == lifeSpan_queues[DEQUEUE_INDEX])
    {
        lifeSpan = AMQP::autodelete;
    }
    else if ("passive" == lifeSpan_queues[DEQUEUE_INDEX])
    {
        lifeSpan = AMQP::passive;
    }
    else if ("exclusive" == lifeSpan_queues[DEQUEUE_INDEX])
    {
        lifeSpan = AMQP::exclusive;
    }
    channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);

    // I look inside the box.
    auto errorCb = [](const char *errMsg) {
        std::cerr << "My ID watching failed [" << errMsg << "]" << std::endl;
    };
    auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {

        std::string myString(message.body(), message.bodySize());
        rotationBuffer::putMessage(myString);

        channel.ack(deliveryTag);
    };
    channel.consume(name_queues[DEQUEUE_INDEX])
        .onReceived(messageCb)
        .onError(errorCb);

    // I will keep on forever.
    ev_run(loop, 0);

    // I will not come here.
    return;
}

これでコンパイルは通る。

これをバックグラウンドで実行する。

./tamesi33a2_cpp.exe &
# ./tamesi33a1_cs.exe
選べだぜ☆(^~^)
1   : メッセージ   エンキュー
2   : メッセージ   デキュー
quit: 終了

1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
tako
 Enqueue(^q^) tako
選べだぜ☆(^~^)
1   : メッセージ   エンキュー
2   : メッセージ   デキュー
quit: 終了

2
 Please, wait dequeue... Or Press any key to menu.
 [interrupt!] Dequeue(^q^) takodaze

takodaze が返ってきた。C# → CPP → C# は おっけ!

C++で スレッドを書いたのが動いているんだろう。

浮かむ瀬のデキュー監視も別スレッドで動かそう

上のサンプル・プログラムを見て書き換えていこう。

名前を変えたい

gameserver.hpp を service_io.hpp に名前を変えて、名前空間 service_io を作りたい。
その名前空間の中の関数は service_io::enqueue みたいな感じで使える感じで。

通常の標準入出力が壊れていないか確認

 対局
    ====
        usi
            usiok
        isready
            readyok
        usinewgame
        position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
        go
            例) bestmove 2g2f ponder 1c1d
        quit

壊れてはない。

改造したソースコードを一応貼り付けておく。

service_io.hpp

#pragma once

#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include "common.hpp"


// #define UBUNTU


// バックグラウンド・プロセス用の出力
// #define BGP_COUT std::cout
// #define BGP_ENDL std::endl
#define BGP_COUT service_io::oss
#define BGP_ENDL BGP_ENDL1, BGP_ENDL2
#define BGP_ENDL1 std::endl
#define BGP_ENDL2 service_io::enqueue (service_io::oss.str())

namespace service_io
{
    // 文字列ストリーム出力を、文字列に置換するもの
    static std::ostringstream oss;
}


#ifdef UBUNTU
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : rabbitmq-server
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//
// service_io.hpp

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
    static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
    // 0 : enqueue用設定
    // 1 : dequeue用設定
    static const int ENQUEUE_INDEX = 0;
    static const int DEQUEUE_INDEX = 1;
    static const int NUM_INDEX = 2;
    static std::string name_queues[] = { "1113", "1112" };
    static std::string lifeSpan_queues[] = { "duration", "duration" };

    /// <summary>
    /// 回転式バッファー。
    /// これはメイン・スレッドに置く。
    /// デキューのスレッドでエンキューすることはできない。
    /// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
    /// </summary>
    namespace rotationBuffer
    {
        const int bufferSize = 100;
        static std::string buffer[bufferSize] = {};
        static int bufferCursors[2] = { 0, 0 };
        const int PUT_INDEX = 0;
        const int GET_INDEX = 1;
        static void putMessage(std::string message)
        {
            buffer[bufferCursors[PUT_INDEX]] = message;
            bufferCursors[PUT_INDEX]++;
            if (!(bufferCursors[PUT_INDEX] < bufferSize))
            {
                bufferCursors[PUT_INDEX] = 0;
            }
        }
        static std::string getMessage()
        {
            if ("" != buffer[bufferCursors[GET_INDEX]])
            {
                std::string message = buffer[bufferCursors[GET_INDEX]];
                buffer[bufferCursors[GET_INDEX]] = "";
                bufferCursors[GET_INDEX]++;
                if (!(bufferCursors[GET_INDEX] < bufferSize))
                {
                    bufferCursors[GET_INDEX] = 0;
                }
                return message;
            }
            return "";
        }
    }

    // メッセージキューにエンキュー
    // 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
    static int enqueue(std::string message) {

        auto* loop = EV_DEFAULT;

        AMQP::LibEvHandler handler{ loop };
        AMQP::TcpConnection connection{ &handler, ADDRESS };
        AMQP::TcpChannel channel{ &connection };

        std::string exchange_name = "myexchange";
        std::string routing_key = "";

        int lifeSpan = 0;
        if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::durable;
        }
        else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::autodelete;
        }
        else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::passive;
        }
        else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::exclusive;
        }

        channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
            .onError([](const char* errMsg) {
            std::cerr << "error declaring queue: " << errMsg << "\n";
        });

        channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            .onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {

            if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
                std::cerr << "failed to publish?\n";
            }

            // break in ev loop.
            connection.close();
        });

        // We will monitor until the connection is lost. Execute channel.declareQueue( ... ).
        ev_run(loop);

        return 0;
    }

    // service_io::workConsume() を別スレッドで走らせておくこと。
    static std::string dequeue() {
        std::string message;

        while ("" == message)
        {
            message = rotationBuffer::getMessage();
        }

        return message;
    }

    // メッセージ・キューの監視を開始
    static void workConsume()
    {
        // Connect to the AMQP service.
        auto *loop = EV_DEFAULT;
        AMQP::LibEvHandler handler(loop);
        AMQP::TcpConnection connection(&handler, ADDRESS);
        AMQP::TcpChannel channel(&connection);

        // I will go to the front of the box named "1111".
        int lifeSpan = 0;
        if ("durable" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::durable;
        }
        else if ("autodelete" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::autodelete;
        }
        else if ("passive" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::passive;
        }
        else if ("exclusive" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::exclusive;
        }
        channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);

        // I look inside the box.
        auto errorCb = [](const char *errMsg) {
            std::cerr << "My ID watching failed [" << errMsg << "]" << std::endl;
        };
        auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {

            std::string myString(message.body(), message.bodySize());
            rotationBuffer::putMessage(myString);

            channel.ack(deliveryTag);
        };
        channel.consume(name_queues[DEQUEUE_INDEX])
            .onReceived(messageCb)
            .onError(errorCb);

        // I will keep on forever.
        ev_run(loop, 0);

        // I will not come here.
        return;
    }
}

#else

// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
    static int enqueue(std::string message) {
        std::cout << IOLock << message << std::endl << IOUnlock;
        oss.str(std::string());// 空文字列を入れてクリアーする
        return 0;
    }

    static std::string dequeue() {
        std::string cmd;
        if (!std::getline(std::cin, cmd))
        {
            cmd = "quit";
        }
        return cmd;
    }
}
#endif

class ServiceIo
{
public:
    ServiceIo();
    ~ServiceIo();
};

service_io.cpp

//#include "stdafx.h"
#include "service_io.hpp"

ServiceIo::ServiceIo()
{
}

ServiceIo::~ServiceIo()
{
}

usi.cpp 抜粋

void Searcher::doUSICommandLoop(int argc, char* argv[]) {

#if UBUNTU
    // デキューの常時監視をスタート
    auto th1 = std::thread([] { service_io::workConsume(); });
#endif

    bool dialogue = false;
    // 「./apery」のように引数無しで打鍵して実行された場合に限り、なんども標準入力を待つ。
    if (argc == 1)
    {
        dialogue = true;
    }
    bool evalTableIsRead = false;
    Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);

    std::string cmd;
    std::string token;

    for (int i = 1; i < argc; ++i)
    {
        cmd += std::string(argv[i]) + " ";
    }

    do {
        // 文字が打たれたらその文字を、空文字を打たれたら "quit" 扱いとする。
        if (dialogue)
        {
            cmd = service_io::dequeue();
        }

        // 半角スペース区切りの最初の1トークンを拾う
        std::istringstream ssCmd(cmd);
        ssCmd >> std::skipws >> token;

        if (token == "--msgqueue") {
#if UBUNTU
            ssCmd >> service_io::name_queues[ENQUEUE_INDEX];
            ssCmd >> service_io::lifeSpan_queues[ENQUEUE_INDEX];
            ssCmd >> service_io::name_queues[DEQUEUE_INDEX];
            ssCmd >> service_io::lifeSpan_queues[DEQUEUE_INDEX];
            dialogue = true;
#else
            std::cerr << "Command line parameter error : \"--msgqueue\" option for UBUNTU.";
#endif
        }
        else if (token == "quit" || token == "stop" || token == "ponderhit" || token == "gameover") {
            if (token != "ponderhit" || signals.stopOnPonderHit) {
                signals.stop = true;
                threads.main()->startSearching(true);
            }
            else
                limits.ponder = false;
            if (token == "ponderhit" && limits.moveTime != 0)
                limits.moveTime += timeManager.elapsed();
        }
        else if (token == "go") go(pos, ssCmd);
        else if (token == "position") setPosition(pos, ssCmd);
        else if (token == "usinewgame"); // isready で準備は出来たので、対局開始時に特にする事はない。
        else if (token == "usi") SYNCCOUT << "id name " << std::string(options["Engine_Name"])
            << "\nid author Hiraoka Takuya"
            << "\n" << options
            << "\nusiok" << SYNCENDL;
        else if (token == "isready") { // 対局開始前の準備。
            tt.clear();
            threads.main()->previousScore = ScoreInfinite;
            if (!evalTableIsRead) {
                // 一時オブジェクトを生成して Evaluator::init() を呼んだ直後にオブジェクトを破棄する。
                // 評価関数の次元下げをしたデータを格納する分のメモリが無駄な為、
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
            }
            SYNCCOUT << "readyok" << SYNCENDL;
        }
        else if (token == "setoption") setOption(ssCmd);
        else if (token == "write_eval") { // 対局で使う為の評価関数バイナリをファイルに書き出す。
            if (!evalTableIsRead)
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
            Evaluator::writeSynthesized(Evaluator::evalDir);
        }
#if defined LEARN
        else if (token == "l") {
            auto learner = std::unique_ptr<Learner>(new Learner);
            learner->learn(pos, ssCmd);
        }
        else if (token == "make_teacher") {
            if (!evalTableIsRead) {
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
            }
            make_teacher(ssCmd);
        }
        else if (token == "use_teacher") {
            if (!evalTableIsRead) {
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
            }
            use_teacher(pos, ssCmd);
        }
        else if (token == "check_teacher") {
            check_teacher(ssCmd);
        }
        else if (token == "print") printEvalTable(SQ88, f_gold + SQ78, f_gold, false);
#endif
#if !defined MINIMUL
        // 以下、デバッグ用
        else if (token == "bench") {
            if (!evalTableIsRead) {
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
            }
            benchmark(pos);
        }
        else if (token == "key") SYNCCOUT << pos.getKey() << SYNCENDL;
        else if (token == "tosfen") SYNCCOUT << pos.toSFEN() << SYNCENDL;
        else if (token == "eval") std::cout << evaluateUnUseDiff(pos) / FVScale << std::endl;
        else if (token == "d") pos.print();
        else if (token == "s") measureGenerateMoves(pos);
        else if (token == "t") std::cout << pos.mateMoveIn1Ply().toCSA() << std::endl;
        else if (token == "b") makeBook(pos, ssCmd);
#endif
        else                           SYNCCOUT << "unknown command: " << cmd << SYNCENDL;
    } while (token != "quit" && dialogue);

#if UBUNTU
    // th1 スレッドの終了を待つ
    th1.join();
#endif

    threads.main()->waitForSearchFinished();
}

これだけだったろうか?

find . \( -name \Makefile -o -name \*.hpp -o -name \*.cpp -o -name \*.sfen -o -name \*.txt \) -type f | xargs -n 10 nkf -Lu --overwrite

改行の変換はこれ。どっかにまとめておきたいが。

nano service_io.hpp

こうして、

 // #define UBUNTU

のコメントアウトを外す。

ヘッダ・ファイルの再読込み防止

In file included from bitboard.hpp:25:0,
                 from main.cpp:30:
service_io.hpp:9:0: warning: "UBUNTU" redefined
 #define UBUNTU
 ^

あー、ヘッダファイル何度も読込んでるかも。

#pragma once

C++11 じゃこれは効かないんだろうか?

#ifndef APERY_SERVICE_IO_HPP
#define APERY_SERVICE_IO_HPP

ファイルの冒頭にこう書いて、

#endif // #ifndef APERY_SERVICE_IO_HPP

ファイルの末尾にこう書こう。

いや、そうでは無くて Makefile に -DUBUNTU と書き足したんじゃなかったか。

service_io.hpp

// #define UBUNTU

やっぱ こうする。

usi.cpp: In static member function ‘static void Searcher::doUSICommandLoop(int, char**)’                    :
usi.cpp:1061:37: error: ‘ENQUEUE_INDEX’ was not declared in this scope
    ssCmd >> service_io::name_queues[ENQUEUE_INDEX];
                                     ^
usi.cpp:1061:37: note: suggested alternative:
In file included from usi.cpp:22:0:
service_io.hpp:57:19: note:   ‘service_io::ENQUEUE_INDEX’
  static const int ENQUEUE_INDEX = 0;
                   ^
usi.cpp:1063:37: error: ‘DEQUEUE_INDEX’ was not declared in this scope
    ssCmd >> service_io::name_queues[DEQUEUE_INDEX];
                                     ^
usi.cpp:1063:37: note: suggested alternative:
In file included from usi.cpp:22:0:
service_io.hpp:58:19: note:   ‘service_io::DEQUEUE_INDEX’
  static const int DEQUEUE_INDEX = 1;
                   ^

これ、ネームスペース抜けか?

そうだった。

コンパイルは通った。

# mv apery ../bin/apery
# cd ../bin
# ./apery --msgqueue 1114 durable 1115 durable &

1114 と、1115、まずは試しで 1112 と 1113 でいいか。

            a1              a2
                → 1112 →
                ← 1113 ←

                → 1115 →
                ← 1114 ←

図にすると、こうか。なんかすっきりしないな。

            a1              a2
                → 1112 →
                ← 1113 ←

                → 1114 →
                ← 1115 ←

こう変えよう。

# ./apery --msgqueue 1113 durable 1112 durable &
[2] 32479
# My ID watching failed [Channel is in error state]

エラーが返ってくるだけ、前進だな。

        auto errorCb = [](const char *errMsg) {
            std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan_queues[" << DEQUEUE_INDEX << "]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
        };

こう書くと、

# ./apery --msgqueue 1113 durable 1112 durable &
[3] 32746
# My ID watching failed [Channel is in error state] lifeSpan_queues[1]=[durable]

こうなる。
チャンネルが何かダメなのか?

キューを変えてみるか。

# ./apery --msgqueue 1115 durable 1114 durable &
[4] 410

今度はエラー応答も返ってこない。じゃあアクセスしてみよう。

# jobs
[1]-  Running                 ./tamesi33a2_cpp.exe &  (wd: /home/★user/shogi/cpp_service)
[4]+  Running                 ./apery --msgqueue 1115 durable 1114 durable &

なんか上のやつが 干渉してないか?

# kill %1

テスト

# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1114
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1115
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
<---- [interrupt!] Dequeue(^q^) usi
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? >

なんか「usi」が飛んできたのか。既にキューに入ってたとか。
じゃあ「usi」を送ってみるか。

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
 Enqueue(^q^) usi
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
Enqueue? >

浮かむ瀬から エラーが返ってきた。ループが再帰的な実行で うーん?

エラーが返ってくるだけ進歩だな。

「Is that ev_run()/ev_loop() inhibited in multiple thread of current libev?」(http://lists.schmorp.de)
http://lists.schmorp.de/pipermail/libev/2011q4/001659.html

ev_run( ) が再帰中に開放しているのが検出されるというのは どういうことなのか。
再帰しているのか?

エンキューには次のように書いている。

        channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            .onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {

            if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
                std::cerr << "failed to publish?\n";
            }

            // break in ev loop.
            connection.close();
        });

connection.close(); で、たしかに 接続を開放 している。
じゃあ、デキューとエンキューで、接続は別に行った方がいいのか。

書き直す。

コネクションをクローズするタイミングをよく確認。

「AMQP-CPP/tests/libev.cpp」(CopernicaMarketingSoftware/AMQP-CPP)
https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/master/tests/libev.cpp

onSuccess の中で切断してないか?

        channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
            .onError([](const char* errMsg) {
                std::cerr << "error declaring queue: " << errMsg << "\n";
            });

        channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            .onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {

                if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
                    std::cerr << "failed to publish?\n";
                }

                // break in ev loop.
                connection.close();
            });

現状、onSuccess の中で切断しているが、bindQueue じゃなくて declareQueue の onSuccess の中で切断した方がいいんだろうか?

        channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
            .onSuccess([&connection](const std::string &queueName, uint32_t messagecount, uint32_t consumercount) {
                std::cout << "declared queue onSuccess queueName=[" << queueName << "] messagecount=[" << messagecount << "] consumercount=[" << consumercount << "]" << std::endl;
                // break in ev loop.
                connection.close();
            })
            .onError([](const char* errMsg) {
                std::cerr << "error declaring queue: " << errMsg << "\n";
            });

        channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            .onSuccess([ &channel, &exchange_name, &routing_key, &message]() {
                // &connection,

                if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
                    std::cerr << "failed to publish?\n";
                }

                //// break in ev loop.
                //connection.close();
            });

こんなんでいいんだろうか?

# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1114
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1115
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
 Enqueue(^q^) usi

ダメなようだ。このエラーは何なのか。

# ps aux | grep apery
root      1582  0.0  0.0  12936   988 pts/8    S+   14:11   0:00 grep --color=auto apery

なんらかの理由で落ちていて、それでループを抜けているのだろうか?どこかにエラーログは吐かれていないだろうか?

# ls -l
total 4076096
drwxr-xr-x 2 root root       4096 Mar 12 07:09 20161007
-rwxr-xr-x 1 root root    1911208 Mar 13 14:04 apery
-rw-r--r-- 1 root root       1531 Mar 12 05:48 benchmark.sfen
-rw-r--r-- 1 root root 4171912806 Mar 12 05:49 log1
-rw------- 1 root root          0 Mar 12 05:49 nohup.out

log1 というファイルはバグってそうだ。消そうか。

rm log1
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/bin# ./apery
usi
isready
usinewgame
position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
go
quit

バックグラウンド・プロセス専用に改造したので、応答は画面に出てこない。

それにしても、quit と打っても出てこないんだな。
そういえば、標準入出力の両方を見てないんだった。

# ps aux | grep apery
root      1718 97.9 14.3 1481912 145836 pts/8  Rl   14:20   0:25 ./apery --msgqueue 1115 durable 1114 durable
root      1733  0.0  0.0  12936   984 pts/8    S+   14:20   0:00 grep --color=auto apery

リダイレクトして実行すればよかったのか?

# ./apery --msgqueue 1115 durable 1114 durable 2> ./apery.err.log &
[2] 1761
# jobs
[2]+  Running                 ./apery --msgqueue 1115 durable 1114 durable 2> ./apery.err.log &

これでどうか?

# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1114
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1115
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
 Enqueue(^q^) usi
Enqueue? >

エラー出力が どっかいっただけじゃないか?

# rabbitmqctl list_queues
Listing queues ...
1112    0
1111    0
1113    0
myqueue 25
1115    0
1114    0
# ls
20161007  apery  apery.err.log  benchmark.sfen  nohup.out
# nano log1
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.

まあ、そういうことなのだった。意味ない。
サンプルソースでも よく読むか。

このエラーメッセージは ev.c ファイルのアサートが出しているのか。
自分のコードでもエラーを拾えないか。

例えば こんな感じで。

        AMQP::TcpChannel channel{ &connection };

        channel.onError([](const char *errMsg) {
            // report error
            std::cerr << "(^q^)channel error: " << errMsg << std::endl;
        });

変わり無し。

        channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            .onSuccess([ &channel, &exchange_name, &routing_key, &message]() {
                // &connection,

                if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
                    std::cerr << "failed to publish?\n";
                }

                //// break in ev loop.
                //connection.close();
            })
            .onError([](const char *errMsg) {
                // report error
                std::cerr << "(^q^)channel bindQueue error: " << errMsg << std::endl;
            });

じゃあ bindQueue にも onError を付けれないだろうか?

# ./apery --msgqueue 1115 durable 1114 durable &
[1] 2632
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/bin# apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.

[1]+  Aborted                 ./apery --msgqueue 1115 durable 1114 durable

実行した直後に落ちたような。

# ./apery --msgqueue 1115 durable 1114 durable &
[1] 2655
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/bin# ps aux | grep apery
root      2655  100 14.3 1481928 145852 pts/8  Rl   14:55   0:14 ./apery --msgqueue 1115 durable 1114 durable
root      2663  0.0  0.0  12936   988 pts/8    S+   14:56   0:00 grep --color=auto apery

もう1回やったら大丈夫だった。

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
 Enqueue(^q^) usi

ぜんぜん変わらん。ググるか。

「libev: ev_loop recursion during release detected in RPC」(Apache)
https://issues.apache.org/jira/browse/KUDU-12

エラーが出ているというのはあるんだが、解決方法が書いてないのな。

「libev」(dirtysalt.github.io)
https://dirtysalt.github.io/libev.html

ソースを見せられても、やっぱり解決方法が分からん。

「Is that ev_run()/ev_loop() inhibited in multiple thread of current libev?」(http://lists.schmorp.de)
http://lists.schmorp.de/pipermail/libev/2011q4/001671.html

「Exceptions when a TCPSocket connection timed out in and out of the Reactor using Kernel.timeout #97」(celluloid/celluloid-io)
https://github.com/celluloid/celluloid-io/issues/97

カーネルのタイムアウト?

「マルチスレッド・サーバーの組織内libevはで再帰(EVBREAK_RECURSE)」(stack overflow)
http://ru.stackoverflow.com/questions/173685/%D0%A0%D0%B5%D0%BA%D1%83%D1%80%D1%81%D0%B8%D0%B2%D0%BD%D1%8B%D0%B9-%D0%B2%D1%8B%D0%B7%D0%BE%D0%B2-%D0%B2-libev-evbreak-recurse-%D0%BF%D1%80%D0%B8-%D0%BE%D1%80%D0%B3%D0%B0%D0%BD%D0%B8%D0%B7%D0%B0%D1%86%D0%B8%D0%B8-%D0%BC%D0%BD%D0%BE%D0%B3%D0%BE%D0%BF%D0%BE%D1%82%D0%BE%D1%87%D0%BD%D0%BE%D0%B3%D0%BE-%D1%81%D0%B5%D1%80%D0%B2%D0%B5

ロシア語をグーグル翻訳。

「libev まとめ」(Qiita)
http://qiita.com/janus_wel/items/51bbd7517ab237a0b65c

「libevによる非同期プログラミング」(Qiita)
http://qiita.com/0xfffffff7/items/c5287067623fdbd2c3cb

どうすっかなー。

tamesi33a2_cpp.exe でそんなエラーあっただろうか。

# ./tamesi33a2_cpp.exe &

バックグラウンドで実行して、

# ./tamesi34_cs.exe

ためし34でアクセス。

a1            a2 
   -> 1112 ->
   <- 1113 <-
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0

Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1113' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
  at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x409c15e0 + 0x00103> in <filename unknown>:0
  at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x409bf4b0 + 0x000f9> in <filename unknown>:0
  at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x409c3810 + 0x00113> in <filename unknown>:0
  at UsagiMQ.Program.GetChannel (Int32 index) <0x409851f0 + 0x00138> in <filename unknown>:0
  at UsagiMQ.Program.StartConsume (System.String name_queue, Int32 lifeSpan_queue) <0x40985110 + 0x0002b> in <filename unknown>:0
  at UsagiMQ.Program.Main (System.String[] args) <0x40980d50 + 0x002eb> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1113' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
  at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x409c15e0 + 0x00103> in <filename unknown>:0
  at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x409bf4b0 + 0x000f9> in <filename unknown>:0
  at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x409c3810 + 0x00113> in <filename unknown>:0
  at UsagiMQ.Program.GetChannel (Int32 index) <0x409851f0 + 0x00138> in <filename unknown>:0
  at UsagiMQ.Program.StartConsume (System.String name_queue, Int32 lifeSpan_queue) <0x40985110 + 0x0002b> in <filename unknown>:0
  at UsagiMQ.Program.Main (System.String[] args) <0x40980d50 + 0x002eb> in <filename unknown>:0

そういえば、1113 にアクセスするとエラーになるんだった。

キュー名を 1114、1115 に変えれるような仕組みを付けたい。

// OS      : Windows10 : // このプログラムは動かない
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : rabbitmq-server
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//         : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//         : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//         : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//         : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//         : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
//         : Compile   : Command  : g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
//         : Execute   : Command  : // バックグラウンドで実行
//                                : ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
//         : Stop      : Typing   : [Ctrl]+[C]
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。

#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

//using namespace service_io;
//static void service_io::workConsume();
//static void service_io::enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message);

namespace service_io 
{
    static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
    // 0 : enqueue用設定
    // 1 : dequeue用設定
    static const int ENQUEUE_INDEX = 0;
    static const int DEQUEUE_INDEX = 1;
    static const int NUM_INDEX = 2;
    static std::string name_queues[] = { "1113", "1112" };
    static std::string lifeSpan_queues[] = { "duration", "duration" };

    /// <summary>
    /// 回転式バッファー。
    /// これはメイン・スレッドに置く。
    /// デキューのスレッドでエンキューすることはできない。
    /// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
    /// </summary>
    namespace rotationBuffer
    {
        const int bufferSize = 100;
        static std::string buffer[bufferSize] = {};
        static int bufferCursors[2] = { 0, 0 };
        const int PUT_INDEX = 0;
        const int GET_INDEX = 1;
        static void putMessage(std::string message)
        {
            buffer[bufferCursors[PUT_INDEX]] = message;
            bufferCursors[PUT_INDEX]++;
            if (!(bufferCursors[PUT_INDEX] < bufferSize))
            {
                bufferCursors[PUT_INDEX] = 0;
            }
        }
        static std::string getMessage()
        {
            if ("" != buffer[bufferCursors[GET_INDEX]])
            {
                std::string message = buffer[bufferCursors[GET_INDEX]];
                buffer[bufferCursors[GET_INDEX]] = "";
                bufferCursors[GET_INDEX]++;
                if (!(bufferCursors[GET_INDEX] < bufferSize))
                {
                    bufferCursors[GET_INDEX] = 0;
                }
                return message;
            }
            return "";
        }
    }

    // メッセージキューにエンキュー
    static void enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message)
    {
        std::string exchange_name = "myexchange";
        std::string routing_key = "";

        int lifeSpan = 0;
        if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::durable;
        }
        else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::autodelete;
        }
        else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::passive;
        }
        else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
        {
            lifeSpan = AMQP::exclusive;
        }

        channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
            .onError([](const char* errMsg) {
            std::cerr << "error declaring queue: " << errMsg << "\n";
        });

        channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
            .onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {

            if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
                std::cerr << "failed to publish?\n";
            }

            // break in ev loop.
            connection.close();
        });

        // このループは、パブリッシュ後に抜ける
        ev_run(loop);
    }

    // 受信できたときに割り込んでくる処理
    // startConsume() しておくこと。
    static std::string dequeue() {
        std::string message;

        while ("" == message)
        {
            message = rotationBuffer::getMessage();
        }

        return message;
    }

    // メッセージ・キューの監視を開始
    static void workConsume()
    {
        // Connect to the AMQP service.
        auto *loop = EV_DEFAULT;
        AMQP::LibEvHandler handler(loop);
        AMQP::TcpConnection connection(&handler, ADDRESS);
        AMQP::TcpChannel channel(&connection);

        // I will go to the front of the box named "1111".
        int lifeSpan = 0;
        if ("durable" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::durable;
        }
        else if ("autodelete" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::autodelete;
        }
        else if ("passive" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::passive;
        }
        else if ("exclusive" == lifeSpan_queues[DEQUEUE_INDEX])
        {
            lifeSpan = AMQP::exclusive;
        }
        channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);

        // I look inside the box.
        auto errorCb = [](const char *errMsg) {
            std::cerr << "My ID watching failed [" << errMsg << "]" << std::endl;
        };
        auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {

            std::string myString(message.body(), message.bodySize());
            rotationBuffer::putMessage(myString);

            channel.ack(deliveryTag);
        };
        channel.consume(name_queues[DEQUEUE_INDEX])
            .onReceived(messageCb)
            .onError(errorCb);

        // I will keep on forever.
        ev_run(loop, 0);

        // I will not come here.
        return;
    }
}

int main(int argc, char* argv[])
{
    std::string cmdArg;
    // プログラム名を省き、コマンドライン引数だけをつなげる。
    for (int i = 1; i < argc; ++i)
    {
        cmdArg += std::string(argv[i]) + " ";
    }
    std::istringstream ssCmd(cmdArg);

    // 半角スペース区切りの最初の1トークンを拾う
    std::string token;
    ssCmd >> token;

    if (token == "--msgqueue") {
        // ex.) --msgqueue 1115 durable 1114 durable
        ssCmd >> service_io::name_queues[service_io::ENQUEUE_INDEX];
        ssCmd >> service_io::lifeSpan_queues[service_io::ENQUEUE_INDEX];
        ssCmd >> service_io::name_queues[service_io::DEQUEUE_INDEX];
        ssCmd >> service_io::lifeSpan_queues[service_io::DEQUEUE_INDEX];
    }

    // デキューの常時監視をスタート
    auto th1 = std::thread([] { service_io::workConsume(); });

    // エンキュー用の接続
    //auto* loop = EV_DEFAULT;
    struct ev_loop* loop = EV_DEFAULT;
    AMQP::LibEvHandler handler{ loop };
    AMQP::TcpConnection connection{ &handler, service_io::ADDRESS };
    AMQP::TcpChannel channel{ &connection };

    // 無限ループ
    for (;;)
    {
        std::string message = service_io::rotationBuffer::getMessage();
        if ("" != message)
        {
            // 末尾に daze を付ける。
            message += "daze";

            // エンキューする
            service_io::enqueue(loop, connection, channel, message);
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    // このプログラムは、自分では接続を切らない。

    // th1 スレッドの終了を待つ
    th1.join();
    return 0;
}

こんな感じでいいのだろうか?

# ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
[1] 4177
root@tk2-217-18401:/home/★user/shogi/cpp_service# tamesi33a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.

同様のエラーが出るようになったのか。

namespace service_io で囲んだからか、それとも キュー名を選べるようにしたからか?

# ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
[1] 4239

ネームスペースを外すと すんなり行ったようにも見える。
じゃあ ネームスペースを付けてみよう。

# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > 1114
削除するメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
[1114]キューを削除したはずだぜ☆(^~^) result=[4294967295]
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > 1115
削除するメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0

Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=504, text="Frame received for invalid channel 1", classId=0, methodId=0, cause=RabbitMQ.Client.Impl.ChannelErrorException: Frame received for invalid channel 1

なんか 操作をミスって キューを削除してしまったらしい。

Enqueue? > usi
 Enqueue(^q^) usi
Enqueue? > tamesi33a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.

同様のエラーが出ている。

ソースを戻して実行。

# ./tamesi33a2_cpp.exe &
[1] 4615
root@tk2-217-18401:/home/★user/shogi/cpp_service# My ID watching failed [Channel is in error state]

チャンネルか、メッセージ・キューが腐っているんだろうか?

a1            a2 
   -> 1116 ->
   <- 1117 <-

キューを作り直してやりなおし。

> usi

Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1116' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
  at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x416f2760 + 0x00103> in <filename unknown>:0

属性の設定がおかしいんだろうか?

        int lifeSpan = 0;
    if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::durable;
    }
    else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::autodelete;
    }
    else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::passive;
    }
    else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
    {
        lifeSpan = AMQP::exclusive;
    }

    channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
        .onError([](const char* errMsg) {
        std::cerr << "error declaring queue: " << errMsg << "\n";
    });

C++ で何かおかしいことをやっているだろうか。等号とか。

ミスを見つけた。

static std::string bgp_lifeSpan[] = { "duration", "duration" };

これは、

static std::string bgp_lifeSpan[] = { "durable", "durable" };

こっちが正しい。今回の不具合とは関係なさそうだが直していく。

./tamesi33a2_cpp.exe &
[1] 5307
My ID watching failed [Channel is in error state]

直してもエラーが出ている。

こんな風に書き直してみる。

    int lifeSpan = 0;
    std::string a = lifeSpan_queues[ENQUEUE_INDEX];
    if      ("durable"    == a) { lifeSpan = AMQP::durable;    }
    else if ("autodelete" == a) { lifeSpan = AMQP::autodelete; }
    else if ("passive"    == a) { lifeSpan = AMQP::passive;    }
    else if ("exclusive"  == a) { lifeSpan = AMQP::exclusive;  }
    else
    {
        std::cerr << "未対応のキュー寿命が指定されました。["+ a +"]" << std::endl;
        exit(1);
    }

    channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
        .onError([&a](const char* errMsg) {
            std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << a << "]" << "\n";
        });
# ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
[1] 5577

引数付けて指定するのを忘れていたぜ。

# ./tamesi34_cs.exe
★中略
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > Kirin
 Enqueue(^q^) Kirin
Enqueue? > Neko
 Enqueue(^q^) Neko
Enqueue? > Inu
 Enqueue(^q^) Inu
Enqueue? > root@tk2-217-18401:/home/★user/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112    0
1116    0
1111    0
1113    0
1117    0
myqueue 25
1115    0
1114    3

./tamesi34_cs.exe を使うと メッセージはキューに送れるのと、

# jobs
[1]+  Terminated              ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable  (wd: /home/★user/shogi/cpp_service)

./tamesi33a2_cpp.exe を使うと キューの中身が 0 になっていることは確認したので、エンキューができていないのだろう。

スレッドと mutex

「what is correspoding feature for synchronized in java?」(stack overflow)
http://stackoverflow.com/questions/5429653/what-is-correspoding-feature-for-synchronized-in-java

こんなの覚えないといけないだろうか?

とりあえず適当に使ってみよう。

/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
    const int bufferSize = 100;
    static std::string buffer[bufferSize] = {};
    static int bufferCursors[2] = { 0, 0 };
    const int PUT_INDEX = 0;
    const int GET_INDEX = 1;

    std::mutex _mutex;
    static void putMessage(std::string message)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        buffer[bufferCursors[PUT_INDEX]] = message;
        bufferCursors[PUT_INDEX]++;
        if (!(bufferCursors[PUT_INDEX] < bufferSize))
        {
            bufferCursors[PUT_INDEX] = 0;
        }
    }
    static std::string getMessage()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if ("" != buffer[bufferCursors[GET_INDEX]])
        {
            std::string message = buffer[bufferCursors[GET_INDEX]];
            buffer[bufferCursors[GET_INDEX]] = "";
            bufferCursors[GET_INDEX]++;
            if (!(bufferCursors[GET_INDEX] < bufferSize))
            {
                bufferCursors[GET_INDEX] = 0;
            }
            return message;
        }
        return "";
    }
}
root@tk2-217-18401:/home/★user/shogi/cpp_service# rm tamesi33a2_cpp.cpp
root@tk2-217-18401:/home/★user/shogi/cpp_service# nano tamesi33a2_cpp.cpp
root@tk2-217-18401:/home/★user/shogi/cpp_service# g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
root@tk2-217-18401:/home/★user/shogi/cpp_service# ./tamesi33a2_cpp.exe --msgqueue 1117 durable 1116 durable &
[1] 6963
root@tk2-217-18401:/home/★user/shogi/cpp_service# My ID watching failed [Channel is in error state] lifeSpan=[durable]

初回に失敗するのはなんでだろう?

# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1118
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1119
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ika
 Enqueue(^q^) ika
tamesi33a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
Enqueue? >

いつものエラーで落ちている。 ika というメッセージをデキューはしてるんだと思うんだが、このエラーメッセージが出てくる前に何か不具合が起こってるんじゃないか? エラー出力はないか?

# jobs
[2]+  Running                 ./tamesi33a2_cpp.exe --msgqueue 1117 durable 1116 durable 2> ./tamesi33a2_cpp.err.log &

こう動かしておいて。

# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1116
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1117
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ringo

Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1116' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
  at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x411595f0 + 0x00103> in <filename unknown>:0
  at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x411574f0 + 0x000f9> in <filename unknown>:0
  at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x4115b820 + 0x00113> in <filename unknown>:0
  at UsagiMQ.Program.GetChannel (Int32 index) <0x4111d1f0 + 0x00138> in <filename unknown>:0
  at UsagiMQ.Program.Enqueue (System.String message) <0x4115c930 + 0x00013> in <filename unknown>:0
  at UsagiMQ.Program.Main (System.String[] args) <0x41118d50 + 0x0031f> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1116' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
  at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x411595f0 + 0x00103> in <filename unknown>:0
  at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x411574f0 + 0x000f9> in <filename unknown>:0
  at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x4115b820 + 0x00113> in <filename unknown>:0
  at UsagiMQ.Program.GetChannel (Int32 index) <0x4111d1f0 + 0x00138> in <filename unknown>:0
  at UsagiMQ.Program.Enqueue (System.String message) <0x4115c930 + 0x00013> in <filename unknown>:0
  at UsagiMQ.Program.Main (System.String[] args) <0x41118d50 + 0x0031f> in <filename unknown>:0

うーむ。

RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)

こういう関数を使ってるのは C# の方だったかな。ここで条件の設定が違うんだろうか?

autodelete

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1116
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1117
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
GetChannel index = [1]
    QUEUE_NAMES[index] = [1117]
    passive_lifeSpans[index] = [False]
    durable_lifeSpans[index] = [True]
    exclusive_lifeSpans[index] = [False]
    autodelete_lifeSpans[index] = [False]
    nowait は仮に false 固定
    arguments は仮に null 固定
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)

durable と autodelete は両立するらしいが。

# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1116
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1117
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable    : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
GetChannel index = [1]
    QUEUE_NAMES[index] = [1117]
    passive_lifeSpans[index] = [False]
    durable_lifeSpans[index] = [True]
    exclusive_lifeSpans[index] = [False]
    autodelete_lifeSpans[index] = [False]
    nowait は仮に false 固定
    arguments は仮に null 固定
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > blade
GetChannel index = [0]
    QUEUE_NAMES[index] = [1116]
    passive_lifeSpans[index] = [False]
    durable_lifeSpans[index] = [True]
    exclusive_lifeSpans[index] = [False]
    autodelete_lifeSpans[index] = [False]
    nowait は仮に false 固定
    arguments は仮に null 固定

ここで突然 処理から抜けた。
tamesi34_cs.err.log にいつものエラーが書いてある。

足し算

なんだか急に 体調が悪くなったんで休憩。
これは書きかけ。

// OS  : Windows 10
// IDE : Visual Studio 2015
//       Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS  : Ubuntu 16.04
//       Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
//               : Command : chmod 755 tamesi34_cs.cs
//       Execute : Command : // フォアグラウンドで実行する
//                         : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
//       Check   : Command : // キューの中身の数を調べる
//                         : rabbitmqctl list_queues
//
// Library : RabbitMQ
//           Refference : Website : RabbitMQ http://www.rabbitmq.com/
//                      : Website : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                      : Website : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//--------------------------------------------------------------------------------
// tamesi34_cs.cs

// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace UsagiMQ
{
    [Flags]
    enum LifeSpanType
    {
        /// <summary>
        /// RabbitMQが止まってもキューを残す
        /// </summary>
        Durable = 1,
        /// <summary>
        /// コンシューマーが1人も接続していなかったら消す
        /// </summary>
        Autodelete = 0x1<<1,
        /// <summary>
        /// キューが存在するかどうかチェックするだけ。中身見ない時これ
        /// </summary>
        Passive = 0x1 << 2,
        /// <summary>
        /// この接続でだけ使える。この接続が切れたら消す
        /// </summary>
        Exclusive = 0x1 << 3
    }

    /// <summary>
    /// メッセージを エンキューします。
    /// キューの名前は指定してください。
    /// デキューは割込みを受け付けます。
    /// 
    /// 参照 : QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
    /// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
    /// 参照 : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
    /// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
    /// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
    /// </summary>
    class Program
    {
        const int ENQUEUE_INDEX = 0;
        const int DEQUEUE_INDEX = 1;
        const int DELETEQUEUE_INDEX = 2;
        const int NUM_INDEX = 3;
        const string HOST_NAME = "localhost";
        static string[] QUEUE_NAMES = new string[NUM_INDEX];
        /// <summary>
        /// キューの寿命(複合可能)
        /// durable    : RabbitMQが止まってもキューを残す
        /// autodelete : コンシューマーが1人も接続していなかったら消す
        /// passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
        /// exclusive  : この接続でだけ使える。この接続が切れたら消す
        /// </summary>
        static LifeSpanType[] lifeSpans_queue = new LifeSpanType[NUM_INDEX];
        static bool[]
            durable_lifeSpans = new bool[NUM_INDEX],
            autodelete_lifeSpans = new bool[NUM_INDEX],
            passive_lifeSpans = new bool[NUM_INDEX],
            exclusive_lifeSpans = new bool[NUM_INDEX];
        /// <summary>
        /// 
        /// </summary>
        /// <param name="index_queue"></param>
        /// <param name="name_queue"></param>
        /// <param name="lifeSpan">
        /// (0) durable
        /// (1) autodelete
        /// (2) passive
        /// (3) exclusive
        /// </param>
        static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
        {
            QUEUE_NAMES[index_queue] = name_queue;
            lifeSpans_queue[index_queue] = lifeSpan;

            // 一旦クリアー
            durable_lifeSpans[index_queue] = false;
            autodelete_lifeSpans[index_queue] = false;
            passive_lifeSpans[index_queue] = false;
            exclusive_lifeSpans[index_queue] = false;

            // durable
            if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
            // autodelete
            if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
            // passive
            if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
            // exclusive
            if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
        }


        public static ConnectionFactory GetFactory()
        {
            if (null == m_factory_)
            {
                m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
            }
            return m_factory_;
        }
        static ConnectionFactory m_factory_;

        public static IConnection GetConnection()
        {
            if (null == m_connection_)
            {
                m_connection_ = GetFactory().CreateConnection();
            }
            return m_connection_;
        }
        static IConnection m_connection_;

        public static IModel GetChannel(int index)
        {
            if (null == m_channels_[index])
            {
                m_channels_[index] = GetConnection().CreateModel();

                // 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
                Console.Error.WriteLine("GetChannel index = [" + index + "]");
                Console.Error.WriteLine("    QUEUE_NAMES[index] = [" + QUEUE_NAMES[index] + "]");
                Console.Error.WriteLine("    passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
                Console.Error.WriteLine("    durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
                Console.Error.WriteLine("    exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
                Console.Error.WriteLine("    autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
                Console.Error.WriteLine("    nowait は仮に false 固定");
                Console.Error.WriteLine("    arguments は仮に null 固定");

#if UBUNTU
                // Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
                m_channels_[index].QueueDeclare(QUEUE_NAMES[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
#else
                m_channels_[index].QueueDeclare(QUEUE_NAMES[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
            }
            return m_channels_[index];
        }
        static IModel[] m_channels_ = new IModel[NUM_INDEX];

        public static EventingBasicConsumer GetConsumer(int index)
        {
            if (null == m_consumers_[index])
            {
#if UBUNTU
                // Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
                m_consumers_[index] = new EventingBasicConsumer();
#else
                m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif

            }
            return m_consumers_[index];
        }
        static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];

        /// <summary>
        /// 受信できたときに割り込んでくる処理
        /// </summary>
#if UBUNTU
        public static BasicDeliverEventHandler GetDequeueHandler()
#else
        public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
        {
            if (null == m_dequeueHandler_)
            {
#if UBUNTU
                m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
                m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
                {
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("<---- [interrupt!] Dequeue(^q^) {0}", message);
                });
            }

            return m_dequeueHandler_;
        }

#if UBUNTU
        static BasicDeliverEventHandler m_dequeueHandler_;
#else
        static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif

        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseConnection()
        {
            if (null != m_connection_)
            {
                m_connection_.Close();
                m_connection_ = null;
            }
        }
        /// <summary>
        /// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
        /// </summary>
        static void CloseChannel(int index)
        {
            if (null != m_channels_[index])
            {
                m_channels_[index].Close();
                m_channels_[index] = null;
            }
        }

        static void Main(string[] args)
        {
            //----------------------------------------
            // Delete
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力    : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > ");
                string queueName_delete = Console.ReadLine();
                if (""== queueName_delete.Trim())
                {
                    break;
                }


                Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ LifeSpanType.Durable + @") durable    : RabbitMQが止まってもキューを残す
(" + LifeSpanType.Autodelete + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + LifeSpanType.Passive + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + LifeSpanType.Exclusive + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
                SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
                uint result = DeleteQueue();
                Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "]");
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
                string queueName_enqueue = Console.ReadLine();

                Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + LifeSpanType.Durable + @") durable    : RabbitMQが止まってもキューを残す
(" + LifeSpanType.Autodelete + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + LifeSpanType.Passive + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + LifeSpanType.Exclusive + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_enqueue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
                {
                    SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
                    break;
                }
            }

            //----------------------------------------
            // Enqueue settings
            //----------------------------------------
            for (;;)
            {
                Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
                string queueName_dequeue = Console.ReadLine();

                Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + LifeSpanType.Durable + @") durable    : RabbitMQが止まってもキューを残す
(" + LifeSpanType.Autodelete + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + LifeSpanType.Passive + @") passive    : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + LifeSpanType.Exclusive + @") exclusive  : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
                int lifeSpan_dequeue;
                if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
                {
                    StartConsume(queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
                    break;
                }
            }

            Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ");
            for (;;)
            {
                // "Hello World!" などを入力
                string line = Console.ReadLine();
                Enqueue(line);
                Console.Write(@"Enqueue? > ");
            }

            // ここには来ない
            // CloseConnection();
        }

        static uint DeleteQueue()
        {
            IModel channel = GetChannel(DELETEQUEUE_INDEX);

#if UBUNTU
            // Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
            // uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
            uint result = channel.QueueDelete( QUEUE_NAMES[DELETEQUEUE_INDEX],true,true,true);
#else
            uint result = channel.QueueDelete(QUEUE_NAMES[DELETEQUEUE_INDEX],true,true);
#endif

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseChannel(DELETEQUEUE_INDEX);
            return result;
        }

        static void Enqueue(string message)
        {
            IModel channel = GetChannel(ENQUEUE_INDEX);

            byte[] body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", QUEUE_NAMES[ENQUEUE_INDEX], null, body);

            Console.WriteLine(" Enqueue(^q^) {0}", message);

            // 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
            CloseChannel(ENQUEUE_INDEX);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="name_queue">メッセージ・キューの名前</param>
        /// <param name="lifeSpan_queue">既存のメッセージ・キューの場合、メッセージ・キューの設定は合わせる必要がある。
        /// durable    : RabbitMQが止まってもキューを残す
        /// autodelete : コンシューマーが1人も接続していなかったら消す
        /// passive    : キューが存在するかどうかチェックするだけで、中身を見ない時はこれ
        /// exclusive  : この接続でだけ使える。この接続が切れたら消す
        /// </param>
        static void StartConsume(string name_queue, LifeSpanType lifeSpan_queue)
        {
            SetLifeSpan(DEQUEUE_INDEX, name_queue, lifeSpan_queue);

            IModel channel = GetChannel(DEQUEUE_INDEX);
            EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);

            // 受信できたときに割り込んでくる処理
            consumer.Received += GetDequeueHandler();

#if UBUNTU
            // Ubuntuでは何故か引数が 5 個のやつになっている。
            channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, "", null, consumer);
#else
            channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, consumer);
#endif
            // 終了はさせない
            // consumer.Received -= GetReceiveHandler();
            // CloseChannel(DEQUEUE_INDEX);
        }
    }
}

1116 は 3 で合ってたか。
次はエラー。
inequivalent arg 'auto_delete' for queue '1117'

1117 は 1 が正解か。

長くなったので次の記事へ

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1