1. muzudho1

    Posted

    muzudho1
Changes in title
+[C++] AMQP-CPP サンプル・プログラム
Changes in tags
Changes in body
Source | HTML | Preview
@@ -0,0 +1,402 @@
+AMQP-CPP は、プロセス間通信の手法 AMQP の実装ライブラリ。
+
+「AMQP-CPP」(CopernicaMarketingSoftware/AMQP-CPP)
+https://github.com/CopernicaMarketingSoftware/AMQP-CPP
+
+# 概要
+キュー名と、キューの寿命を指定することで、
+そのキューに残っているメッセージ数を 標準出力で返します。
+
+# 内容物
+
+- Makefile
+- tamesi37a1_msgq.hpp
+- tamesi37a1_msgq.cpp
+- tamesi37a1.cpp
+
+## Makefile
+タブにしなければならないところが、このブログでは半角スペースに置き換わっているので注意。
+
+```
+.PHONY: all clean
+
+all: tamesi37a1.exe
+
+tamesi37a1_msgq.o: tamesi37a1_msgq.cpp tamesi37a1_msgq.hpp
+ g++ -std=c++11 -c tamesi37a1_msgq.cpp -o tamesi37a1_msgq.o -lev -lamqpcpp -pthread
+
+tamesi37a1.o: tamesi37a1.cpp tamesi37a1_msgq.hpp
+ g++ -std=c++11 -c tamesi37a1.cpp -o tamesi37a1.o -lev -lamqpcpp -pthread
+
+tamesi37a1.exe: tamesi37a1.o tamesi37a1_msgq.o
+ g++ -std=c++11 tamesi37a1.o tamesi37a1_msgq.o -o tamesi37a1.exe -lev -lamqpcpp -pthread
+
+clean:
+ rm -f tamesi37a1.d tamesi37a1.o tamesi37a1_msgq.o tamesi37a1.exe tamesi37a1.out.log tamesi37a1.err.log
+```
+
+メイクファイルについては、別記事で解説した。
+http://qiita.com/muzudho1/items/a96ae4f39575614a50a8
+
+## tamesi37a1_msgq.hpp
+
+```
+// This program for Ubuntu 16.04.
+
+#ifndef TAMESI37A1_MSGQ_HPP
+#define TAMESI37A1_MSGQ_HPP
+// #pragma once
+
+#include <string>
+#include <mutex>
+
+// プロセス間通信用
+#include <ev.h>
+#include <amqpcpp.h>
+#include <amqpcpp/libev.h>
+
+namespace msgq
+{
+ static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
+ static std::string exchange_name = "myexchange";
+ static std::string routing_key = "";
+
+ // AMQP-CPPでの実装 :
+ // AMQP::durable=[1] RabbitMQが止まってもキューを残す
+ // AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
+ // AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
+ // AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
+ typedef int LifeSpan_t;
+
+ class Settings
+ {
+ std::string name_queue_;
+ std::mutex _mutex_queueName;
+ msgq::LifeSpan_t lifeSpan_queue_;
+ std::mutex _mutex_lifeSpan;
+
+ public:
+ Settings();
+ ~Settings();
+
+ void setQueueName(std::string value);
+ std::string getQueueName();
+ void setQueueLifeSpan(msgq::LifeSpan_t value);
+ msgq::LifeSpan_t getQueueLifeSpan();
+
+ bool parseCommandlineArgs(int argc, char* argv[]);
+
+ std::string Dump();
+ };
+
+ // 接続はシングルトンにします
+ static struct ev_loop* pLoop_ev = ev_loop_new();
+ static AMQP::LibEvHandler* pHandler_ev = new AMQP::LibEvHandler(pLoop_ev);
+
+ static AMQP::TcpConnection* pConnection_ev = nullptr;
+ static AMQP::TcpConnection* getConnection()
+ {
+ if (nullptr == pConnection_ev) {
+ pConnection_ev = new AMQP::TcpConnection(pHandler_ev, ADDRESS);
+ }
+ return pConnection_ev;
+ }
+ static void closeChannel();
+ static void closeConnection()
+ {
+ if (nullptr != pConnection_ev) {
+ // チャンネルにもヌルのフラグを入れる
+ closeChannel();
+
+ pConnection_ev->close();
+ pConnection_ev = nullptr;
+ }
+ }
+
+ // チャンネルはシングルトンにします。
+ static AMQP::TcpChannel* pChannel_ev = nullptr;
+ static AMQP::TcpChannel* getChannel()
+ {
+ if (nullptr == pChannel_ev)
+ {
+ pChannel_ev = new AMQP::TcpChannel(getConnection());
+ }
+ return pChannel_ev;
+ }
+ static void closeChannel()
+ {
+ if (nullptr != pChannel_ev) {
+ pChannel_ev->close();
+ pChannel_ev = nullptr;
+ }
+ }
+}
+
+#endif // #ifndef TAMESI37A1_MSGQ_HP
+```
+
+接続、チャンネルともに シングルトンで用意するのが要点。
+シングルトンにしておけば、複数の接続、チャンネルを開くことは可能。
+
+## tamesi37a1_msgq.cpp
+
+```
+// This program for Ubuntu 16.04.
+
+// #include "stdafx.h"
+#include "tamesi37a1_msgq.hpp"
+
+#include <iostream> // std::cout
+#include <sstream> // std::ostringstream
+
+namespace msgq
+{
+ static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
+ {
+ if ("durable" == lifeSpan) {
+ return AMQP::durable;
+ }
+ else if ("autodelete" == lifeSpan) {
+ return AMQP::autodelete;
+ }
+ else if ("passive" == lifeSpan) {
+ return AMQP::passive;
+ }
+ else if ("exclusive" == lifeSpan) {
+ return AMQP::exclusive;
+ }
+ else {
+ std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
+ exit(11);
+ }
+ }
+
+ Settings::Settings()
+ {
+ name_queue_ = "";
+ lifeSpan_queue_ = (LifeSpan_t)0;
+ }
+ Settings::~Settings()
+ {
+ }
+
+ void Settings::setQueueName(std::string value) {
+ std::lock_guard<std::mutex> lock(_mutex_queueName);
+ name_queue_ = value;
+ }
+ std::string Settings::getQueueName() {
+ std::lock_guard<std::mutex> lock(_mutex_queueName);
+ return name_queue_;
+ }
+ void Settings::setQueueLifeSpan(LifeSpan_t value) {
+ std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
+ lifeSpan_queue_ = value;
+ }
+ LifeSpan_t Settings::getQueueLifeSpan() {
+ std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
+ return lifeSpan_queue_;
+ }
+
+ bool Settings::parseCommandlineArgs(int argc, char* argv[])
+ {
+ std::string name_queue = "";
+ LifeSpan_t lifeSpan_queue = 0;
+
+ if (0<argc)
+ {
+ // プログラム名を省き、コマンドライン引数だけをつなげる。
+ std::string cmdArg;
+ for (int i = 1; i < argc; ++i)
+ {
+ cmdArg += std::string(argv[i]);
+ if (i < argc) {
+ cmdArg += " ";
+ }
+ }
+ std::istringstream data(cmdArg);
+
+ // 与件
+ // 「--queue 1112 durable autodelete」
+ // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
+
+ // 受け皿
+ // std::string name_queue = "";
+ // LifeSpan_t lifeSpan_queue = 0;
+
+ // 記憶
+ int m0 = -1; // queue index.
+ int m1 = -1; // lifespans index.
+
+ // 解析器
+ std::string a;
+ while (data >> a) {
+
+ if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
+ if ("--queue" == a) { m0 = 0; m1 = -1; }
+ else { break; }
+ }
+ else if (
+ ">" == a.substr(0, 1)//先頭の1文字が「>」の場合
+ || "<" == a.substr(0, 1)//「<」
+ || ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
+ ) {
+ // 標準入力、標準出力、標準エラーを無視
+ break;
+ }
+ else if (0 == m0) { // キュー名
+ if (-1 == m1) {
+ name_queue = a;
+ lifeSpan_queue = 0;
+ m1++;
+ }
+ else {
+ lifeSpan_queue |= LifeSpanString_To_Int(a);
+ }
+ }
+ else
+ {
+ // 欲しい内容がくるまでスルー
+ }
+ }
+ }
+
+ if ("" != name_queue) {
+ setQueueName(name_queue);
+ setQueueLifeSpan(lifeSpan_queue);
+ return true;
+ }
+
+ std::cerr << "コマンドライン引数の「--queue 送信先キュー名 寿命」を漏れなく指定してください。" << std::endl
+ << "例: --queue 1113 durable autodelete" << std::endl
+ << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
+ << "name_queue=[" << name_queue << "]" << std::endl;
+ return false;
+ }
+
+ std::string Settings::Dump()
+ {
+ static std::ostringstream sb;
+ sb << "Dump" << std::endl
+ << " name_queue =[" << getQueueName() << "]" << std::endl
+ << " lifeSpan_queue =[" << getQueueLifeSpan() << "]" << std::endl
+ << " ----" << std::endl;
+ return sb.str();
+ }
+
+}
+```
+
+コマンドライン引数を解析するコードの分量が大きい。
+
+## tamesi37a1.cpp
+
+```
+//--------------------------------------------------------------------------------
+// OS : Windows10 : It not work. This program for Ubuntu 16.04.
+//
+// OS : Ubuntu 16.04
+// Library : libev
+// : Install : Command : sudo apt-get update
+// : sudo apt-get install libev-dev
+// Service : RabbitMQ
+// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
+// : Reference : Web site : Top page http://www.rabbitmq.com/
+// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
+// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
+// : Manual : Command : man rabbitmqctl
+// : Start : Command : (1) rabbitmq-server
+// : (2) [Ctrl] + [Z]
+// : (3) bg (Job Number)
+// : Stop : Command : rabbitmqctl stop
+// : Check : Command : rabbitmqctl status
+// : : Command : rabbitmqctl list_queues
+// : // Count messages in queues.
+// Library : AMQP-CPP
+//
+// Program : this
+// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
+// : License : MIT License https://opensource.org/licenses/MIT
+// : Explain : This program count messages in queue.
+// : Please settings queue to command line argument.
+// : Compile : Command : make
+// : Explain : Please, read Makefile.
+// : Execute : Command : ./tamesi37a1.exe --queue 1112 durable autodelete
+// : Run on the foreground.
+// : Explain : ./tamesi37a1.exe Executable file
+// : --queue queue settings section
+// : 1112 queue name (string ok)
+// : durable queue life span (multiple)
+// : autodelete queue life span
+// : Explain : command line argument of life span of queue (Compositable)
+// : durable The queue exists, Even if the RabbitMQ service closed.
+// : autodelete The queue delete when consumers to be nothing.
+// : passive Just check what queue exists. Don't use read/write to queue.
+// : exclusive The queue delete when connection closed. The queue exists is this connection only.
+// : FAQ : Failures case: Message conflict when starting multiple processes.
+// : Example : Windows 10 (This program not work) default queue life span. durable
+// : Ubuntu 16.04 default queue life span. durable autodelete
+// : Stop : Typing : [Ctrl]+[C]
+//
+// Referrences :
+// : AMQP-CPP : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
+// : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
+// : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
+// : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
+// : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
+// : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
+// : This : Web site : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
+//
+//--------------------------------------------------------------------------------
+
+#include <string> // std::string
+#include <iostream> // std::cout
+
+// プロセス間通信用
+#include <ev.h>
+#include <amqpcpp.h>
+#include <amqpcpp/libev.h>
+
+#include "tamesi37a1_msgq.hpp"
+
+int main(int argc, char* argv[])
+{
+ msgq::Settings settings;
+
+ // 引数の解析
+ if (!settings.parseCommandlineArgs(argc, argv))
+ {
+ exit(12);
+ }
+
+ std::string qn = settings.getQueueName();
+ msgq::LifeSpan_t life = settings.getQueueLifeSpan();
+
+ AMQP::TcpChannel* pChannel = msgq::getChannel();
+
+ //----------------------------------------
+ // 指定したキューの残りメッセージ数を返す
+ //----------------------------------------
+ auto callback = [](const std::string &name, int msgcount, int consumercount) {
+ std::cout << msgcount << std::endl;
+ // 接続を切って、このイベントハンドラから抜けることで、ev_runループから抜ける
+ msgq::closeConnection();
+ };
+ pChannel->declareQueue(qn,life)
+ .onSuccess(callback);
+
+ //----------------------------------------
+ // ループ
+ //----------------------------------------
+ ev_run(msgq::pLoop_ev, 0);
+
+ return 0;
+}
+```
+
+キューを開くときは、キューを作った時に設定したキューの寿命も 合わせて設定する必要がある。不便。
+
+接続、チャンネルを開く、キューの宣言、ev_run と進むと、
+キューの宣言のonSuccessが呼び出される。ここでキューのメッセージ数が分かるので標準出力する。
+
+そのあと接続を切って onSuccessのハンドラを抜けると、ev_runを抜けることができる。
+