Help us understand the problem. What is going on with this article?

[C++] AMQP-CPP サンプル・プログラム

More than 3 years have passed since last update.

AMQP-CPP は、プロセス間通信の手法 AMQP の実装ライブラリ。

「AMQP-CPP」(CopernicaMarketingSoftware/AMQP-CPP)
https://github.com/CopernicaMarketingSoftware/AMQP-CPP

概要

キュー名と、キューの寿命を指定することで、
そのキューに残っているメッセージ数を 標準出力で返します。

内容物

  • Makefile
  • tamesi37a1_msgq.hpp
  • tamesi37a1_msgq.cpp
  • tamesi37a1.cpp

Makefile

タブにしなければならないところが、このブログでは半角スペースに置き換わっているので注意。

.PHONY: all clean

all: tamesi37a1.exe

tamesi37a1_msgq.o: tamesi37a1_msgq.cpp tamesi37a1_msgq.hpp
        g++ -std=c++11 -c tamesi37a1_msgq.cpp -o tamesi37a1_msgq.o -lev -lamqpcpp -pthread

tamesi37a1.o: tamesi37a1.cpp tamesi37a1_msgq.hpp
        g++ -std=c++11 -c tamesi37a1.cpp -o tamesi37a1.o -lev -lamqpcpp -pthread

tamesi37a1.exe: tamesi37a1.o tamesi37a1_msgq.o
        g++ -std=c++11 tamesi37a1.o tamesi37a1_msgq.o -o tamesi37a1.exe -lev -lamqpcpp -pthread

clean:
        rm -f tamesi37a1.d tamesi37a1.o tamesi37a1_msgq.o tamesi37a1.exe tamesi37a1.out.log tamesi37a1.err.log

メイクファイルについては、別記事で解説した。
http://qiita.com/muzudho1/items/a96ae4f39575614a50a8

tamesi37a1_msgq.hpp

// This program for Ubuntu 16.04.

#ifndef TAMESI37A1_MSGQ_HPP
#define TAMESI37A1_MSGQ_HPP
// #pragma once

#include <string>
#include <mutex>

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

namespace msgq
{
    static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
    static std::string exchange_name = "myexchange";
    static std::string routing_key = "";

    // AMQP-CPPでの実装 :
    // AMQP::durable=[1] RabbitMQが止まってもキューを残す
    // AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
    // AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
    // AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
    typedef int LifeSpan_t;

    class Settings
    {
        std::string name_queue_;
        std::mutex _mutex_queueName;
        msgq::LifeSpan_t lifeSpan_queue_;
        std::mutex _mutex_lifeSpan;

    public:
        Settings();
        ~Settings();

        void setQueueName(std::string value);
        std::string getQueueName();
        void setQueueLifeSpan(msgq::LifeSpan_t value);
        msgq::LifeSpan_t getQueueLifeSpan();

        bool parseCommandlineArgs(int argc, char* argv[]);

        std::string Dump();
    };

    // 接続はシングルトンにします
    static struct ev_loop* pLoop_ev = ev_loop_new();
    static AMQP::LibEvHandler* pHandler_ev = new AMQP::LibEvHandler(pLoop_ev);

    static AMQP::TcpConnection* pConnection_ev = nullptr;
    static AMQP::TcpConnection* getConnection()
    {
        if (nullptr == pConnection_ev) {
            pConnection_ev = new AMQP::TcpConnection(pHandler_ev, ADDRESS);
        }
        return pConnection_ev;
    }
    static void closeChannel();
    static void closeConnection()
    {
        if (nullptr != pConnection_ev) {
            // チャンネルにもヌルのフラグを入れる
            closeChannel();

            pConnection_ev->close();
            pConnection_ev = nullptr;
        }
    }

    // チャンネルはシングルトンにします。
    static AMQP::TcpChannel* pChannel_ev = nullptr;
    static AMQP::TcpChannel* getChannel()
    {
        if (nullptr == pChannel_ev)
        {
            pChannel_ev = new AMQP::TcpChannel(getConnection());
        }
        return pChannel_ev;
    }
    static void closeChannel()
    {
        if (nullptr != pChannel_ev) {
            pChannel_ev->close();
            pChannel_ev = nullptr;
        }
    }
}

#endif // #ifndef TAMESI37A1_MSGQ_HP

接続、チャンネルともに シングルトンで用意するのが要点。
シングルトンにしておけば、複数の接続、チャンネルを開くことは可能。

tamesi37a1_msgq.cpp

// This program for Ubuntu 16.04.

// #include "stdafx.h"
#include "tamesi37a1_msgq.hpp"

#include <iostream> // std::cout
#include <sstream> // std::ostringstream

namespace msgq
{
    static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
    {
        if ("durable" == lifeSpan) {
            return AMQP::durable;
        }
        else if ("autodelete" == lifeSpan) {
            return AMQP::autodelete;
        }
        else if ("passive" == lifeSpan) {
            return AMQP::passive;
        }
        else if ("exclusive" == lifeSpan) {
            return AMQP::exclusive;
        }
        else {
            std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
            exit(11);
        }
    }

    Settings::Settings()
    {
        name_queue_ = "";
        lifeSpan_queue_ = (LifeSpan_t)0;
    }
    Settings::~Settings()
    {
    }

    void Settings::setQueueName(std::string value) {
        std::lock_guard<std::mutex> lock(_mutex_queueName);
        name_queue_ = value;
    }
    std::string Settings::getQueueName() {
        std::lock_guard<std::mutex> lock(_mutex_queueName);
        return name_queue_;
    }
    void Settings::setQueueLifeSpan(LifeSpan_t value) {
        std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
        lifeSpan_queue_ = value;
    }
    LifeSpan_t Settings::getQueueLifeSpan() {
        std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
        return lifeSpan_queue_;
    }

    bool Settings::parseCommandlineArgs(int argc, char* argv[])
    {
        std::string name_queue = "";
        LifeSpan_t lifeSpan_queue = 0;

        if (0<argc)
        {
            // プログラム名を省き、コマンドライン引数だけをつなげる。
            std::string cmdArg;
            for (int i = 1; i < argc; ++i)
            {
                cmdArg += std::string(argv[i]);
                if (i < argc) {
                    cmdArg += " ";
                }
            }
            std::istringstream data(cmdArg);

            // 与件
            // 「--queue 1112 durable autodelete」
            // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

            // 受け皿
            // std::string name_queue = "";
            // LifeSpan_t lifeSpan_queue = 0;

            // 記憶
            int m0 = -1; // queue index.
            int m1 = -1; // lifespans index.

                         // 解析器
            std::string a;
            while (data >> a) {

                if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
                    if ("--queue" == a) { m0 = 0;   m1 = -1; }
                    else { break; }
                }
                else if (
                    ">" == a.substr(0, 1)//先頭の1文字が「>」の場合
                    || "<" == a.substr(0, 1)//「<」
                    || ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
                    ) {
                    // 標準入力、標準出力、標準エラーを無視
                    break;
                }
                else if (0 == m0) { // キュー名
                    if (-1 == m1) {
                        name_queue = a;
                        lifeSpan_queue = 0;
                        m1++;
                    }
                    else {
                        lifeSpan_queue |= LifeSpanString_To_Int(a);
                    }
                }
                else
                {
                    // 欲しい内容がくるまでスルー
                }
            }
        }

        if ("" != name_queue) {
            setQueueName(name_queue);
            setQueueLifeSpan(lifeSpan_queue);
            return true;
        }

        std::cerr << "コマンドライン引数の「--queue 送信先キュー名 寿命」を漏れなく指定してください。" << std::endl
            << "例: --queue 1113 durable autodelete" << std::endl
            << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
            << "name_queue=[" << name_queue << "]" << std::endl;
        return false;
    }

    std::string Settings::Dump()
    {
        static std::ostringstream sb;
        sb << "Dump" << std::endl
            << "    name_queue     =[" << getQueueName() << "]" << std::endl
            << "    lifeSpan_queue =[" << getQueueLifeSpan() << "]" << std::endl
            << "    ----" << std::endl;
        return sb.str();
    }

}

コマンドライン引数を解析するコードの分量が大きい。

tamesi37a1.cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : This program count messages in queue.
//                                          : Please settings queue to command line argument.
//         : Compile   : Command            : make
//                                : Explain : Please, read Makefile.
//         : Execute   : Command            : ./tamesi37a1.exe --queue 1112 durable autodelete
//                                          : Run on the foreground.
//                                : Explain :     ./tamesi37a1.exe         Executable file
//                                          :     --queue                  queue settings section
//                                          :     1112                     queue name                  (string ok)
//                                          :     durable                  queue life span             (multiple)
//                                          :     autodelete               queue life span
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : FAQ     : Failures case: Message conflict when starting multiple processes.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
//
// Referrences :
//             : AMQP-CPP : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//                                   : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//                                   : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//                                   : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//                                   : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//                                   : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//             : This     : Web site : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//
//--------------------------------------------------------------------------------

#include <string> // std::string
#include <iostream> // std::cout

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

#include "tamesi37a1_msgq.hpp"

int main(int argc, char* argv[])
{
    msgq::Settings settings;

    // 引数の解析
    if (!settings.parseCommandlineArgs(argc, argv))
    {
        exit(12);
    }

    std::string         qn      = settings.getQueueName();
    msgq::LifeSpan_t    life    = settings.getQueueLifeSpan();

    AMQP::TcpChannel* pChannel = msgq::getChannel();

    //----------------------------------------
    // 指定したキューの残りメッセージ数を返す
    //----------------------------------------
    auto callback = [](const std::string &name, int msgcount, int consumercount) {
        std::cout << "queueName = \"" << name << "\"" << std::endl
            << "messages = " << msgcount << std::endl
            << "nconsumers = " << consumercount << std::endl;
        // 接続を切って、このイベントハンドラから抜けることで、ev_runループから抜ける
        msgq::closeConnection();
    };
    pChannel->declareQueue(qn,life)
        .onSuccess(callback);

    //----------------------------------------
    // ループ
    //----------------------------------------
    ev_run(msgq::pLoop_ev, 0);

    return 0;
}

キューを開くときは、キューを作った時に設定したキューの寿命も 合わせて設定する必要がある。不便。

接続、チャンネルを開く、キューの宣言、ev_run と進むと、
キューの宣言のonSuccessが呼び出される。ここでキューのメッセージ数が分かるので標準出力する。

そのあと接続を切って onSuccessのハンドラを抜けると、ev_runを抜けることができる。

おまけ Expect

tamesi37a1.expect

#!/usr/bin/expect --
# 末尾の -- は、${argv} でコマンド引数を取ってくる印

# コマンドライン引数の例
# --queue 1112 durable autodelete

# カレント・ディレクトリを設定
cd /home/★user/★project/cpp_service/t37a1.d

# Expectをタイムアウトさせない
set timeout -1

# コマンドを実行する。エコーはさせない
spawn -noecho ./tamesi37a1.exe ${argv}

# 部分一致した一行をエコーさせる
expect "queueName = "
expect "messages = "
expect "consumers = "

おまけ PHP

<?php
// 処理時間の計測
$time=microtime(true);
exec('/usr/bin/expect /home/★user/★project/expect_service/tamesi37a1.expect '.urldecode($_SERVER['QUERY_STRING']),$o);
// 出力が配列$oに入っているので全行表示
foreach($o as $v)echo "$v\n";
// 処理時間の出力
echo '<br />' . (microtime(true) - $time);
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away