前回の記事 : http://qiita.com/muzudho1/items/b45c52c50f11011e17f3
シングルトンにしよう
conversion from ‘std::nullptr_t’ to non-scalar type
C++ でのヌルの扱い方が分からん。
コネクションをシングルトンにするには こんなんでいいんだろうか?
//--------------------------------------------------------------------------------
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Stop : Typing : [Ctrl]+[C]
//--------------------------------------------------------------------------------
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
enum LifeSpanType
{
// RabbitMQが止まってもキューを残す
durable = 1,
// コンシューマーが1人も接続していなかったら消す
autodelete = 0x1 << 1,
// キューが存在するかどうかチェックするだけ。中身見ない時これ
passive = 0x1 << 2,
// この接続でだけ使える。この接続が切れたら消す
exclusive = 0x1 << 3
};
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpanType lifeSpan_queues[] = { (LifeSpanType)0, (LifeSpanType)0 };
static LifeSpanType LifeSpanString_To_Enum(std::string lifeSpan)
{
if ("durable"==lifeSpan) {
return durable;
} else if ("autodelete" == lifeSpan) {
return autodelete;
} else if ("passive" == lifeSpan) {
return passive;
} else if ("exclusive" == lifeSpan) {
return exclusive;
} else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(1);
}
}
static int LifeSpanString_To_AmqpInt(LifeSpanType lifeSpan)
{
int amqpInt = 0;
if ( ((int)durable & (int)lifeSpan) == (int)durable) {
amqpInt |= AMQP::durable;
}
if (((int)autodelete & (int)lifeSpan) == (int)autodelete) {
amqpInt |= AMQP::autodelete;
}
if (((int)passive & (int)lifeSpan) == (int)passive) {
amqpInt |= AMQP::passive;
}
if (((int)exclusive & (int)lifeSpan) == (int)exclusive) {
amqpInt |= AMQP::exclusive;
}
return amqpInt;
}
static std::string Dump()
{
static std::ostringstream sb;
sb << "tamesi35a2_cs.cs Dump" << std::endl
<< " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl
<< " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev = EV_DEFAULT;
static AMQP::LibEvHandler handler_ev{ pLoop_ev };
static AMQP::TcpConnection* pConnection_ev = nullptr;
static AMQP::TcpConnection* getConnection()
{
if (nullptr== pConnection_ev) {
pConnection_ev = new AMQP::TcpConnection( &handler_ev, ADDRESS );
std::cout << "(^▽^)接続したぜ☆" << std::endl;
}
return pConnection_ev;
}
static void closeConnection()
{
if (nullptr != pConnection_ev) {
pConnection_ev->close();
pConnection_ev = nullptr;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::unique_lock<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
std::unique_lock<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
// メッセージキューにエンキュー
static void enqueue(std::string message)
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
// エンキュー用の接続
AMQP::TcpChannel channel{ getConnection() };
std::cout << "(^▽^)チャンネル開いたぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
channel.declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
channel.bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&channel, &exchange_name, &routing_key, &message]() {
if (!channel.publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
closeConnection();
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
// このループは、パブリッシュ後に抜ける
ev_run(pLoop_ev);
}
// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
//// メッセージ・キューの送信を担当
//static void workProduce()
//{
//
//}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << "(^q^)コンシューム開始だぜ☆" << std::endl;
// Connect to the AMQP service.
AMQP::TcpChannel channel(getConnection());
std::cout << "(^q^)チャンネル開いたぜ☆" << std::endl;
// I will go to the front of the box named "1111".
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[DEQUEUE_INDEX]);
channel.declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << "(^q^)デクレア・キューしたぜ☆" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
channel.ack(deliveryTag);
};
channel.consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << "(^q^)コンシュームしたぜ☆" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev);
// I will not come here.
return;
}
int main(int argc, char* argv[])
{
std::cout << "(^q^)Welcome! AMQP::durable=[" << AMQP::durable << "] AMQP::autodelete=[" << AMQP::autodelete << "] AMQP::passive=[" << AMQP::passive << "] AMQP::exclusive=[" << AMQP::exclusive << "]" << std::endl;
// 引数の解析
if(0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::cout << "(^q^)cmdArg=[" << cmdArg << "]" << std::endl;
std::istringstream data(cmdArg);
//data >> std::skipws;
std::cout << "(^q^)data.str()=[" << data.str() << "]" << std::endl;
// 与件
// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
std::string queueNames[2] = {}; // 結果はこれらの配列に入れる
std::vector<std::string> lifespans[2] = {};
// 記憶
int m0 = -1; // 0:enqueue, 1:dequeue
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
std::cout << "(^_^)a0=[" << a << "]" << std::endl;
if ("--enqueue" == a)
{
std::cout << "(^q^)エンキュー!" << std::endl;
m0 = 0;
m1 = -1;
}
else if ("--dequeue" == a)
{
std::cout << "(^q^)デキュー!" << std::endl;
m0 = 1;
m1 = -1;
}
else if (-1 == m1 && 0 == m0) {
std::cout << "(^q^)エンキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else if (-1 == m1 && 1 == m0) {
std::cout << "(^q^)デキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else
{
std::cout << "(^q^)寿命! m0=[" << m0 << "] m1=[" << m1 << "]" << std::endl;
lifespans[m0].push_back(a);
std::cout << "(^q^)zyumyo-!" << std::endl;
} // 要素数-1 で最後の要素
}
std::cout << "(^-^)Loop end." << std::endl;
if ("" == queueNames[0] || "" == queueNames[1]) {
std::cerr << "コマンドライン引数の「--enqueue キュー名 寿命」「--dequeue キュー名 寿命」を漏れなく指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl;
std::cerr << "queueNames[0]=[" << queueNames[0] << "]" << std::endl;
//std::cerr << "queueNames[1]=[" << queueNames[1] << "]" << std::endl;
//std::cerr << Dump();
exit(1);
}
std::cout << "(^q^)エンキューのキュー名=[" << queueNames[0] << "]" << std::endl;
std::cout << "(^q^)デキューのキュー名=[" << queueNames[1] << "]" << std::endl;
std::cout << "(^q^)エンキューの寿命の数=[" << lifespans[0].size() << "]" << std::endl;
std::cout << "(^q^)デキューの寿命の数=[" << lifespans[1].size() << "]" << std::endl;
// エンキュー
{
m0 = 0;
name_queues[ENQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[ENQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_queues[ENQUEUE_INDEX]=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl;
// デキュー
{
m0 = 1;
name_queues[DEQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[DEQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_queues[DEQUEUE_INDEX]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
std::cout << Dump();
std::cout << "(^q^)ダンプおわり" << std::endl;
}
else
{
std::cerr << "コマンドライン引数を指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete";
exit(1);
}
// デキューの常時監視をスタート
auto th1 = std::thread([] { workConsume(); });
// 無限ループ
std::cout << "(^q^)メッセージ監視ループに入るぜ☆" << std::endl;
for (;;)
{
std::string message = rotationBuffer::getMessage();
if ("" != message)
{
std::cout << "(^q^)メッセージを受け取ったか?" << std::endl;
// 末尾に daze を付ける。
message += "daze";
// エンキューする
enqueue( message);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// このプログラムは、自分では接続を切らない。
// th1 スレッドの終了を待つ
th1.join();
std::cout << "(^q^)Finished!" << std::endl;
return 0;
}
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
(^q^)Enqueue: エンキュー開始☆ message=[usi]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[3]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? >
反応がない。
> [1]+ Aborted ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null (wd: /home/★user/shogi/cpp_service)
(wd now: /home/★user/shogi/csharp_service)
落ちてる。
tamesi35a2_cpp.err.log
tamesi35a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
シングルトン化を進めるか。
tamesi35a2_cpp.out.log
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)メッセージ監視ループに入るぜ☆
(^q^)コンシューム開始だぜ☆
(^▽^)接続したぜ☆
(^q^)チャンネル開いたぜ☆
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか?
(^▽^)エンキュー開始だぜ☆
(^▽^)チャンネル開いたぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
こっちは バインド・キューして、あとはパブリッシュか。
チャンネルもシングルトンにすると
こんな感じだろうか?
//--------------------------------------------------------------------------------
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Stop : Typing : [Ctrl]+[C]
//--------------------------------------------------------------------------------
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
enum LifeSpanType
{
// RabbitMQが止まってもキューを残す
durable = 1,
// コンシューマーが1人も接続していなかったら消す
autodelete = 0x1 << 1,
// キューが存在するかどうかチェックするだけ。中身見ない時これ
passive = 0x1 << 2,
// この接続でだけ使える。この接続が切れたら消す
exclusive = 0x1 << 3
};
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpanType lifeSpan_queues[] = { (LifeSpanType)0, (LifeSpanType)0 };
static LifeSpanType LifeSpanString_To_Enum(std::string lifeSpan)
{
if ("durable"==lifeSpan) {
return durable;
} else if ("autodelete" == lifeSpan) {
return autodelete;
} else if ("passive" == lifeSpan) {
return passive;
} else if ("exclusive" == lifeSpan) {
return exclusive;
} else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(1);
}
}
static int LifeSpanString_To_AmqpInt(LifeSpanType lifeSpan)
{
int amqpInt = 0;
if ( ((int)durable & (int)lifeSpan) == (int)durable) {
amqpInt |= AMQP::durable;
}
if (((int)autodelete & (int)lifeSpan) == (int)autodelete) {
amqpInt |= AMQP::autodelete;
}
if (((int)passive & (int)lifeSpan) == (int)passive) {
amqpInt |= AMQP::passive;
}
if (((int)exclusive & (int)lifeSpan) == (int)exclusive) {
amqpInt |= AMQP::exclusive;
}
return amqpInt;
}
static std::string Dump()
{
static std::ostringstream sb;
sb << "tamesi35a2_cs.cs Dump" << std::endl
<< " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl
<< " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev = EV_DEFAULT;
static AMQP::LibEvHandler handler_ev{ pLoop_ev };
static AMQP::TcpConnection* pConnection_ev = nullptr;
static AMQP::TcpConnection* getConnection()
{
if (nullptr== pConnection_ev) {
pConnection_ev = new AMQP::TcpConnection( &handler_ev, ADDRESS );
std::cout << "(^▽^)接続したぜ☆" << std::endl;
}
return pConnection_ev;
}
static void closeConnection()
{
if (nullptr != pConnection_ev) {
pConnection_ev->close();
pConnection_ev = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev = nullptr;
static AMQP::TcpChannel* getChannel()
{
if (nullptr== pChannel_ev)
{
pChannel_ev = new AMQP::TcpChannel(getConnection());
std::cout << "(^▽^)チャンネル開いたぜ☆" << std::endl;
}
return pChannel_ev;
}
static void closeChannel()
{
if (nullptr != pChannel_ev) {
pChannel_ev->close();
pChannel_ev = nullptr;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::unique_lock<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
std::unique_lock<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
// メッセージキューにエンキュー
static void enqueue(std::string message)
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
getChannel()->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
getChannel()->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key, &message]() {
if (!getChannel()->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
// break in ev loop.
closeConnection();
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
// このループは、パブリッシュ後に抜ける
ev_run(pLoop_ev);
std::cout << "(^▽^)バインド・キューのループから抜けたぜ☆" << std::endl;
}
// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
//// メッセージ・キューの送信を担当
//static void workProduce()
//{
//
//}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << "(^q^)コンシューム開始だぜ☆" << std::endl;
// I will go to the front of the box named "1111".
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[DEQUEUE_INDEX]);
getChannel()->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << "(^q^)デクレア・キューしたぜ☆" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
getChannel()->ack(deliveryTag);
};
getChannel()->consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << "(^q^)コンシュームしたぜ☆" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev);
std::cout << "(^q^)コンシュームのループから抜けたぜ☆?" << std::endl;
// I will not come here.
return;
}
int main(int argc, char* argv[])
{
std::cout << "(^q^)Welcome! AMQP::durable=[" << AMQP::durable << "] AMQP::autodelete=[" << AMQP::autodelete << "] AMQP::passive=[" << AMQP::passive << "] AMQP::exclusive=[" << AMQP::exclusive << "]" << std::endl;
// 引数の解析
if(0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::cout << "(^q^)cmdArg=[" << cmdArg << "]" << std::endl;
std::istringstream data(cmdArg);
//data >> std::skipws;
std::cout << "(^q^)data.str()=[" << data.str() << "]" << std::endl;
// 与件
// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
std::string queueNames[2] = {}; // 結果はこれらの配列に入れる
std::vector<std::string> lifespans[2] = {};
// 記憶
int m0 = -1; // 0:enqueue, 1:dequeue
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
std::cout << "(^_^)a0=[" << a << "]" << std::endl;
if ("--enqueue" == a)
{
std::cout << "(^q^)エンキュー!" << std::endl;
m0 = 0;
m1 = -1;
}
else if ("--dequeue" == a)
{
std::cout << "(^q^)デキュー!" << std::endl;
m0 = 1;
m1 = -1;
}
else if (-1 == m1 && 0 == m0) {
std::cout << "(^q^)エンキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else if (-1 == m1 && 1 == m0) {
std::cout << "(^q^)デキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else
{
std::cout << "(^q^)寿命! m0=[" << m0 << "] m1=[" << m1 << "]" << std::endl;
lifespans[m0].push_back(a);
std::cout << "(^q^)zyumyo-!" << std::endl;
} // 要素数-1 で最後の要素
}
std::cout << "(^-^)Loop end." << std::endl;
if ("" == queueNames[0] || "" == queueNames[1]) {
std::cerr << "コマンドライン引数の「--enqueue キュー名 寿命」「--dequeue キュー名 寿命」を漏れなく指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl;
std::cerr << "queueNames[0]=[" << queueNames[0] << "]" << std::endl;
//std::cerr << "queueNames[1]=[" << queueNames[1] << "]" << std::endl;
//std::cerr << Dump();
exit(1);
}
std::cout << "(^q^)エンキューのキュー名=[" << queueNames[0] << "]" << std::endl;
std::cout << "(^q^)デキューのキュー名=[" << queueNames[1] << "]" << std::endl;
std::cout << "(^q^)エンキューの寿命の数=[" << lifespans[0].size() << "]" << std::endl;
std::cout << "(^q^)デキューの寿命の数=[" << lifespans[1].size() << "]" << std::endl;
// エンキュー
{
m0 = 0;
name_queues[ENQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[ENQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_queues[ENQUEUE_INDEX]=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl;
// デキュー
{
m0 = 1;
name_queues[DEQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[DEQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_queues[DEQUEUE_INDEX]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
std::cout << Dump();
std::cout << "(^q^)ダンプおわり" << std::endl;
}
else
{
std::cerr << "コマンドライン引数を指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete";
exit(1);
}
// デキューの常時監視をスタート
auto th1 = std::thread([] { workConsume(); });
// 無限ループ
std::cout << "(^q^)メッセージ監視ループに入るぜ☆" << std::endl;
for (;;)
{
std::string message = rotationBuffer::getMessage();
if ("" != message)
{
std::cout << "(^q^)メッセージを受け取ったか?" << std::endl;
// 末尾に daze を付ける。
message += "daze";
// エンキューする
enqueue( message);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// このプログラムは、自分では接続を切らない。
// th1 スレッドの終了を待つ
th1.join();
std::cout << "(^q^)Finished!" << std::endl;
return 0;
}
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > usi
(^q^)Enqueue: エンキュー開始☆ message=[usi]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[3]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? >
反応がない。
[1]+ Aborted ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null (wd: /home/★user/shogi/cpp_service)
(wd now: /home/★user/shogi/csharp_service)
tamesi35a2_cpp.out.log
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)メッセージ監視ループに入るぜ☆
(^q^)コンシューム開始だぜ☆
(^▽^)接続したぜ☆
(^▽^)チャンネル開いたぜ☆
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか?
(^▽^)エンキュー開始だぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
tamesi35a2_cpp.err.log
tamesi35a2_cpp.exe: ev.c:3541: ev_run: Assertion `("libev: ev_loop recursion during release detected", ((loop)->loop_done) != 0x80)' failed.
接続を閉じる部分
static void closeConnection()
{
if (nullptr != pConnection_ev) {
std::cout << "(^▽^)これから接続を閉じるぜ☆" << std::endl;
pConnection_ev->close();
std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
pConnection_ev = nullptr;
}
}
こんどはエラーログが吐かれてない。
もう1回やったら吐き出されていた。
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)メッセージ監視ループに入るぜ☆
(^q^)コンシューム開始だぜ☆
(^▽^)接続したぜ☆
(^▽^)チャンネル開いたぜ☆
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか?
(^▽^)エンキュー開始だぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
しかし abort では、吐き出す間もないんじゃないか?
std::cout << "(^q^)メッセージを受け取ったか? message=[" << message << "]" << std::endl;
ちゃんと 受け取ってるのか調べてみよう。
getChannel()->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key, &message]() {
std::cout << "(^▽^)これからパブリッシュするぜ☆" << std::endl;
if (!getChannel()->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
// break in ev loop.
closeConnection();
std::cout << "(^▽^)オンサクセスを終わるぜ☆" << std::endl;
});
このあたりもくわしく。
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)メッセージ監視ループに入るぜ☆
(^q^)コンシューム開始だぜ☆
(^▽^)接続したぜ☆
(^▽^)チャンネル開いたぜ☆
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[usi]
(^▽^)エンキュー開始だぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
usi は受け取れているんだがなぁ。
エンキューとデキューで分けてみよう
こんなんでどうか?
//--------------------------------------------------------------------------------
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Stop : Typing : [Ctrl]+[C]
//--------------------------------------------------------------------------------
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
enum LifeSpanType
{
// RabbitMQが止まってもキューを残す
durable = 1,
// コンシューマーが1人も接続していなかったら消す
autodelete = 0x1 << 1,
// キューが存在するかどうかチェックするだけ。中身見ない時これ
passive = 0x1 << 2,
// この接続でだけ使える。この接続が切れたら消す
exclusive = 0x1 << 3
};
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpanType lifeSpan_queues[] = { (LifeSpanType)0, (LifeSpanType)0 };
static LifeSpanType LifeSpanString_To_Enum(std::string lifeSpan)
{
if ("durable"==lifeSpan) {
return durable;
} else if ("autodelete" == lifeSpan) {
return autodelete;
} else if ("passive" == lifeSpan) {
return passive;
} else if ("exclusive" == lifeSpan) {
return exclusive;
} else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(1);
}
}
static int LifeSpanString_To_AmqpInt(LifeSpanType lifeSpan)
{
int amqpInt = 0;
if ( ((int)durable & (int)lifeSpan) == (int)durable) {
amqpInt |= AMQP::durable;
}
if (((int)autodelete & (int)lifeSpan) == (int)autodelete) {
amqpInt |= AMQP::autodelete;
}
if (((int)passive & (int)lifeSpan) == (int)passive) {
amqpInt |= AMQP::passive;
}
if (((int)exclusive & (int)lifeSpan) == (int)exclusive) {
amqpInt |= AMQP::exclusive;
}
return amqpInt;
}
static std::string Dump()
{
static std::ostringstream sb;
sb << "tamesi35a2_cs.cs Dump" << std::endl
<< " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl
<< " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { EV_DEFAULT, EV_DEFAULT };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };
static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
if (nullptr== pConnection_ev[index]) {
pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
std::cout << "(^▽^)接続したぜ☆ index=[" << index << "]" << std::endl;
}
return pConnection_ev[index];
}
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
std::cout << "(^▽^)これから接続を閉じるぜ☆ index=[" << index << "]" << std::endl;
pConnection_ev[index]->close();
std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
pConnection_ev[index] = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
if (nullptr== pChannel_ev[index])
{
pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
std::cout << "(^▽^)チャンネル開いたぜ☆ index=[" << index << "]" << std::endl;
}
return pChannel_ev[index];
}
static void closeChannel(int index)
{
if (nullptr != pChannel_ev[index]) {
pChannel_ev[index]->close();
pChannel_ev[index] = nullptr;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::unique_lock<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
std::unique_lock<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
// メッセージキューにエンキュー
static void enqueue(std::string message)
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key, &message]() {
std::cout << "(^▽^)これからパブリッシュするぜ☆" << std::endl;
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
// break in ev loop.
closeConnection(ENQUEUE_INDEX);
std::cout << "(^▽^)オンサクセスを終わるぜ☆" << std::endl;
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
// このループは、パブリッシュ後に抜ける
ev_run(pLoop_ev[ENQUEUE_INDEX]);
std::cout << "(^▽^)バインド・キューのループから抜けたぜ☆" << std::endl;
}
// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
//// メッセージ・キューの送信を担当
//static void workProduce()
//{
//
//}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << "(^q^)コンシューム開始だぜ☆" << std::endl;
// I will go to the front of the box named "1111".
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[DEQUEUE_INDEX]);
getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << "(^q^)デクレア・キューしたぜ☆" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
};
getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << "(^q^)コンシュームしたぜ☆" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev[DEQUEUE_INDEX]);
std::cout << "(^q^)コンシュームのループから抜けたぜ☆?" << std::endl;
// I will not come here.
return;
}
int main(int argc, char* argv[])
{
std::cout << "(^q^)Welcome! AMQP::durable=[" << AMQP::durable << "] AMQP::autodelete=[" << AMQP::autodelete << "] AMQP::passive=[" << AMQP::passive << "] AMQP::exclusive=[" << AMQP::exclusive << "]" << std::endl;
// 引数の解析
if(0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::cout << "(^q^)cmdArg=[" << cmdArg << "]" << std::endl;
std::istringstream data(cmdArg);
//data >> std::skipws;
std::cout << "(^q^)data.str()=[" << data.str() << "]" << std::endl;
// 与件
// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
std::string queueNames[2] = {}; // 結果はこれらの配列に入れる
std::vector<std::string> lifespans[2] = {};
// 記憶
int m0 = -1; // 0:enqueue, 1:dequeue
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
std::cout << "(^_^)a0=[" << a << "]" << std::endl;
if ("--enqueue" == a)
{
std::cout << "(^q^)エンキュー!" << std::endl;
m0 = 0;
m1 = -1;
}
else if ("--dequeue" == a)
{
std::cout << "(^q^)デキュー!" << std::endl;
m0 = 1;
m1 = -1;
}
else if (-1 == m1 && 0 == m0) {
std::cout << "(^q^)エンキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else if (-1 == m1 && 1 == m0) {
std::cout << "(^q^)デキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else
{
std::cout << "(^q^)寿命! m0=[" << m0 << "] m1=[" << m1 << "]" << std::endl;
lifespans[m0].push_back(a);
std::cout << "(^q^)zyumyo-!" << std::endl;
} // 要素数-1 で最後の要素
}
std::cout << "(^-^)Loop end." << std::endl;
if ("" == queueNames[0] || "" == queueNames[1]) {
std::cerr << "コマンドライン引数の「--enqueue キュー名 寿命」「--dequeue キュー名 寿命」を漏れなく指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl;
std::cerr << "queueNames[0]=[" << queueNames[0] << "]" << std::endl;
//std::cerr << "queueNames[1]=[" << queueNames[1] << "]" << std::endl;
//std::cerr << Dump();
exit(1);
}
std::cout << "(^q^)エンキューのキュー名=[" << queueNames[0] << "]" << std::endl;
std::cout << "(^q^)デキューのキュー名=[" << queueNames[1] << "]" << std::endl;
std::cout << "(^q^)エンキューの寿命の数=[" << lifespans[0].size() << "]" << std::endl;
std::cout << "(^q^)デキューの寿命の数=[" << lifespans[1].size() << "]" << std::endl;
// エンキュー
{
m0 = 0;
name_queues[ENQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[ENQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_queues[ENQUEUE_INDEX]=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl;
// デキュー
{
m0 = 1;
name_queues[DEQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[DEQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_queues[DEQUEUE_INDEX]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
std::cout << Dump();
std::cout << "(^q^)ダンプおわり" << std::endl;
}
else
{
std::cerr << "コマンドライン引数を指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete";
exit(1);
}
// デキューの常時監視をスタート
auto th1 = std::thread([] { workConsume(); });
// 無限ループ
std::cout << "(^q^)メッセージ監視ループに入るぜ☆" << std::endl;
for (;;)
{
std::string message = rotationBuffer::getMessage();
if ("" != message)
{
std::cout << "(^q^)メッセージを受け取ったか? message=[" << message << "]" << std::endl;
// 末尾に daze を付ける。
message += "daze";
// エンキューする
enqueue( message);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// このプログラムは、自分では接続を切らない。
// th1 スレッドの終了を待つ
th1.join();
std::cout << "(^q^)Finished!" << std::endl;
return 0;
}
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)メッセージ監視ループに入るぜ☆
(^q^)コンシューム開始だぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[usi]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
パブリッシュも別スレッドにするか。
あれ? 両方別スレッドにしたら メインスレッドが終わってしまうぜ。
パブリッシュの方だけ別スレッドにしよう。
//--------------------------------------------------------------------------------
// OS : Windows10 : // このプログラムは動かない
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Reference : Web site : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : Reference : Web site : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : Reference : Web site : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : Reference : Web site : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : Reference : Web site : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//
// Program : this
// : Compile : Command : g++ -std=c++11 tamesi35a2_cpp.cpp -o tamesi35a2_cpp.exe -lev -lamqpcpp -pthread
// : Execute : Command : // バックグラウンドで実行
// : ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null &
// : Stop : Typing : [Ctrl]+[C]
//--------------------------------------------------------------------------------
//
// メッセージの末尾に daze を付けます。
// デキューは「1112」、エンキューは「1113」キューに向けて行います。
// 標準入出力は、キーボード、画面には接続しないようにします。
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include <chrono>
#include <thread> // sleep_for
#include <future> // thread
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// このプログラム内だけで使われる数字。AMQP-CPPでの実装とは異なる。
// AMQP-CPPでの実装 : AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
enum LifeSpanType
{
// RabbitMQが止まってもキューを残す
durable = 1,
// コンシューマーが1人も接続していなかったら消す
autodelete = 0x1 << 1,
// キューが存在するかどうかチェックするだけ。中身見ない時これ
passive = 0x1 << 2,
// この接続でだけ使える。この接続が切れたら消す
exclusive = 0x1 << 3
};
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static std::string name_queues[] = { "", "" };
static LifeSpanType lifeSpan_queues[] = { (LifeSpanType)0, (LifeSpanType)0 };
static LifeSpanType LifeSpanString_To_Enum(std::string lifeSpan)
{
if ("durable"==lifeSpan) {
return durable;
} else if ("autodelete" == lifeSpan) {
return autodelete;
} else if ("passive" == lifeSpan) {
return passive;
} else if ("exclusive" == lifeSpan) {
return exclusive;
} else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(1);
}
}
static int LifeSpanString_To_AmqpInt(LifeSpanType lifeSpan)
{
int amqpInt = 0;
if ( ((int)durable & (int)lifeSpan) == (int)durable) {
amqpInt |= AMQP::durable;
}
if (((int)autodelete & (int)lifeSpan) == (int)autodelete) {
amqpInt |= AMQP::autodelete;
}
if (((int)passive & (int)lifeSpan) == (int)passive) {
amqpInt |= AMQP::passive;
}
if (((int)exclusive & (int)lifeSpan) == (int)exclusive) {
amqpInt |= AMQP::exclusive;
}
return amqpInt;
}
static std::string Dump()
{
static std::ostringstream sb;
sb << "tamesi35a2_cs.cs Dump" << std::endl
<< " name_queues [ENQUEUE_INDEX] =[" << name_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [ENQUEUE_INDEX] =[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl
<< " name_queues [DEQUEUE_INDEX] =[" << name_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " lifeSpan_queues [DEQUEUE_INDEX] =[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { EV_DEFAULT, EV_DEFAULT };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };
static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
if (nullptr== pConnection_ev[index]) {
pConnection_ev[index] = new AMQP::TcpConnection( pHandler_ev[index], ADDRESS );
std::cout << "(^▽^)接続したぜ☆ index=[" << index << "]" << std::endl;
}
return pConnection_ev[index];
}
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
std::cout << "(^▽^)これから接続を閉じるぜ☆ index=[" << index << "]" << std::endl;
pConnection_ev[index]->close();
std::cout << "(^▽^)接続を閉じたぜ☆" << std::endl;
pConnection_ev[index] = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
if (nullptr== pChannel_ev[index])
{
pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
std::cout << "(^▽^)チャンネル開いたぜ☆ index=[" << index << "]" << std::endl;
}
return pChannel_ev[index];
}
static void closeChannel(int index)
{
if (nullptr != pChannel_ev[index]) {
pChannel_ev[index]->close();
pChannel_ev[index] = nullptr;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::unique_lock<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
static std::string getMessage()
{
std::unique_lock<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return "";
}
}
// メッセージキューにエンキュー
static void enqueue(std::string message)
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key, &message]() {
std::cout << "(^▽^)これからパブリッシュするぜ☆" << std::endl;
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
// break in ev loop.
closeConnection(ENQUEUE_INDEX);
std::cout << "(^▽^)オンサクセスを終わるぜ☆" << std::endl;
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
// このループは、パブリッシュ後に抜ける
ev_run(pLoop_ev[ENQUEUE_INDEX]);
std::cout << "(^▽^)バインド・キューのループから抜けたぜ☆" << std::endl;
}
// 受信できたときに割り込んでくる処理
// startConsume() しておくこと。
static std::string dequeue() {
std::string message;
while ("" == message)
{
message = rotationBuffer::getMessage();
}
return message;
}
// メッセージ・キューの送信を担当
static void workProduce()
{
// 無限ループ
std::cout << "(^q^)メッセージ監視ループに入るぜ☆" << std::endl;
for (;;)
{
std::string message = rotationBuffer::getMessage();
if ("" != message)
{
std::cout << "(^q^)メッセージを受け取ったか? message=[" << message << "]" << std::endl;
// 末尾に daze を付ける。
message += "daze";
// エンキューする
enqueue(message);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
// メッセージ・キューの監視を開始
static void workConsume()
{
std::cout << "(^q^)コンシューム開始だぜ☆" << std::endl;
// I will go to the front of the box named "1111".
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[DEQUEUE_INDEX]);
getChannel(DEQUEUE_INDEX)->declareQueue(name_queues[DEQUEUE_INDEX], lifeSpan);
std::cout << "(^q^)デクレア・キューしたぜ☆" << std::endl;
// I look inside the box.
auto errorCb = [&lifeSpan](const char *errMsg) {
std::cerr << "My ID watching failed [" << errMsg << "] lifeSpan=[" << lifeSpan << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
rotationBuffer::putMessage(myString);
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
};
getChannel(DEQUEUE_INDEX)->consume(name_queues[DEQUEUE_INDEX])
.onReceived(messageCb)
.onError(errorCb);
std::cout << "(^q^)コンシュームしたぜ☆" << std::endl;
// I will keep on forever.
ev_run(pLoop_ev[DEQUEUE_INDEX]);
std::cout << "(^q^)コンシュームのループから抜けたぜ☆?" << std::endl;
// I will not come here.
return;
}
int main(int argc, char* argv[])
{
std::cout << "(^q^)Welcome! AMQP::durable=[" << AMQP::durable << "] AMQP::autodelete=[" << AMQP::autodelete << "] AMQP::passive=[" << AMQP::passive << "] AMQP::exclusive=[" << AMQP::exclusive << "]" << std::endl;
// 引数の解析
if(0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::cout << "(^q^)cmdArg=[" << cmdArg << "]" << std::endl;
std::istringstream data(cmdArg);
//data >> std::skipws;
std::cout << "(^q^)data.str()=[" << data.str() << "]" << std::endl;
// 与件
// 「--enqueue 1117 durable autodelete --dequeue 1116 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
std::string queueNames[2] = {}; // 結果はこれらの配列に入れる
std::vector<std::string> lifespans[2] = {};
// 記憶
int m0 = -1; // 0:enqueue, 1:dequeue
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
std::cout << "(^_^)a0=[" << a << "]" << std::endl;
if ("--enqueue" == a)
{
std::cout << "(^q^)エンキュー!" << std::endl;
m0 = 0;
m1 = -1;
}
else if ("--dequeue" == a)
{
std::cout << "(^q^)デキュー!" << std::endl;
m0 = 1;
m1 = -1;
}
else if (-1 == m1 && 0 == m0) {
std::cout << "(^q^)エンキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else if (-1 == m1 && 1 == m0) {
std::cout << "(^q^)デキューのキュー名!" << std::endl;
queueNames[m0] = a; // キュー名
lifespans[m0] = std::vector<std::string>();// 配列の中にリストを準備
m1++;
}
else
{
std::cout << "(^q^)寿命! m0=[" << m0 << "] m1=[" << m1 << "]" << std::endl;
lifespans[m0].push_back(a);
std::cout << "(^q^)zyumyo-!" << std::endl;
} // 要素数-1 で最後の要素
}
std::cout << "(^-^)Loop end." << std::endl;
if ("" == queueNames[0] || "" == queueNames[1]) {
std::cerr << "コマンドライン引数の「--enqueue キュー名 寿命」「--dequeue キュー名 寿命」を漏れなく指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl;
std::cerr << "queueNames[0]=[" << queueNames[0] << "]" << std::endl;
//std::cerr << "queueNames[1]=[" << queueNames[1] << "]" << std::endl;
//std::cerr << Dump();
exit(1);
}
std::cout << "(^q^)エンキューのキュー名=[" << queueNames[0] << "]" << std::endl;
std::cout << "(^q^)デキューのキュー名=[" << queueNames[1] << "]" << std::endl;
std::cout << "(^q^)エンキューの寿命の数=[" << lifespans[0].size() << "]" << std::endl;
std::cout << "(^q^)デキューの寿命の数=[" << lifespans[1].size() << "]" << std::endl;
// エンキュー
{
m0 = 0;
name_queues[ENQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[ENQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[ENQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan_queues[ENQUEUE_INDEX]=[" << lifeSpan_queues[ENQUEUE_INDEX] << "]" << std::endl;
// デキュー
{
m0 = 1;
name_queues[DEQUEUE_INDEX] = queueNames[m0];
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)0;
{
for (std::string lifespan : lifespans[m0])
{
lifeSpan_queues[DEQUEUE_INDEX] = (LifeSpanType)((int)lifeSpan_queues[DEQUEUE_INDEX] | (int)LifeSpanString_To_Enum(lifespan));
}
}
}
std::cout << "(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[" << name_queues[DEQUEUE_INDEX] << "] lifeSpan_queues[DEQUEUE_INDEX]=[" << lifeSpan_queues[DEQUEUE_INDEX] << "]" << std::endl;
std::cout << Dump();
std::cout << "(^q^)ダンプおわり" << std::endl;
}
else
{
std::cerr << "コマンドライン引数を指定してください。例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete";
exit(1);
}
// 常時監視をスタート
//auto th1 = std::thread([] { workConsume(); });
auto th2 = std::thread([] { workProduce(); });
workConsume();
// このプログラムは、自分では接続を切らない。
// スレッドの終了を待つ
th2.join();
//th1.join();
std::cout << "(^q^)Finished!" << std::endl;
return 0;
}
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[usi]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
意味ね。
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[usi]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
変わり無しか。
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key, &message]() {
std::cout << "(^▽^)これからパブリッシュするぜ☆" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
// break in ev loop.
closeConnection(ENQUEUE_INDEX);
std::cout << "(^▽^)オンサクセスを終わるぜ☆" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
});
5秒ずつ 待ったらどうか?
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[usi]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
変わり無しか。
構造体が同じなのか?
「libev publishing subscribing issue #108」(CopernicaMarketingSoftware/AMQP-CPP)
https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/108
//static struct ev_loop* pLoop_ev[2] = { EV_DEFAULT, EV_DEFAULT };
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
こうしたらいいんだろうか?
umadaze
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > <---- [interrupt!] Dequeue(^q^) umadaze
おや? umadaze が返ってきている。キューに溜まっていたんだろうか?
daze が付いているので、バックグラウンドの浮かむ瀬がエンキューしたんだろうか?
usi
(^q^)Enqueue: エンキュー開始☆ message=[usi]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[3]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
usi を送っても返ってこないが。
# jobs
[1]+ Running ./tamesi35a2_cpp.exe --enqueue 1113 durable --dequeue 1112 durable autodelete > ./tamesi35a2_cpp.out.log 2> ./tamesi35a2_cpp.err.log < /dev/null & (wd: /home/★user/shogi/cpp_service)
abort はしなくなったな。
tamesi35a2_cpp.err.log
error declaring queue: Frame could not be sent lifeSpan=[1]
エラーメッセージが変わった。
tamesi35a2_cpp.out.log
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[uma]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^▽^)オンサクセスを終わるぜ☆
(^▽^)バインド・キューのループから抜けたぜ☆
(^q^)メッセージを受け取ったか? message=[usi]
(^▽^)エンキュー開始だぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)バインド・キューのループから抜けたぜ☆
標準出力の内容も変わった。
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "error declaring queue: " << errMsg << " lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
ここで落ちているのか。
応答が返ってこないな。落ちてはないな。キューにメッセージは入っているようだ。
もう1回起動。
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > <---- [interrupt!] Dequeue(^q^) usidaze
ワンテンポ遅れて usidaze が割り込んできたな。
切断を止めると
エラーは出ないが 応答もなくなったぜ。
チャンネルを閉じると
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > kame
(^q^)Enqueue: エンキュー開始☆ message=[kame]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[4]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > usagi
(^q^)Enqueue: エンキュー開始☆ message=[usagi]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[5]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) kamedaze
[Ctrl]+[C] で抜けたタイミングで 割り込みが入ってきたんだが。
どういうことなのか?
切断しないと、パブリッシュできてなくないか?
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key, &message]() {
std::cout << "(^▽^)これからパブリッシュするぜ☆" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
// break in ev loop.
//closeConnection(ENQUEUE_INDEX);
closeChannel(ENQUEUE_INDEX);
std::cout << "(^▽^)オンサクセスを終わるぜ☆" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
});
よく見ると、5秒待機するという 要らんコードが残っていた。
接続も切らず、チャンネルも閉じず、5秒だけ消す
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
<---- [interrupt!] Dequeue(^q^) kitunedaze
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > umigame
(^q^)Enqueue: エンキュー開始☆ message=[umigame]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[7]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) umigamedaze
simesaba
(^q^)Enqueue: エンキュー開始☆ message=[simesaba]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[8]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? >
umigame は返ってきたが、simesaba は返ってこない。
tamesi35a2_cpp.out.log
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[umigame]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)オンサクセスを終わるぜ☆
tamesi35a2_cpp.err.log
空っぽ。
# rabbitmqctl list_queues
Listing queues ...
1112 0
1113 0
simesaba はどこに行ったのか?
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > takoyaki
(^q^)Enqueue: エンキュー開始☆ message=[takoyaki]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[8]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > kabocha
(^q^)Enqueue: エンキュー開始☆ message=[kabocha]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[7]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > root@tk2-217-18401:/home/csg10/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112 0
1113 0
takoyaki と kabocha はどこへ行ったのか?
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^q^)メッセージを受け取ったか? message=[umigame]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)オンサクセスを終わるぜ☆
ログは umigame のまま。
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 100;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
std::mutex _mutex;
static void putMessage(std::string message)
{
std::unique_lock<std::mutex> lock(_mutex);
buffer[bufferCursors[PUT_INDEX]] = message;
std::cout << "(^◇^)回転式バッファー : putMessage : [" << bufferCursors[PUT_INDEX] << "] 入れた☆! message=[" << message << "]" << std::endl;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
std::cout << "(^◇^)回転式バッファー : putMessage : 次回は[" << bufferCursors[PUT_INDEX] << "] ☆!" << std::endl;
}
static std::string getMessage()
{
std::unique_lock<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
std::cout << "(^◇^)回転式バッファー : getMessage : [" << bufferCursors[GET_INDEX] << "] 出した☆! message=[" << message << "]" << std::endl;
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
std::cout << "(^◇^)回転式バッファー : getMessage : 次回は[" << bufferCursors[GET_INDEX] << "] ☆!" << std::endl;
return message;
}
return "";
}
}
情報をもっと出そう。
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > suzume
(^q^)Enqueue: エンキュー開始☆ message=[suzume]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[6]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) suzumedaze
hato
(^q^)Enqueue: エンキュー開始☆ message=[hato]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[4]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? >
suzume は suzumedaze だが、hato は帰ってこない。
# rabbitmqctl list_queues
Listing queues ...
1112 0
1113 0
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)デクレア・キューしたぜ☆
(^q^)コンシュームしたぜ☆
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[suzume]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^◇^)回転式バッファー : getMessage : [0] 出した☆! message=[suzume]
(^◇^)回転式バッファー : getMessage : 次回は[1] ☆!
(^q^)メッセージを受け取ったか? message=[suzume]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
mutex の使い方を間違っているんだろうか?
「C++11のthreadで遊んでみる その2 - mutex編」(minus9d's diary)
http://minus9d.hatenablog.com/entry/20130914/1379168684
//std::unique_lock<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(_mutex);
こうなのだろうか?
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > kuma
(^q^)Enqueue: エンキュー開始☆ message=[kuma]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[4]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) kumadaze
sai
(^q^)Enqueue: エンキュー開始☆ message=[sai]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[3]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
kuma は帰ってきたが、sai は帰ってこなかった。
ロックの効果も変わってない。コンシュームは2回走るのだろうか?
auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string myString(message.body(), message.bodySize());
std::cout << "(^q^)レシーブしたぜ☆ myString=[" << myString << "]" << std::endl;
rotationBuffer::putMessage(myString);
std::cout << "(^q^)レシーブ : これからackするぜ☆ deliveryTag=[" << deliveryTag << "]" << std::endl;
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
std::cout << "(^q^)レシーブ : ackしたぜ☆ deliveryTag=[" << deliveryTag << "]" << std::endl;
};
ack って既読か? 外してみるか。
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メインスレッドはずっと回ってるぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)メッセージ監視ループに入るぜ☆
(^q^)コンシュームは、デクレア・キューしたぜ☆
(^q^)コンシュームを設定したぜ☆
(^q^)レシーブしたぜ☆ myString=[rama]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[rama]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : getMessage : [0] 出した☆! message=[rama]
(^◇^)回転式バッファー : getMessage : 次回は[1] ☆!
(^q^)メッセージを受け取ったか? message=[rama]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)オンサクセスを終わるぜ☆
(^q^)レシーブしたぜ☆ myString=[^@^@]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[^@^@]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^q^)レシーブしたぜ☆ myString=[dangomusi]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[dangomusi]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
putMessage は生きているが、getMessage が来ないな。
ループでブロックされてるんじゃないか?
closeChannel(ENQUEUE_INDEX);
これを使ってみるか。
# ./tamesi34_cs.exe 2> ./tamesi34_cs.err.log
削除したいキューがあれば名前を、無ければ空文字列を入れろだぜ☆(^~^)
キュー名を入力 : キューを削除します
空文字列で[Enter] : 次のステップへ進む
Name or empty ? >
エンキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1112
エンキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 3
デキュー先のメッセージ・キューの名前を入れろだぜ☆(^~^)
Queue name? > 1113
デキュー先のメッセージ・キューの寿命を、足し算して答えろだぜ☆(^~^)
(1) durable : RabbitMQが止まってもキューを残す
(2) autodelete : コンシューマーが1人も接続していなかったら消す
(4) passive : キューが存在するかどうかチェックするだけ。中身見ない時これ
(8) exclusive : この接続でだけ使える。この接続が切れたら消す
Number ? > 1
(^q^)ファクトリーを用意したぜ☆ HostName=[localhost]
(^q^)接続したぜ☆
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [1]
name_queues[index] = [1113]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [False]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[1]
(^q^)コンシューマーを取得したぜ☆ index=[1]
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
Enqueue? > udon
(^q^)Enqueue: エンキュー開始☆ message=[udon]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[4]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
Enqueue? > <---- [interrupt!] Dequeue(^q^) udondaze
pasuta
(^q^)Enqueue: エンキュー開始☆ message=[pasuta]
tamesi34_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1112]
lifeSpan_queues [ENQUEUE_INDEX] =[Durable, Autodelete]
durable_lifeSpans [ENQUEUE_INDEX] =[True]
autodelete_lifeSpans[ENQUEUE_INDEX] =[True]
passive_lifeSpans [ENQUEUE_INDEX] =[False]
exclusive_lifeSpans [ENQUEUE_INDEX] =[False]
----
name_queues [DEQUEUE_INDEX] =[1113]
lifeSpan_queues [DEQUEUE_INDEX] =[Durable]
durable_lifeSpans [DEQUEUE_INDEX] =[True]
autodelete_lifeSpans[DEQUEUE_INDEX] =[False]
passive_lifeSpans [DEQUEUE_INDEX] =[False]
exclusive_lifeSpans [DEQUEUE_INDEX] =[False]
----
name_queues [DELETEQUEUE_INDEX]=[]
lifeSpan_queues [DELETEQUEUE_INDEX]=[0]
durable_lifeSpans [DELETEQUEUE_INDEX]=[False]
autodelete_lifeSpans[DELETEQUEUE_INDEX]=[False]
passive_lifeSpans [DELETEQUEUE_INDEX]=[False]
exclusive_lifeSpans [DELETEQUEUE_INDEX]=[False]
----
(tamesi34_cs.cs?) GetChannel index = [0]
name_queues[index] = [1112]
passive_lifeSpans[index] = [False]
durable_lifeSpans[index] = [True]
exclusive_lifeSpans[index] = [False]
autodelete_lifeSpans[index] = [True]
nowait は仮に false 固定
arguments は仮に null 固定
(^q^)チャンネルを開いたぜ☆ message=[0]
(^q^)Enqueue: body.Length=[6]
(^q^)Enqueue: パブリッシュしたぜ。キュー名=[1112]
(^q^)チャンネルを閉じたぜ☆
うどんは帰ってきたが、パスタは帰ってこない。
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^)メインスレッドはずっと回ってるぜ☆
(^q^) workProduce : メッセージ監視ループに入るぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)コンシュームは、デクレア・キューしたぜ☆
(^q^)コンシュームを設定したぜ☆
(^q^)レシーブしたぜ☆ myString=[udon]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[udon]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[udon]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) workProduce : メッセージを受け取ったか? message=[udon]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これからチャンネルを閉じるぜ☆ index=[0]
(^▽^)チャンネルを閉じたぜ☆
(^▽^)オンサクセスを終わるぜ☆
(^q^)レシーブしたぜ☆ myString=[pasuta]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[pasuta]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
次は 切断してみよう。
(>_<)enqueue: error declaring queue: [Frame could not be sent] queue name=[1113] lifeSpan=[1]
(>_<)enqueue: error declaring queue: [Frame could not be sent] queue name=[1113] lifeSpan=[1]
切断するとこれだ。
(^q^)Welcome! AMQP::durable=[1] AMQP::autodelete=[2] AMQP::passive=[8] AMQP::exclusive=[512]
(^q^)cmdArg=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^q^)data.str()=[--enqueue 1113 durable --dequeue 1112 durable autodelete ]
(^_^)a0=[--enqueue]
(^q^)エンキュー!
(^_^)a0=[1113]
(^q^)エンキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[0] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[--dequeue]
(^q^)デキュー!
(^_^)a0=[1112]
(^q^)デキューのキュー名!
(^_^)a0=[durable]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^_^)a0=[autodelete]
(^q^)寿命! m0=[1] m1=[0]
(^q^)zyumyo-!
(^-^)Loop end.
(^q^)エンキューのキュー名=[1113]
(^q^)デキューのキュー名=[1112]
(^q^)エンキューの寿命の数=[1]
(^q^)デキューの寿命の数=[2]
(^q^)エンキューおわり name_queues[ENQUEUE_INDEX]=[1113] lifeSpan_queues[ENQUEUE_INDEX]=[1]
(^q^)デキューおわり name_queues[DEQUEUE_INDEX]=[1112] lifeSpan_queues[DEQUEUE_INDEX]=[3]
tamesi35a2_cs.cs Dump
name_queues [ENQUEUE_INDEX] =[1113]
lifeSpan_queues [ENQUEUE_INDEX] =[1]
----
name_queues [DEQUEUE_INDEX] =[1112]
lifeSpan_queues [DEQUEUE_INDEX] =[3]
----
(^q^)ダンプおわり
(^q^)コンシューム開始だぜ☆
(^q^) workProduce : メッセージ監視ループに入るぜ☆
(^q^)メインスレッドはずっと回ってるぜ☆
(^▽^)接続したぜ☆ index=[1]
(^▽^)チャンネル開いたぜ☆ index=[1]
(^q^)コンシュームは、デクレア・キューしたぜ☆
(^q^)コンシュームを設定したぜ☆
(^q^)レシーブしたぜ☆ myString=[nezumi]
(^◇^)回転式バッファー : putMessage : [0] 入れた☆! message=[nezumi]
(^◇^)回転式バッファー : putMessage : 次回は[1] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[1]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[1]
(^◇^)回転式バッファー : ゲットメッセージ : [0] 出した☆! message=[nezumi]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[1] ☆!
(^q^) workProduce : メッセージを受け取ったか? message=[nezumi]
(^▽^)エンキュー開始だぜ☆
(^▽^)接続したぜ☆ index=[0]
(^▽^)チャンネル開いたぜ☆ index=[0]
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)これからパブリッシュするぜ☆
(^▽^)パブリッシュしたぜ☆
(^▽^)これから接続を閉じるぜ☆ index=[0]
(^▽^)接続を閉じたぜ☆
(^▽^)オンサクセスを終わるぜ☆
(^▽^)バインド・キューのループから抜けたぜ☆
(^q^) workProduce : エンキューはでけたか?
(^q^)レシーブしたぜ☆ myString=[tatu]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[tatu]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^◇^)回転式バッファー : ゲットメッセージ : [1] 出した☆! message=[tatu]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[2] ☆!
(^q^) workProduce : メッセージを受け取ったか? message=[tatu]
(^▽^)エンキュー開始だぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)バインド・キューのループから抜けたぜ☆
(^q^) workProduce : エンキューはでけたか?
(^q^)レシーブしたぜ☆ myString=[tori]
(^◇^)回転式バッファー : putMessage : [2] 入れた☆! message=[tori]
(^◇^)回転式バッファー : putMessage : 次回は[3] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[3]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[3]
(^◇^)回転式バッファー : ゲットメッセージ : [2] 出した☆! message=[tori]
(^◇^)回転式バッファー : ゲットメッセージ : 次回は[3] ☆!
(^q^) workProduce : メッセージを受け取ったか? message=[tori]
(^▽^)エンキュー開始だぜ☆
(^▽^)デクレア・キューしたぜ☆
(^▽^)バインド・キューしたぜ☆
(^▽^)バインド・キューのループから抜けたぜ☆
(^q^) workProduce : エンキューはでけたか?
ループのさせ方の考え方を間違えているんだろうか?
// メッセージ・キューの送信を担当
static void workProduce()
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "(>_<)enqueue: error declaring queue: [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key]() {
for (;;)
{
std::string message = rotationBuffer::getMessage();
if ("" != message)
{
std::cout << "(^q^) workProduce : メッセージを受け取ったか? message=[" << message << "]" << std::endl;
// 末尾に daze を付ける。
message += "daze";
std::cout << "(^▽^)workProduce : これからパブリッシュするぜ☆" << std::endl;
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// break in ev loop.
//closeConnection(ENQUEUE_INDEX);
//closeChannel(ENQUEUE_INDEX);
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
// ここで無限ループさせる。
ev_run(pLoop_ev[ENQUEUE_INDEX]);
std::cout << "(>_<) workProduce : メッセージ監視ループを出てしまったぜ☆?" << std::endl;
}
こうでもない。
// メッセージ・キューの送信を担当
static void workProduce()
{
std::cout << "(^▽^)エンキュー開始だぜ☆" << std::endl;
std::string exchange_name = "myexchange";
std::string routing_key = "";
int lifeSpan = LifeSpanString_To_AmqpInt(lifeSpan_queues[ENQUEUE_INDEX]);
getChannel(ENQUEUE_INDEX)->declareQueue(name_queues[ENQUEUE_INDEX], lifeSpan)
.onError([&lifeSpan](const char* errMsg) {
std::cerr << "(>_<)enqueue: error declaring queue: [" << errMsg << "] queue name=[" << name_queues[ENQUEUE_INDEX] << "] lifeSpan=[" << lifeSpan << "]" << "\n";
});
std::cout << "(^▽^)デクレア・キューしたぜ☆" << std::endl;
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_queues[ENQUEUE_INDEX], routing_key)
.onSuccess([&exchange_name, &routing_key]() {
std::string message = rotationBuffer::getMessage();
std::cout << "(^q^) workProduce : メッセージを受け取ったか? message=[" << message << "]" << std::endl;
// 末尾に daze を付ける。
message += "daze";
std::cout << "(^▽^)workProduce : これからパブリッシュするぜ☆" << std::endl;
if (!getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
std::cerr << "failed to publish?\n";
}
std::cout << "(^▽^)パブリッシュしたぜ☆" << std::endl;
// break in ev loop.
//closeConnection(ENQUEUE_INDEX);
//closeChannel(ENQUEUE_INDEX);
});
std::cout << "(^▽^)バインド・キューしたぜ☆" << std::endl;
std::cout << "(^q^) workProduce : メッセージ監視ループに入るぜ☆" << std::endl;
for (;;)
{
if (rotationBuffer::hasMessage())
{
std::cout << "(^q^) workProduce : メッセージがあるぜ☆" << std::endl;
// ここで、バインドキューのオンサクセスの処理へ。
ev_run(pLoop_ev[ENQUEUE_INDEX]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
std::cout << "(>_<) workProduce : メッセージ監視ループを出てしまったぜ☆?" << std::endl;
}
こんな目に。
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^)レシーブしたぜ☆ myString=[denkiunagi]
(^◇^)回転式バッファー : putMessage : [1] 入れた☆! message=[denkiunagi]
(^◇^)回転式バッファー : putMessage : 次回は[2] ☆!
(^q^)レシーブ : これからackするぜ☆ deliveryTag=[2]
(^q^)レシーブ : ackしたぜ☆ deliveryTag=[2]
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆
(^q^) workProduce : メッセージがあるぜ☆