AMQP-CPP は、プロセス間通信の手法 AMQP の実装ライブラリ。
「AMQP-CPP」(CopernicaMarketingSoftware/AMQP-CPP)
https://github.com/CopernicaMarketingSoftware/AMQP-CPP
概要
キュー名と、キューの寿命を指定することで、
そのキューに残っているメッセージ数を 標準出力で返します。
内容物
- Makefile
- tamesi37a1_msgq.hpp
- tamesi37a1_msgq.cpp
- tamesi37a1.cpp
Makefile
タブにしなければならないところが、このブログでは半角スペースに置き換わっているので注意。
.PHONY: all clean
all: tamesi37a1.exe
tamesi37a1_msgq.o: tamesi37a1_msgq.cpp tamesi37a1_msgq.hpp
g++ -std=c++11 -c tamesi37a1_msgq.cpp -o tamesi37a1_msgq.o -lev -lamqpcpp -pthread
tamesi37a1.o: tamesi37a1.cpp tamesi37a1_msgq.hpp
g++ -std=c++11 -c tamesi37a1.cpp -o tamesi37a1.o -lev -lamqpcpp -pthread
tamesi37a1.exe: tamesi37a1.o tamesi37a1_msgq.o
g++ -std=c++11 tamesi37a1.o tamesi37a1_msgq.o -o tamesi37a1.exe -lev -lamqpcpp -pthread
clean:
rm -f tamesi37a1.d tamesi37a1.o tamesi37a1_msgq.o tamesi37a1.exe tamesi37a1.out.log tamesi37a1.err.log
メイクファイルについては、別記事で解説した。
http://qiita.com/muzudho1/items/a96ae4f39575614a50a8
tamesi37a1_msgq.hpp
// This program for Ubuntu 16.04.
#ifndef TAMESI37A1_MSGQ_HPP
#define TAMESI37A1_MSGQ_HPP
// #pragma once
#include <string>
#include <mutex>
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
namespace msgq
{
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
static std::string exchange_name = "myexchange";
static std::string routing_key = "";
// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;
class Settings
{
std::string name_queue_;
std::mutex _mutex_queueName;
msgq::LifeSpan_t lifeSpan_queue_;
std::mutex _mutex_lifeSpan;
public:
Settings();
~Settings();
void setQueueName(std::string value);
std::string getQueueName();
void setQueueLifeSpan(msgq::LifeSpan_t value);
msgq::LifeSpan_t getQueueLifeSpan();
bool parseCommandlineArgs(int argc, char* argv[]);
std::string Dump();
};
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev = ev_loop_new();
static AMQP::LibEvHandler* pHandler_ev = new AMQP::LibEvHandler(pLoop_ev);
static AMQP::TcpConnection* pConnection_ev = nullptr;
static AMQP::TcpConnection* getConnection()
{
if (nullptr == pConnection_ev) {
pConnection_ev = new AMQP::TcpConnection(pHandler_ev, ADDRESS);
}
return pConnection_ev;
}
static void closeChannel();
static void closeConnection()
{
if (nullptr != pConnection_ev) {
// チャンネルにもヌルのフラグを入れる
closeChannel();
pConnection_ev->close();
pConnection_ev = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev = nullptr;
static AMQP::TcpChannel* getChannel()
{
if (nullptr == pChannel_ev)
{
pChannel_ev = new AMQP::TcpChannel(getConnection());
}
return pChannel_ev;
}
static void closeChannel()
{
if (nullptr != pChannel_ev) {
pChannel_ev->close();
pChannel_ev = nullptr;
}
}
}
#endif // #ifndef TAMESI37A1_MSGQ_HP
接続、チャンネルともに シングルトンで用意するのが要点。
シングルトンにしておけば、複数の接続、チャンネルを開くことは可能。
tamesi37a1_msgq.cpp
// This program for Ubuntu 16.04.
// #include "stdafx.h"
#include "tamesi37a1_msgq.hpp"
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
namespace msgq
{
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
if ("durable" == lifeSpan) {
return AMQP::durable;
}
else if ("autodelete" == lifeSpan) {
return AMQP::autodelete;
}
else if ("passive" == lifeSpan) {
return AMQP::passive;
}
else if ("exclusive" == lifeSpan) {
return AMQP::exclusive;
}
else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(11);
}
}
Settings::Settings()
{
name_queue_ = "";
lifeSpan_queue_ = (LifeSpan_t)0;
}
Settings::~Settings()
{
}
void Settings::setQueueName(std::string value) {
std::lock_guard<std::mutex> lock(_mutex_queueName);
name_queue_ = value;
}
std::string Settings::getQueueName() {
std::lock_guard<std::mutex> lock(_mutex_queueName);
return name_queue_;
}
void Settings::setQueueLifeSpan(LifeSpan_t value) {
std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
lifeSpan_queue_ = value;
}
LifeSpan_t Settings::getQueueLifeSpan() {
std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
return lifeSpan_queue_;
}
bool Settings::parseCommandlineArgs(int argc, char* argv[])
{
std::string name_queue = "";
LifeSpan_t lifeSpan_queue = 0;
if (0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::istringstream data(cmdArg);
// 与件
// 「--queue 1112 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
// std::string name_queue = "";
// LifeSpan_t lifeSpan_queue = 0;
// 記憶
int m0 = -1; // queue index.
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
if ("--queue" == a) { m0 = 0; m1 = -1; }
else { break; }
}
else if (
">" == a.substr(0, 1)//先頭の1文字が「>」の場合
|| "<" == a.substr(0, 1)//「<」
|| ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
) {
// 標準入力、標準出力、標準エラーを無視
break;
}
else if (0 == m0) { // キュー名
if (-1 == m1) {
name_queue = a;
lifeSpan_queue = 0;
m1++;
}
else {
lifeSpan_queue |= LifeSpanString_To_Int(a);
}
}
else
{
// 欲しい内容がくるまでスルー
}
}
}
if ("" != name_queue) {
setQueueName(name_queue);
setQueueLifeSpan(lifeSpan_queue);
return true;
}
std::cerr << "コマンドライン引数の「--queue 送信先キュー名 寿命」を漏れなく指定してください。" << std::endl
<< "例: --queue 1113 durable autodelete" << std::endl
<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
<< "name_queue=[" << name_queue << "]" << std::endl;
return false;
}
std::string Settings::Dump()
{
static std::ostringstream sb;
sb << "Dump" << std::endl
<< " name_queue =[" << getQueueName() << "]" << std::endl
<< " lifeSpan_queue =[" << getQueueLifeSpan() << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
}
コマンドライン引数を解析するコードの分量が大きい。
tamesi37a1.cpp
//--------------------------------------------------------------------------------
// OS : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// : // Count messages in queues.
// Library : AMQP-CPP
//
// Program : this
// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
// : License : MIT License https://opensource.org/licenses/MIT
// : Explain : This program count messages in queue.
// : Please settings queue to command line argument.
// : Compile : Command : make
// : Explain : Please, read Makefile.
// : Execute : Command : ./tamesi37a1.exe --queue 1112 durable autodelete
// : Run on the foreground.
// : Explain : ./tamesi37a1.exe Executable file
// : --queue queue settings section
// : 1112 queue name (string ok)
// : durable queue life span (multiple)
// : autodelete queue life span
// : Explain : command line argument of life span of queue (Compositable)
// : durable The queue exists, Even if the RabbitMQ service closed.
// : autodelete The queue delete when consumers to be nothing.
// : passive Just check what queue exists. Don't use read/write to queue.
// : exclusive The queue delete when connection closed. The queue exists is this connection only.
// : FAQ : Failures case: Message conflict when starting multiple processes.
// : Example : Windows 10 (This program not work) default queue life span. durable
// : Ubuntu 16.04 default queue life span. durable autodelete
// : Stop : Typing : [Ctrl]+[C]
//
// Referrences :
// : AMQP-CPP : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
// : This : Web site : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//
//--------------------------------------------------------------------------------
#include <string> // std::string
#include <iostream> // std::cout
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include "tamesi37a1_msgq.hpp"
int main(int argc, char* argv[])
{
msgq::Settings settings;
// 引数の解析
if (!settings.parseCommandlineArgs(argc, argv))
{
exit(12);
}
std::string qn = settings.getQueueName();
msgq::LifeSpan_t life = settings.getQueueLifeSpan();
AMQP::TcpChannel* pChannel = msgq::getChannel();
//----------------------------------------
// 指定したキューの残りメッセージ数を返す
//----------------------------------------
auto callback = [](const std::string &name, int msgcount, int consumercount) {
std::cout << "queueName = \"" << name << "\"" << std::endl
<< "messages = " << msgcount << std::endl
<< "nconsumers = " << consumercount << std::endl;
// 接続を切って、このイベントハンドラから抜けることで、ev_runループから抜ける
msgq::closeConnection();
};
pChannel->declareQueue(qn,life)
.onSuccess(callback);
//----------------------------------------
// ループ
//----------------------------------------
ev_run(msgq::pLoop_ev, 0);
return 0;
}
キューを開くときは、キューを作った時に設定したキューの寿命も 合わせて設定する必要がある。不便。
接続、チャンネルを開く、キューの宣言、ev_run と進むと、
キューの宣言のonSuccessが呼び出される。ここでキューのメッセージ数が分かるので標準出力する。
そのあと接続を切って onSuccessのハンドラを抜けると、ev_runを抜けることができる。
おまけ Expect
tamesi37a1.expect
#!/usr/bin/expect --
# 末尾の -- は、${argv} でコマンド引数を取ってくる印
# コマンドライン引数の例
# --queue 1112 durable autodelete
# カレント・ディレクトリを設定
cd /home/★user/★project/cpp_service/t37a1.d
# Expectをタイムアウトさせない
set timeout -1
# コマンドを実行する。エコーはさせない
spawn -noecho ./tamesi37a1.exe ${argv}
# 部分一致した一行をエコーさせる
expect "queueName = "
expect "messages = "
expect "consumers = "
おまけ PHP
<?php
// 処理時間の計測
$time=microtime(true);
exec('/usr/bin/expect /home/★user/★project/expect_service/tamesi37a1.expect '.urldecode($_SERVER['QUERY_STRING']),$o);
// 出力が配列$oに入っているので全行表示
foreach($o as $v)echo "$v\n";
// 処理時間の出力
echo '<br />' . (microtime(true) - $time);