前回の記事 : http://qiita.com/muzudho1/items/473fc0fda3e38300b820
バックグラウンドで割込み待ち停止しているプロセスと、プロセス間通信はできるのだろうか?
前に実験した気がするんだが どの記事だったろうか。
前回の記事だった。
「コンピュータ将棋ソフトとの対局サーバーを立てよう<その8>」(Qiita)
http://qiita.com/muzudho1/items/798dfbf267340f3801e1#_reference-1a747e743b7ff0dbb050
このときは C# / RabbitQM だったが、今度は C++ / AMQP-CPP でやりたい。
じゃあ
tamesi33a1_cpp.cpp、tamesi33a2_cpp.cpp のテストを作ろう。
テストケース tamesi33 C++版
a1 : (バックグラウンドに回さない)キーボード入力で、メッセージキュー「1112」にメッセージを送れるものとする。常時 メッセージキュー「1113」を監視し、メッセージが入っていれば 画面に出すものとする。
このプログラムは 必要になったら実行し、ログアウトしたら消えているものと想定する。
a2 : (バックグラウンドに回す)常時 メッセージキュー「1112」を監視し、メッセージが入っていれば その文字列に「daze」と付けて メッセージキュー「1113」にエンキューする。
このプログラムは ログアウトしても稼働しているものとする。
これで a2 が常時 稼働していて、a1 でアクセスできるのなら よしとする。
a2
a2 だけ作ればいいや……。
ソースを見ていたんだが
// I will keep on forever.
ev_run(loop, 0);
デキューを監視している処理があって、ここでブロックされている。
だからエンキューのプログラムまで行ってない。
どうするか。
C++ でスレッドを走らせるには?
デキューの監視は別スレッドでやればいいのだろうか?
「C++で簡単非同期処理(std::thread,std::async)」(Qiita)
http://qiita.com/termoshtt/items/d3cb7fe226cdd498d2ef
Visual Studio 2015 設定
Release x64 にして プロパティ画面からプリコンパイル・ヘッダーの使用を無しにする。
#include "stdafx.h"
といった記述も削除する。
- stdafx.h
- targetver.h
- stdafx.cpp
の3ファイルは削除。
コンパイルは通らないが、Ubuntu の方へソースを移動させる。
# g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
tamesi33a2_cpp.cpp: In lambda function:
tamesi33a2_cpp.cpp:95:46: error: ‘bgp_workConsume’ was not declared in this scope
auto th1 = std::thread([] { bgp_workConsume(); });
^
tamesi33a2_cpp.cpp: In function ‘int main()’:
tamesi33a2_cpp.cpp:104:20: error: ‘class AMQP::TcpConnection’ has no member named ‘isConnected’
while (connection.isConnected())
^
tamesi33a2_cpp.cpp:113:40: error: ‘enqueue’ was not declared in this scope
enqueue(connection, channel, message);
^
tamesi33a2_cpp.cpp: In function ‘void enqueue(AMQP::TcpConnection&, AMQP::TcpChannel&, std::__cxx11::string)’:
tamesi33a2_cpp.cpp:166:9: error: ‘loop’ was not declared in this scope
ev_run(loop);
^
tamesi33a2_cpp.cpp: In function ‘void bgp_workConsume()’:
tamesi33a2_cpp.cpp:222:18: error: ‘bgp_queueName_dequeue’ was not declared in this scope
channel.consume(bgp_queueName_dequeue)
^
エラーが出た。
//auto* loop = EV_DEFAULT;
struct ev_loop* loop = EV_DEFAULT;
auto を使わない場合は EV_DEFAULT の型は こう書くのか。
「class-TcpConnection」(docsforge.com)
http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
ここでソースが読める。
# g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
tamesi33a2_cpp.cpp: In lambda function:
tamesi33a2_cpp.cpp:95:46: error: ‘bgp_workConsume’ was not declared in this scope
auto th1 = std::thread([] { bgp_workConsume(); });
^
tamesi33a2_cpp.cpp: In function ‘int main()’:
tamesi33a2_cpp.cpp:114:46: error: ‘enqueue’ was not declared in this scope
enqueue(loop, connection, channel, message);
^
残りのエラーは何をしたらいいのか。 scope とか書いてるな。
関数のプロトタイプ宣言を試してみるか。
おっけ。
サンプル・プログラム
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi33a2_cpp.exe &
// : Stop : Typing : [Ctrl]+[C]
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
// 2 : deletequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "1113", "1112" };
static std::string lifeSpan_queues[] = { "duration", "duration" };
static void workConsume();
static void enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message);
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
static void putMessage(std::string message)
{
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
int main()
{
// デキューの常時監視をスタート
auto th1 = std::thread([] { workConsume(); });
// エンキュー用の接続
//auto* loop = EV_DEFAULT;
struct ev_loop* loop = EV_DEFAULT;
AMQP::LibEvHandler handler{ loop };
AMQP::TcpConnection connection{ &handler, ADDRESS };
AMQP::TcpChannel channel{ &connection };
// 無限ループ
for (;;)
{
std::string message = rotationBuffer::getMessage();
if ("" != message)
{
// 末尾に daze を付ける。
message += "daze";
// エンキューする
enqueue(loop, connection, channel, message);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// このプログラムは、自分では接続を切らない。
// th1 スレッドの終了を待つ
th1.join();
return 0;
}
// メッセージキューにエンキュー
static void enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message)
{
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << "\n";
});
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
// このループは、パブリッシュ後に抜ける
ev_run(loop);
}
// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
// メッセージ・キューの監視を開始
static void workConsume()
{
// Connect to the AMQP service.
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, ADDRESS);
AMQP::TcpChannel channel(&connection);
// I will go to the front of the box named "1111".
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
// I look inside the box.
auto errorCb = [](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "]" << std::endl;
};
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
channel.ack(deliveryTag);
};
channel.consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
// I will keep on forever.
ev_run(loop, 0);
// I will not come here.
return;
}
これでコンパイルは通る。
これをバックグラウンドで実行する。
./tamesi33a2_cpp.exe &
# ./tamesi33a1_cs.exe
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
tako
Enqueue(^q^) tako
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
2
Please, wait dequeue... Or Press any key to menu.
[interrupt!] Dequeue(^q^) takodaze
takodaze が返ってきた。C# → CPP → C# は おっけ!
C++で スレッドを書いたのが動いているんだろう。
浮かむ瀬のデキュー監視も別スレッドで動かそう
上のサンプル・プログラムを見て書き換えていこう。
名前を変えたい
gameserver.hpp を service_io.hpp に名前を変えて、名前空間 service_io を作りたい。
その名前空間の中の関数は service_io::enqueue みたいな感じで使える感じで。
通常の標準入出力が壊れていないか確認
対局
====
usi
usiok
isready
readyok
usinewgame
position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
go
例) bestmove 2g2f ponder 1c1d
quit
壊れてはない。
改造したソースコードを一応貼り付けておく。
service_io.hpp
#pragma once
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include "common.hpp"
// #define UBUNTU
// バックグラウンド・プロセス用の出力
// #define BGP_COUT std::cout
// #define BGP_ENDL std::endl
#define BGP_COUT service_io::oss
#define BGP_ENDL BGP_ENDL1, BGP_ENDL2
#define BGP_ENDL1 std::endl
#define BGP_ENDL2 service_io::enqueue (service_io::oss.str())
namespace service_io
{
// 文字列ストリーム出力を、文字列に置換するもの
static std::ostringstream oss;
}
#ifdef UBUNTU
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//
// service_io.hpp
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "1113", "1112" };
static std::string lifeSpan_queues[] = { "duration", "duration" };
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
static void putMessage(std::string message)
{
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
// メッセージキューにエンキュー
// 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
static int enqueue(std::string message) {
auto* loop = EV_DEFAULT;
AMQP::LibEvHandler handler{ loop };
AMQP::TcpConnection connection{ &handler, ADDRESS };
AMQP::TcpChannel channel{ &connection };
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << "\n";
});
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
// We will monitor until the connection is lost. Execute channel.declareQueue( ... ).
ev_run(loop);
return 0;
}
// service_io::workConsume() を別スレッドで走らせておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
// メッセージ・キューの監視を開始
static void workConsume()
{
// Connect to the AMQP service.
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, ADDRESS);
AMQP::TcpChannel channel(&connection);
// I will go to the front of the box named "1111".
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
// I look inside the box.
auto errorCb = [](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "]" << std::endl;
};
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
channel.ack(deliveryTag);
};
channel.consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
// I will keep on forever.
ev_run(loop, 0);
// I will not come here.
return;
}
}
#else
// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
static int enqueue(std::string message) {
std::cout << IOLock << message << std::endl << IOUnlock;
oss.str(std::string());// 空文字列を入れてクリアーする
return 0;
}
static std::string dequeue() {
std::string cmd;
if (!std::getline(std::cin, cmd))
{
cmd = "quit";
}
return cmd;
}
}
#endif
class ServiceIo
{
public:
ServiceIo();
~ServiceIo();
};
service_io.cpp
//#include "stdafx.h"
#include "service_io.hpp"
ServiceIo::ServiceIo()
{
}
ServiceIo::~ServiceIo()
{
}
usi.cpp 抜粋
void Searcher::doUSICommandLoop(int argc, char* argv[]) {
#if UBUNTU
// デキューの常時監視をスタート
auto th1 = std::thread([] { service_io::workConsume(); });
#endif
bool dialogue = false;
// 「./apery」のように引数無しで打鍵して実行された場合に限り、なんども標準入力を待つ。
if (argc == 1)
{
dialogue = true;
}
bool evalTableIsRead = false;
Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);
std::string cmd;
std::string token;
for (int i = 1; i < argc; ++i)
{
cmd += std::string(argv[i]) + " ";
}
do {
// 文字が打たれたらその文字を、空文字を打たれたら "quit" 扱いとする。
if (dialogue)
{
cmd = service_io::dequeue();
}
// 半角スペース区切りの最初の1トークンを拾う
std::istringstream ssCmd(cmd);
ssCmd >> std::skipws >> token;
if (token == "--msgqueue") {
#if UBUNTU
ssCmd >> service_io::name_queues[ENQUEUE_INDEX];
ssCmd >> service_io::lifeSpan_queues[ENQUEUE_INDEX];
ssCmd >> service_io::name_queues[DEQUEUE_INDEX];
ssCmd >> service_io::lifeSpan_queues[DEQUEUE_INDEX];
dialogue = true;
#else
std::cerr << "Command line parameter error : \"--msgqueue\" option for UBUNTU.";
#endif
}
else if (token == "quit" || token == "stop" || token == "ponderhit" || token == "gameover") {
if (token != "ponderhit" || signals.stopOnPonderHit) {
signals.stop = true;
threads.main()->startSearching(true);
}
else
limits.ponder = false;
if (token == "ponderhit" && limits.moveTime != 0)
limits.moveTime += timeManager.elapsed();
}
else if (token == "go") go(pos, ssCmd);
else if (token == "position") setPosition(pos, ssCmd);
else if (token == "usinewgame"); // isready で準備は出来たので、対局開始時に特にする事はない。
else if (token == "usi") SYNCCOUT << "id name " << std::string(options["Engine_Name"])
<< "\nid author Hiraoka Takuya"
<< "\n" << options
<< "\nusiok" << SYNCENDL;
else if (token == "isready") { // 対局開始前の準備。
tt.clear();
threads.main()->previousScore = ScoreInfinite;
if (!evalTableIsRead) {
// 一時オブジェクトを生成して Evaluator::init() を呼んだ直後にオブジェクトを破棄する。
// 評価関数の次元下げをしたデータを格納する分のメモリが無駄な為、
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
SYNCCOUT << "readyok" << SYNCENDL;
}
else if (token == "setoption") setOption(ssCmd);
else if (token == "write_eval") { // 対局で使う為の評価関数バイナリをファイルに書き出す。
if (!evalTableIsRead)
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
Evaluator::writeSynthesized(Evaluator::evalDir);
}
#if defined LEARN
else if (token == "l") {
auto learner = std::unique_ptr<Learner>(new Learner);
learner->learn(pos, ssCmd);
}
else if (token == "make_teacher") {
if (!evalTableIsRead) {
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
make_teacher(ssCmd);
}
else if (token == "use_teacher") {
if (!evalTableIsRead) {
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
use_teacher(pos, ssCmd);
}
else if (token == "check_teacher") {
check_teacher(ssCmd);
}
else if (token == "print") printEvalTable(SQ88, f_gold + SQ78, f_gold, false);
#endif
#if !defined MINIMUL
// 以下、デバッグ用
else if (token == "bench") {
if (!evalTableIsRead) {
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
benchmark(pos);
}
else if (token == "key") SYNCCOUT << pos.getKey() << SYNCENDL;
else if (token == "tosfen") SYNCCOUT << pos.toSFEN() << SYNCENDL;
else if (token == "eval") std::cout << evaluateUnUseDiff(pos) / FVScale << std::endl;
else if (token == "d") pos.print();
else if (token == "s") measureGenerateMoves(pos);
else if (token == "t") std::cout << pos.mateMoveIn1Ply().toCSA() << std::endl;
else if (token == "b") makeBook(pos, ssCmd);
#endif
else SYNCCOUT << "unknown command: " << cmd << SYNCENDL;
} while (token != "quit" && dialogue);
#if UBUNTU
// th1 スレッドの終了を待つ
th1.join();
#endif
threads.main()->waitForSearchFinished();
}
これだけだったろうか?
find . \( -name \Makefile -o -name \*.hpp -o -name \*.cpp -o -name \*.sfen -o -name \*.txt \) -type f | xargs -n 10 nkf -Lu --overwrite
改行の変換はこれ。どっかにまとめておきたいが。
nano service_io.hpp
こうして、
// #define UBUNTU
のコメントアウトを外す。
ヘッダ・ファイルの再読込み防止
In file included from bitboard.hpp:25:0,
from main.cpp:30:
service_io.hpp:9:0: warning: "UBUNTU" redefined
#define UBUNTU
^
あー、ヘッダファイル何度も読込んでるかも。
#pragma once
C++11 じゃこれは効かないんだろうか?
#ifndef APERY_SERVICE_IO_HPP
#define APERY_SERVICE_IO_HPP
ファイルの冒頭にこう書いて、
#endif // #ifndef APERY_SERVICE_IO_HPP
ファイルの末尾にこう書こう。
いや、そうでは無くて Makefile に -DUBUNTU と書き足したんじゃなかったか。
service_io.hpp
// #define UBUNTU
やっぱ こうする。
usi.cpp: In static member function ‘static void Searcher::doUSICommandLoop(int, char**)’ :
usi.cpp:1061:37: error: ‘ENQUEUE_INDEX’ was not declared in this scope
ssCmd >> service_io::name_queues[ENQUEUE_INDEX];
^
usi.cpp:1061:37: note: suggested alternative:
In file included from usi.cpp:22:0:
service_io.hpp:57:19: note: ‘service_io::ENQUEUE_INDEX’
static const int ENQUEUE_INDEX = 0;
^
usi.cpp:1063:37: error: ‘DEQUEUE_INDEX’ was not declared in this scope
ssCmd >> service_io::name_queues[DEQUEUE_INDEX];
^
usi.cpp:1063:37: note: suggested alternative:
In file included from usi.cpp:22:0:
service_io.hpp:58:19: note: ‘service_io::DEQUEUE_INDEX’
static const int DEQUEUE_INDEX = 1;
^
これ、ネームスペース抜けか?
そうだった。
コンパイルは通った。
# mv apery ../bin/apery
# cd ../bin
# ./apery --msgqueue 1114 durable 1115 durable &
1114 と、1115、まずは試しで 1112 と 1113 でいいか。
a1 a2
→ 1112 →
← 1113 ←
→ 1115 →
← 1114 ←
図にすると、こうか。なんかすっきりしないな。
a1 a2
→ 1112 →
← 1113 ←
→ 1114 →
← 1115 ←
こう変えよう。
# ./apery --msgqueue 1113 durable 1112 durable &
[2] 32479
# My ID watching failed [Channel is in error state]
エラーが返ってくるだけ、前進だな。
auto errorCb = [](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan_queues[" << DEQUEUE_INDEX << "]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
};
こう書くと、
# ./apery --msgqueue 1113 durable 1112 durable &
[3] 32746
# My ID watching failed [Channel is in error state] lifeSpan_queues[1]=[durable]
こうなる。
チャンネルが何かダメなのか?
キューを変えてみるか。
# ./apery --msgqueue 1115 durable 1114 durable &
[4] 410
今度はエラー応答も返ってこない。じゃあアクセスしてみよう。
# jobs
[1]- Running ./tamesi33a2_cpp.exe & (wd: /home/★user/shogi/cpp_service)
[4]+ Running ./apery --msgqueue 1115 durable 1114 durable &
なんか上のやつが 干渉してないか?
# kill %1
テスト
# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1114
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1115
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
<---- [interrupt!] Dequeue(^q^) usi
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? >
なんか「usi」が飛んできたのか。既にキューに入ってたとか。
じゃあ「usi」を送ってみるか。
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
Enqueue(^q^) usi
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
Enqueue? >
浮かむ瀬から エラーが返ってきた。ループが再帰的な実行で うーん?
エラーが返ってくるだけ進歩だな。
「Is that ev_run()/ev_loop() inhibited in multiple thread of current libev?」(http://lists.schmorp.de)
http://lists.schmorp.de/pipermail/libev/2011q4/001659.html
ev_run( ) が再帰中に開放しているのが検出されるというのは どういうことなのか。
再帰しているのか?
エンキューには次のように書いている。
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
connection.close(); で、たしかに 接続を開放 している。
じゃあ、デキューとエンキューで、接続は別に行った方がいいのか。
書き直す。
コネクションをクローズするタイミングをよく確認。
「AMQP-CPP/tests/libev.cpp」(CopernicaMarketingSoftware/AMQP-CPP)
https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/master/tests/libev.cpp
onSuccess の中で切断してないか?
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << "\n";
});
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
現状、onSuccess の中で切断しているが、bindQueue じゃなくて declareQueue の onSuccess の中で切断した方がいいんだろうか?
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onSuccess([&connection](const std::string &queueName, uint32_t messagecount, uint32_t consumercount) {
std::cout << "declared queue onSuccess queueName=[" << queueName << "] messagecount=[" << messagecount << "] consumercount=[" << consumercount << "]" << std::endl;
// break in ev loop.
connection.close();
})
.onError([](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << "\n";
});
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([ &channel, &exchange_name, &routing_key, &message]() {
// &connection,
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
//// break in ev loop.
//connection.close();
});
こんなんでいいんだろうか?
# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1114
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1115
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
Enqueue(^q^) usi
ダメなようだ。このエラーは何なのか。
# ps aux | grep apery
root 1582 0.0 0.0 12936 988 pts/8 S+ 14:11 0:00 grep --color=auto apery
なんらかの理由で落ちていて、それでループを抜けているのだろうか?どこかにエラーログは吐かれていないだろうか?
# ls -l
total 4076096
drwxr-xr-x 2 root root 4096 Mar 12 07:09 20161007
-rwxr-xr-x 1 root root 1911208 Mar 13 14:04 apery
-rw-r--r-- 1 root root 1531 Mar 12 05:48 benchmark.sfen
-rw-r--r-- 1 root root 4171912806 Mar 12 05:49 log1
-rw------- 1 root root 0 Mar 12 05:49 nohup.out
log1 というファイルはバグってそうだ。消そうか。
rm log1
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/bin# ./apery
usi
isready
usinewgame
position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
go
quit
バックグラウンド・プロセス専用に改造したので、応答は画面に出てこない。
それにしても、quit と打っても出てこないんだな。
そういえば、標準入出力の両方を見てないんだった。
# ps aux | grep apery
root 1718 97.9 14.3 1481912 145836 pts/8 Rl 14:20 0:25 ./apery --msgqueue 1115 durable 1114 durable
root 1733 0.0 0.0 12936 984 pts/8 S+ 14:20 0:00 grep --color=auto apery
リダイレクトして実行すればよかったのか?
# ./apery --msgqueue 1115 durable 1114 durable 2> ./apery.err.log &
[2] 1761
# jobs
[2]+ Running ./apery --msgqueue 1115 durable 1114 durable 2> ./apery.err.log &
これでどうか?
# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1114
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1115
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
Enqueue(^q^) usi
Enqueue? >
エラー出力が どっかいっただけじゃないか?
# rabbitmqctl list_queues
Listing queues ...
1112 0
1111 0
1113 0
myqueue 25
1115 0
1114 0
# ls
20161007 apery apery.err.log benchmark.sfen nohup.out
# nano log1
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
まあ、そういうことなのだった。意味ない。
サンプルソースでも よく読むか。
このエラーメッセージは ev.c ファイルのアサートが出しているのか。
自分のコードでもエラーを拾えないか。
例えば こんな感じで。
AMQP::TcpChannel channel{ &connection };
channel.onError([](const char *errMsg) {
// report error
std::cerr << "(^q^)channel error: " << errMsg << std::endl;
});
変わり無し。
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([ &channel, &exchange_name, &routing_key, &message]() {
// &connection,
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
//// break in ev loop.
//connection.close();
})
.onError([](const char *errMsg) {
// report error
std::cerr << "(^q^)channel bindQueue error: " << errMsg << std::endl;
});
じゃあ bindQueue にも onError を付けれないだろうか?
# ./apery --msgqueue 1115 durable 1114 durable &
[1] 2632
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/bin# apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
[1]+ Aborted ./apery --msgqueue 1115 durable 1114 durable
実行した直後に落ちたような。
# ./apery --msgqueue 1115 durable 1114 durable &
[1] 2655
root@tk2-217-18401:/home/★user/shogi/ukamuse_sdt4_child4/bin# ps aux | grep apery
root 2655 100 14.3 1481928 145852 pts/8 Rl 14:55 0:14 ./apery --msgqueue 1115 durable 1114 durable
root 2663 0.0 0.0 12936 988 pts/8 S+ 14:56 0:00 grep --color=auto apery
もう1回やったら大丈夫だった。
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
apery: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
Enqueue(^q^) usi
ぜんぜん変わらん。ググるか。
「libev: ev_loop recursion during release detected in RPC」(Apache)
https://issues.apache.org/jira/browse/KUDU-12
エラーが出ているというのはあるんだが、解決方法が書いてないのな。
「libev」(dirtysalt.github.io)
https://dirtysalt.github.io/libev.html
ソースを見せられても、やっぱり解決方法が分からん。
「Is that ev_run()/ev_loop() inhibited in multiple thread of current libev?」(http://lists.schmorp.de)
http://lists.schmorp.de/pipermail/libev/2011q4/001671.html
「Exceptions when a TCPSocket connection timed out in and out of the Reactor using Kernel.timeout #97」(celluloid/celluloid-io)
https://github.com/celluloid/celluloid-io/issues/97
カーネルのタイムアウト?
「マルチスレッド・サーバーの組織内libevはで再帰(EVBREAK_RECURSE)」(stack overflow)
http://ru.stackoverflow.com/questions/173685/%D0%A0%D0%B5%D0%BA%D1%83%D1%80%D1%81%D0%B8%D0%B2%D0%BD%D1%8B%D0%B9-%D0%B2%D1%8B%D0%B7%D0%BE%D0%B2-%D0%B2-libev-evbreak-recurse-%D0%BF%D1%80%D0%B8-%D0%BE%D1%80%D0%B3%D0%B0%D0%BD%D0%B8%D0%B7%D0%B0%D1%86%D0%B8%D0%B8-%D0%BC%D0%BD%D0%BE%D0%B3%D0%BE%D0%BF%D0%BE%D1%82%D0%BE%D1%87%D0%BD%D0%BE%D0%B3%D0%BE-%D1%81%D0%B5%D1%80%D0%B2%D0%B5
ロシア語をグーグル翻訳。
「libev まとめ」(Qiita)
http://qiita.com/janus_wel/items/51bbd7517ab237a0b65c
「libevによる非同期プログラミング」(Qiita)
http://qiita.com/0xfffffff7/items/c5287067623fdbd2c3cb
どうすっかなー。
tamesi33a2_cpp.exe でそんなエラーあっただろうか。
# ./tamesi33a2_cpp.exe &
バックグラウンドで実行して、
# ./tamesi34_cs.exe
ためし34でアクセス。
a1 a2
-> 1112 ->
<- 1113 <-
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1113' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x409c15e0 + 0x00103> in <filename unknown>:0
at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x409bf4b0 + 0x000f9> in <filename unknown>:0
at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x409c3810 + 0x00113> in <filename unknown>:0
at UsagiMQ.Program.GetChannel (Int32 index) <0x409851f0 + 0x00138> in <filename unknown>:0
at UsagiMQ.Program.StartConsume (System.String name_queue, Int32 lifeSpan_queue) <0x40985110 + 0x0002b> in <filename unknown>:0
at UsagiMQ.Program.Main (System.String[] args) <0x40980d50 + 0x002eb> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1113' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x409c15e0 + 0x00103> in <filename unknown>:0
at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x409bf4b0 + 0x000f9> in <filename unknown>:0
at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x409c3810 + 0x00113> in <filename unknown>:0
at UsagiMQ.Program.GetChannel (Int32 index) <0x409851f0 + 0x00138> in <filename unknown>:0
at UsagiMQ.Program.StartConsume (System.String name_queue, Int32 lifeSpan_queue) <0x40985110 + 0x0002b> in <filename unknown>:0
at UsagiMQ.Program.Main (System.String[] args) <0x40980d50 + 0x002eb> in <filename unknown>:0
そういえば、1113 にアクセスするとエラーになるんだった。
キュー名を 1114、1115 に変えれるような仕組みを付けたい。
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
// : Stop : Typing : [Ctrl]+[C]
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
//using namespace service_io;
//static void service_io::workConsume();
//static void service_io::enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message);
namespace service_io
{
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "1113", "1112" };
static std::string lifeSpan_queues[] = { "duration", "duration" };
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
static void putMessage(std::string message)
{
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
// メッセージキューにエンキュー
static void enqueue(struct ev_loop* loop, AMQP::TcpConnection& connection, AMQP::TcpChannel& channel, std::string message)
{
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << "\n";
});
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&connection, &channel, &exchange_name, &routing_key, &message]() {
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
// このループは、パブリッシュ後に抜ける
ev_run(loop);
}
// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
// メッセージ・キューの監視を開始
static void workConsume()
{
// Connect to the AMQP service.
auto *loop = EV_DEFAULT;
AMQP::LibEvHandler handler(loop);
AMQP::TcpConnection connection(&handler, ADDRESS);
AMQP::TcpChannel channel(&connection);
// I will go to the front of the box named "1111".
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[DEQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
// I look inside the box.
auto errorCb = [](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "]" << std::endl;
};
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
channel.ack(deliveryTag);
};
channel.consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
// I will keep on forever.
ev_run(loop, 0);
// I will not come here.
return;
}
}
int main(int argc, char* argv[])
{
std::string cmdArg;
// プログラム名を省き、コマンドライン引数だけをつなげる。
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]) + " ";
}
std::istringstream ssCmd(cmdArg);
// 半角スペース区切りの最初の1トークンを拾う
std::string token;
ssCmd >> token;
if (token == "--msgqueue") {
// ex.) --msgqueue 1115 durable 1114 durable
ssCmd >> service_io::name_queues[service_io::ENQUEUE_INDEX];
ssCmd >> service_io::lifeSpan_queues[service_io::ENQUEUE_INDEX];
ssCmd >> service_io::name_queues[service_io::DEQUEUE_INDEX];
ssCmd >> service_io::lifeSpan_queues[service_io::DEQUEUE_INDEX];
}
// デキューの常時監視をスタート
auto th1 = std::thread([] { service_io::workConsume(); });
// エンキュー用の接続
//auto* loop = EV_DEFAULT;
struct ev_loop* loop = EV_DEFAULT;
AMQP::LibEvHandler handler{ loop };
AMQP::TcpConnection connection{ &handler, service_io::ADDRESS };
AMQP::TcpChannel channel{ &connection };
// 無限ループ
for (;;)
{
std::string message = service_io::rotationBuffer::getMessage();
if ("" != message)
{
// 末尾に daze を付ける。
message += "daze";
// エンキューする
service_io::enqueue(loop, connection, channel, message);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// このプログラムは、自分では接続を切らない。
// th1 スレッドの終了を待つ
th1.join();
return 0;
}
こんな感じでいいのだろうか?
# ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
[1] 4177
root@tk2-217-18401:/home/★user/shogi/cpp_service# tamesi33a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
同様のエラーが出るようになったのか。
namespace service_io で囲んだからか、それとも キュー名を選べるようにしたからか?
# ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
[1] 4239
ネームスペースを外すと すんなり行ったようにも見える。
じゃあ ネームスペースを付けてみよう。
# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > 1114
削除するメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
[1114]キューを削除したはずだぜ☆(^~^) result=[4294967295]
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > 1115
削除するメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=504, text="Frame received for invalid channel 1", classId=0, methodId=0, cause=RabbitMQ.Client.Impl.ChannelErrorException: Frame received for invalid channel 1
なんか 操作をミスって キューを削除してしまったらしい。
Enqueue? > usi
Enqueue(^q^) usi
Enqueue? > tamesi33a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
同様のエラーが出ている。
ソースを戻して実行。
# ./tamesi33a2_cpp.exe &
[1] 4615
root@tk2-217-18401:/home/★user/shogi/cpp_service# My ID watching failed [Channel is in error state]
チャンネルか、メッセージ・キューが腐っているんだろうか?
a1 a2
-> 1116 ->
<- 1117 <-
キューを作り直してやりなおし。
> usi
Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1116' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x416f2760 + 0x00103> in <filename unknown>:0
属性の設定がおかしいんだろうか?
int lifeSpan = 0;
if ("durable" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::durable;
}
else if ("autodelete" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::autodelete;
}
else if ("passive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::passive;
}
else if ("exclusive" == lifeSpan_queues[ENQUEUE_INDEX])
{
lifeSpan = AMQP::exclusive;
}
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << "\n";
});
C++ で何かおかしいことをやっているだろうか。等号とか。
ミスを見つけた。
static std::string bgp_lifeSpan[] = { "duration", "duration" };
これは、
static std::string bgp_lifeSpan[] = { "durable", "durable" };
こっちが正しい。今回の不具合とは関係なさそうだが直していく。
./tamesi33a2_cpp.exe &
[1] 5307
My ID watching failed [Channel is in error state]
直してもエラーが出ている。
こんな風に書き直してみる。
int lifeSpan = 0;
std::string a = lifeSpan_queues[ENQUEUE_INDEX];
if ("durable" == a) { lifeSpan = AMQP::durable; }
else if ("autodelete" == a) { lifeSpan = AMQP::autodelete; }
else if ("passive" == a) { lifeSpan = AMQP::passive; }
else if ("exclusive" == a) { lifeSpan = AMQP::exclusive; }
else
{
std::cerr << "未対応のキュー寿命が指定されました。["+ a +"]" << std::endl;
exit(1);
}
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&a](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << a << "]" << "\n";
});
# ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable &
[1] 5577
引数付けて指定するのを忘れていたぜ。
# ./tamesi34_cs.exe
★中略
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > Kirin
Enqueue(^q^) Kirin
Enqueue? > Neko
Enqueue(^q^) Neko
Enqueue? > Inu
Enqueue(^q^) Inu
Enqueue? > root@tk2-217-18401:/home/★user/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112 0
1116 0
1111 0
1113 0
1117 0
myqueue 25
1115 0
1114 3
./tamesi34_cs.exe を使うと メッセージはキューに送れるのと、
# jobs
[1]+ Terminated ./tamesi33a2_cpp.exe --msgqueue 1115 durable 1114 durable (wd: /home/★user/shogi/cpp_service)
./tamesi33a2_cpp.exe を使うと キューの中身が 0 になっていることは確認したので、エンキューができていないのだろう。
スレッドと mutex
「what is correspoding feature for synchronized in java?」(stack overflow)
http://stackoverflow.com/questions/5429653/what-is-correspoding-feature-for-synchronized-in-java
こんなの覚えないといけないだろうか?
とりあえず適当に使ってみよう。
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::unique_lock<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
std::unique_lock<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
root@tk2-217-18401:/home/★user/shogi/cpp_service# rm tamesi33a2_cpp.cpp
root@tk2-217-18401:/home/★user/shogi/cpp_service# nano tamesi33a2_cpp.cpp
root@tk2-217-18401:/home/★user/shogi/cpp_service# g++ -std=c++11 tamesi33a2_cpp.cpp -o tamesi33a2_cpp.exe -lev -lamqpcpp -pthread
root@tk2-217-18401:/home/★user/shogi/cpp_service# ./tamesi33a2_cpp.exe --msgqueue 1117 durable 1116 durable &
[1] 6963
root@tk2-217-18401:/home/★user/shogi/cpp_service# My ID watching failed [Channel is in error state] lifeSpan=[durable]
初回に失敗するのはなんでだろう?
# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1118
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1119
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ika
Enqueue(^q^) ika
tamesi33a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
Enqueue? >
いつものエラーで落ちている。 ika というメッセージをデキューはしてるんだと思うんだが、このエラーメッセージが出てくる前に何か不具合が起こってるんじゃないか? エラー出力はないか?
# jobs
[2]+ Running ./tamesi33a2_cpp.exe --msgqueue 1117 durable 1116 durable 2> ./tamesi33a2_cpp.err.log &
こう動かしておいて。
# ./tamesi34_cs.exe
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1116
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1117
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ringo
Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1116' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x411595f0 + 0x00103> in <filename unknown>:0
at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x411574f0 + 0x000f9> in <filename unknown>:0
at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x4115b820 + 0x00113> in <filename unknown>:0
at UsagiMQ.Program.GetChannel (Int32 index) <0x4111d1f0 + 0x00138> in <filename unknown>:0
at UsagiMQ.Program.Enqueue (System.String message) <0x4115c930 + 0x00013> in <filename unknown>:0
at UsagiMQ.Program.Main (System.String[] args) <0x41118d50 + 0x0031f> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'durable' for queue '1116' in vhost '/': received 'true' but current is 'false'", classId=50, methodId=10, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply () <0x411595f0 + 0x00103> in <filename unknown>:0
at RabbitMQ.Client.Impl.ModelBase.ModelRpc (RabbitMQ.Client.Impl.MethodBase method, RabbitMQ.Client.Impl.ContentHeaderBase header, System.Byte[] body) <0x411574f0 + 0x000f9> in <filename unknown>:0
at RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments) <0x4115b820 + 0x00113> in <filename unknown>:0
at UsagiMQ.Program.GetChannel (Int32 index) <0x4111d1f0 + 0x00138> in <filename unknown>:0
at UsagiMQ.Program.Enqueue (System.String message) <0x4115c930 + 0x00013> in <filename unknown>:0
at UsagiMQ.Program.Main (System.String[] args) <0x41118d50 + 0x0031f> in <filename unknown>:0
うーむ。
RabbitMQ.Client.Framing.Impl.v0_8.Model.QueueDeclare (System.String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Boolean nowait, IDictionary arguments)
こういう関数を使ってるのは C# の方だったかな。ここで条件の設定が違うんだろうか?
autodelete
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1116
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1117
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
GetChannel index = [1]
QUEUE_NAMES[index] = [1117]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
durable と autodelete は両立するらしいが。
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1116
エンキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1117
デキュー先のメッセージ・キューの寿命を選べだぜ☆(^~^)
(0) durable : RabbitMQが止まってもキューを残す
(1) autodelete : コンシューマーが1人も接続していなかったら消す
(2) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(3) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 0
GetChannel index = [1]
QUEUE_NAMES[index] = [1117]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > blade
GetChannel index = [0]
QUEUE_NAMES[index] = [1116]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
ここで突然 処理から抜けた。
tamesi34_cs.err.log にいつものエラーが書いてある。
足し算
なんだか急に 体調が悪くなったんで休憩。
これは書きかけ。
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi34_cs.cs
// : Command : chmod 755 tamesi34_cs.cs
// Execute : Command : // フォアグラウンドで実行する
// : ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
// Check : Command : // キューの中身の数を調べる
// : rabbitmqctl list_queues
//
// Library : RabbitMQ
// Refference : Website : RabbitMQ http://www.rabbitmq.com/
// : Website : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Website : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//--------------------------------------------------------------------------------
// tamesi34_cs.cs
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace UsagiMQ
{
[Flags]
enum LifeSpanType
{
/// <summary>
/// RabbitMQが止まってもキューを残す
/// </summary>
Durable = 1,
/// <summary>
/// コンシューマーが1人も接続していなかったら消す
/// </summary>
Autodelete = 0x1<<1,
/// <summary>
/// キューが存在するかどうかチェックするだけ。中身見ない時これ
/// </summary>
Passive = 0x1 << 2,
/// <summary>
/// この接続でだけ使える。この接続が切れたら消す
/// </summary>
Exclusive = 0x1 << 3
}
/// <summary>
/// メッセージを エンキューします。
/// キューの名前は指定してください。
/// デキューは割込みを受け付けます。
///
/// 参照 : QueueDeclare (v1.0) http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : QueueDelete (v1.4) https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.QueueDelete(System.UInt16,System.String,System.Boolean,System.Boolean,System.Boolean)
/// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
/// </summary>
class Program
{
const int ENQUEUE_INDEX = 0;
const int DEQUEUE_INDEX = 1;
const int DELETEQUEUE_INDEX = 2;
const int NUM_INDEX = 3;
const string HOST_NAME = "localhost";
static string[] QUEUE_NAMES = new string[NUM_INDEX];
/// <summary>
/// キューの寿命(複合可能)
/// durable : RabbitMQが止まってもキューを残す
/// autodelete : コンシューマーが1人も接続していなかったら消す
/// passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
/// exclusive : この接続でだけ使える。この接続が切れたら消す
/// </summary>
static LifeSpanType[] lifeSpans_queue = new LifeSpanType[NUM_INDEX];
static bool[]
durable_lifeSpans = new bool[NUM_INDEX],
autodelete_lifeSpans = new bool[NUM_INDEX],
passive_lifeSpans = new bool[NUM_INDEX],
exclusive_lifeSpans = new bool[NUM_INDEX];
/// <summary>
///
/// </summary>
/// <param name="index_queue"></param>
/// <param name="name_queue"></param>
/// <param name="lifeSpan">
/// (0) durable
/// (1) autodelete
/// (2) passive
/// (3) exclusive
/// </param>
static void SetLifeSpan(int index_queue, string name_queue, LifeSpanType lifeSpan)
{
QUEUE_NAMES[index_queue] = name_queue;
lifeSpans_queue[index_queue] = lifeSpan;
// 一旦クリアー
durable_lifeSpans[index_queue] = false;
autodelete_lifeSpans[index_queue] = false;
passive_lifeSpans[index_queue] = false;
exclusive_lifeSpans[index_queue] = false;
// durable
if (((int)lifeSpan & (int)LifeSpanType.Durable) == (int)LifeSpanType.Durable) { durable_lifeSpans[index_queue] = true; }
// autodelete
if (((int)lifeSpan & (int)LifeSpanType.Autodelete) == (int)LifeSpanType.Autodelete) { autodelete_lifeSpans[index_queue] = true; }
// passive
if (((int)lifeSpan & (int)LifeSpanType.Passive) == (int)LifeSpanType.Passive) { passive_lifeSpans[index_queue] = true; }
// exclusive
if (((int)lifeSpan & (int)LifeSpanType.Exclusive) == (int)LifeSpanType.Exclusive) { exclusive_lifeSpans[index_queue] = true; }
}
public static ConnectionFactory GetFactory()
{
if (null == m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null == m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
}
return m_connection_;
}
static IConnection m_connection_;
public static IModel GetChannel(int index)
{
if (null == m_channels_[index])
{
m_channels_[index] = GetConnection().CreateModel();
// 引数が7つの QueueDeclare でエラーを吐くことがあるので、情報を出力しておく。
Console.Error.WriteLine("GetChannel index = [" + index + "]");
Console.Error.WriteLine(" QUEUE_NAMES[index] = [" + QUEUE_NAMES[index] + "]");
Console.Error.WriteLine(" passive_lifeSpans[index] = [" + passive_lifeSpans[index] + "]");
Console.Error.WriteLine(" durable_lifeSpans[index] = [" + durable_lifeSpans[index] + "]");
Console.Error.WriteLine(" exclusive_lifeSpans[index] = [" + exclusive_lifeSpans[index] + "]");
Console.Error.WriteLine(" autodelete_lifeSpans[index] = [" + autodelete_lifeSpans[index] + "]");
Console.Error.WriteLine(" nowait は仮に false 固定");
Console.Error.WriteLine(" arguments は仮に null 固定");
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], passive_lifeSpans[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], false, null);
#else
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], durable_lifeSpans[index], exclusive_lifeSpans[index], autodelete_lifeSpans[index], null);
#endif
}
return m_channels_[index];
}
static IModel[] m_channels_ = new IModel[NUM_INDEX];
public static EventingBasicConsumer GetConsumer(int index)
{
if (null == m_consumers_[index])
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumers_[index] = new EventingBasicConsumer();
#else
m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
}
return m_consumers_[index];
}
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[NUM_INDEX];
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetDequeueHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetDequeueHandler()
#endif
{
if (null == m_dequeueHandler_)
{
#if UBUNTU
m_dequeueHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_dequeueHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine("<---- [interrupt!] Dequeue(^q^) {0}", message);
});
}
return m_dequeueHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_dequeueHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_dequeueHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection()
{
if (null != m_connection_)
{
m_connection_.Close();
m_connection_ = null;
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel(int index)
{
if (null != m_channels_[index])
{
m_channels_[index].Close();
m_channels_[index] = null;
}
}
static void Main(string[] args)
{
//----------------------------------------
// Delete
//----------------------------------------
for (;;)
{
Console.Write(@"削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? > ");
string queueName_delete = Console.ReadLine();
if (""== queueName_delete.Trim())
{
break;
}
Console.Write(@"削除するメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
("+ LifeSpanType.Durable + @") durable : RabbitMQが止まってもキューを残す
(" + LifeSpanType.Autodelete + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + LifeSpanType.Passive + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + LifeSpanType.Exclusive + @") exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
LifeSpanType lifeSpan_delete = (LifeSpanType)int.Parse(Console.ReadLine());
SetLifeSpan(DELETEQUEUE_INDEX, queueName_delete, lifeSpan_delete);
uint result = DeleteQueue();
Console.WriteLine(@"["+ queueName_delete + "]キューを削除したはずだぜ☆(^~^) result=["+ result + "]");
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
for (;;)
{
Console.Write(@"エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
string queueName_enqueue = Console.ReadLine();
Console.Write(@"エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + LifeSpanType.Durable + @") durable : RabbitMQが止まってもキューを残す
(" + LifeSpanType.Autodelete + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + LifeSpanType.Passive + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + LifeSpanType.Exclusive + @") exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_enqueue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_enqueue))
{
SetLifeSpan(ENQUEUE_INDEX, queueName_enqueue, (LifeSpanType)lifeSpan_enqueue);
break;
}
}
//----------------------------------------
// Enqueue settings
//----------------------------------------
for (;;)
{
Console.Write(@"デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > ");
string queueName_dequeue = Console.ReadLine();
Console.Write(@"デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(" + LifeSpanType.Durable + @") durable : RabbitMQが止まってもキューを残す
(" + LifeSpanType.Autodelete + @") autodelete : コンシューマーが1人も接続していなかったら消す
(" + LifeSpanType.Passive + @") passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(" + LifeSpanType.Exclusive + @") exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > ");
int lifeSpan_dequeue;
if(int.TryParse(Console.ReadLine(),out lifeSpan_dequeue))
{
StartConsume(queueName_dequeue, (LifeSpanType)lifeSpan_dequeue);
break;
}
}
Console.Write(@"終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > ");
for (;;)
{
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
Console.Write(@"Enqueue? > ");
}
// ここには来ない
// CloseConnection();
}
static uint DeleteQueue()
{
IModel channel = GetChannel(DELETEQUEUE_INDEX);
#if UBUNTU
// Ubuntuでは何故か 昔の .Net 用の引数 5 つのやつの変則版になっている。
// uint QueueDelete(ushort ticket, string queue, bool ifUnused, bool ifEmpty, bool nowait)
uint result = channel.QueueDelete( QUEUE_NAMES[DELETEQUEUE_INDEX],true,true,true);
#else
uint result = channel.QueueDelete(QUEUE_NAMES[DELETEQUEUE_INDEX],true,true);
#endif
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(DELETEQUEUE_INDEX);
return result;
}
static void Enqueue(string message)
{
IModel channel = GetChannel(ENQUEUE_INDEX);
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", QUEUE_NAMES[ENQUEUE_INDEX], null, body);
Console.WriteLine(" Enqueue(^q^) {0}", message);
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(ENQUEUE_INDEX);
}
/// <summary>
///
/// </summary>
/// <param name="name_queue">メッセージ・キューの名前</param>
/// <param name="lifeSpan_queue">既存のメッセージ・キューの場合、メッセージ・キューの設定は合わせる必要がある。
/// durable : RabbitMQが止まってもキューを残す
/// autodelete : コンシューマーが1人も接続していなかったら消す
/// passive : キューが存在するかどうかチェックするだけで、中身を見ない時はこれ
/// exclusive : この接続でだけ使える。この接続が切れたら消す
/// </param>
static void StartConsume(string name_queue, LifeSpanType lifeSpan_queue)
{
SetLifeSpan(DEQUEUE_INDEX, name_queue, lifeSpan_queue);
IModel channel = GetChannel(DEQUEUE_INDEX);
EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);
// 受信できたときに割り込んでくる処理
consumer.Received += GetDequeueHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, "", null, consumer);
#else
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, consumer);
#endif
// 終了はさせない
// consumer.Received -= GetReceiveHandler();
// CloseChannel(DEQUEUE_INDEX);
}
}
}
1116 は 3 で合ってたか。
次はエラー。
inequivalent arg 'auto_delete' for queue '1117'
1117 は 1 が正解か。