Last updated at Posted at 2017-03-20

前回の記事 : http://qiita.com/muzudho1/items/4de318c07521718857bb


次は PHP と 浮かむ瀬 をプロセス間通信させたい。

PHP でのプロセス間通信のやり方は 前に書いたのだった。


require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
    $QMsg = "hello, world";

// 別のクエリーも送れることを説明
echo '(^q^)' . $QMsg . '<br />';
echo 'これを1回クリックしろだぜ☆m9(^~^)!<br />';
echo '<a href="http://★Webサイト/tamesi14.php?ClickSitanDaze">http://★Webサイト/tamesi14.php?ClickSitanDaze</a><br />';

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
// queue_declare(
//        $queue       = '',
//        $passive     = false,
//        $durable     = false,
//        $exclusive   = false,
//        $auto_delete = true,
//        $nowait      = false,
//        $arguments   = null,
//        $ticket      = null
$channel->queue_declare($QKey, false, false, false, false);

// エンキューする
$msgObj = new AMQPMessage( $QMsg );
$channel->basic_publish($msgObj, '', $QKey);

echo "対局プログラムの近くに届けておくぜ☆(^▽^)v ['" . $QMsg . "']\n";

// リソースを開放する


// デキュー用
// サーバーの外部からはアクセスできないようにすること。
// このプログラムは、コマンドラインで実行する
// php loop6.php
// あるいは
// nohup /usr/bin/php /home/★user/★project/php_service/loop6.php > /home/★user/★project/php_log/loop6.out.log 2> /home/★user/★project/php_log/loop6.err.log < /dev/null &

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// プロセス間通信の前準備をするぜ☆(^~^)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
$channel->queue_declare($QKey, false, false, false, false);

// キューを開いてしまえば、あとは無限ループ!

// デキューしたタイミングですることをここに書く
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
$channel->basic_consume($QKey, '', false, true, false, false, $callback);

// 無限ループ
    // ここでブロックしながらデキューする
// ここに来ない
// $channel->close();
// $connection->close();




「How on PHP cancel consumer in RabbitMQ?」(stack overflow)

「PHP RabbitMQ setTimeout or other option to stop waiting for queue」(stack overflow)

また usi が効かない

ex.) hello, world
Enqueue? > usi
04:17:55 594 34_cs (P)___ enqueue begin [usi]
04:17:55 594 34_cs (P)___ channel.open begin
04:17:55 594 34_cs (P)___ connection.open begin
04:17:55 607 34_cs (P)___ connection.open end
04:17:55 609 34_cs (P)___ channel.open end
04:17:55 615 34_cs (P)___ enqueue end
04:17:55 615 34_cs (P)___ connection.close begin
04:17:55 615 34_cs (P)___ channel.close begin
04:17:55 622 34_cs (P)___ channel.close end
04:17:55 627 34_cs (P)___ connection.close end
Enqueue? > usi
04:18:05 465 34_cs (P)___ enqueue begin [usi]
04:18:05 465 34_cs (P)___ channel.open begin
04:18:05 465 34_cs (P)___ connection.open begin
04:18:05 475 34_cs (P)___ connection.open end
04:18:05 478 34_cs (P)___ channel.open end
04:18:05 478 34_cs (P)___ enqueue end
04:18:05 478 34_cs (P)___ connection.close begin
04:18:05 478 34_cs (P)___ channel.close begin
04:18:05 480 34_cs (P)___ channel.close end
04:18:05 480 34_cs (P)___ connection.close end


# ps aux | grep php
root      6091  0.0  0.0 311040   324 ?        Ss   Mar07   4:28 php-fpm: master process (/etc/php/7.0/fpm/php-fpm.conf)
www-data  6094  0.0  0.0 311040     0 ?        S    Mar07   0:00 php-fpm: pool www
www-data  6095  0.0  0.0 311040     0 ?        S    Mar07   0:00 php-fpm: pool www
root     19662  0.0  0.0  12936   988 pts/0    S+   04:20   0:00 grep --color=auto php

これが PHP の受信ループだったりしないんだろうか?

# kill 6095
# kill 6094
ex.) hello, world
Enqueue? > usi
04:21:47 062 34_cs (P)___ enqueue begin [usi]
04:21:47 062 34_cs (P)___ channel.open begin
04:21:47 062 34_cs (P)___ connection.open begin
04:21:47 071 34_cs (P)___ connection.open end
04:21:47 072 34_cs (P)___ channel.open end
04:21:47 077 34_cs (P)___ enqueue end
04:21:47 077 34_cs (P)___ connection.close begin
04:21:47 077 34_cs (P)___ channel.close begin
04:21:47 081 34_cs (P)___ channel.close end
04:21:47 087 34_cs (P)___ connection.close end
Enqueue? > 04:21:47 095 34_cs <-------- [interrupt!] Dequeue(^q^) id name ukamuse_SDT4
id author Hiraoka Takuya

option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true



position や go を送っても反応がなくなったときは、
isready を送ってみると 反応が返ってくることがある。

初回より後の isready は早い。

position、go 以外に何かステップがあっただろうか?

> isready
< readyok
> usinewgame
> position ~
> go 

のサイクルだと回る。なんで isready が要るのかは分からない。


(セマフォ (semaphore))「「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典」


  • 浮かむ瀬にアクセスできるのは 同時に1人
  • それ以外の人は エラーで 追い払われる



  • 浮かむ瀬にアクセスできる人数に上限は特にない。RabbitMQがくたばるまで
  • アクセスしてもブラウザは待機中になる
  • 指し手が返ってきても、それが自分の局面の指し手か分からない(混線)



RabbitMQを用いたプロセス間通信が 逆に弱点になるケースとして、ロック/アンロック 形式は向いていない。

なにしろ、キュー なのだ。キューは「しばらくお待ちください」という原理だ。

強いて言えば 3人並んでいるからまだ大丈夫、と100人ぐらいが思っていて同時にアクセスして 103人の行列ができる、というのが 工夫の要らない使い方だ。

「RabbitMQ Tutorials」(RabbitMQ)

上のサイトの図を見て分かるように、「P」は1つで「C」は1~複数だ。赤いチューブは 待ち行列で、「P」はいくつでもメッセージを送ってください、という思想だ。「P」を追い返す思想は 向いていない。

だから、ロック/アンロックを用いて 入場数制限をするための セマフォ は別途用意する必要がある。

とりあえず同時アクセスは そんなに起こらないだろう、という想定にする

RabbitMQ で、キューに溜まっているメッセージ数を調べる方法はあるだろうか。
また、キューに溜まっているのはメッセージ数であって、接続者すうではない。1手指すために isready、usinewgame、position、go の4メッセージを蓄えることになる。ホントは4メッセージも要らないんだが動きが怪しかったのでこの4メッセージを使っている。

ところで PHP側のソースコード


require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// URLの例
// http://★サイト/tamesi26a4.php?sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves

// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
    // 指定が無ければ平手初期局面、先手
    $QMsg = "sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves";

// > isready

// キューを選ぶ
//    $connection 接続
//    $channel    チャンネル
//    $QName      キュー名
//    queue_declare(
//        $queue       = ''   , キュー名
//        $passive     = false, キューの存在確認をするだけ
//        $durable     = false, ブローカーが停止してもキューは残す
//        $exclusive   = false, この接続の間だけメッセージを残す
//        $auto_delete = true , コンシューマがいなくなるとメッセージを消す
//        $nowait      = false, 
//        $arguments   = null , 
//        $ticket      = null   
//    )
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

// メッセージをエンキューする
$msgObj = new AMQPMessage( 'isready' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する
$channel   ->close();

// < readyok

// 接続は使いまわさず、やり直す。
// queue_declare(
//     $queue       = ''   ,
//     $passive     = false,
//     $durable     = false,
//     $exclusive   = false,
//     $auto_delete = true ,
//     $nowait      = false,
//     $arguments   = null ,
//     $ticket      = null  
// )
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1113';
$channel->queue_declare($QName, false, true, false, false);

$callback = function($msg) {
    // メッセージをデキューしたタイミングですることをここに書く
    echo $msg->body . "\n";
$channel->basic_consume($QName, '', false, true, false, false, $callback);

// ここで、メッセージをデキューするまでブロック

$channel   ->close();

// > usinewgame

// あとはメッセージの送受信の繰り返し
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

$msgObj = new AMQPMessage( 'usinewgame' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

$channel   ->close();

// > position ~

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

$msgObj = new AMQPMessage( 'position ' . $QMsg . "\n" );
$channel->basic_publish($msgObj, '', $QName);

$channel   ->close();

// > go

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

$msgObj = new AMQPMessage( 'go' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

$channel   ->close();

// < bestmove ~

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1113';
$channel->queue_declare($QName, false, true, false, false);

$callback = function($msg) {
    echo $msg->body . "\n";
$channel->basic_consume($QName, '', false, true, false, false, $callback);




ここから 欲しいメソッドを探すのがむずかしい。

クライアント側の関数から 何件のメッセージが溜まっているか 見えないんじゃないか?

仮に PHP のAPIではキューのメッセージ件数が見れないとしたら、何言語、何スクリプトだったら見えるんだ?

「[C#][PHP][C++] AMQP-CPP の使い方(RabbitMQ、プロセス間通信、メッセージキュー)」(Qiita)

C++なら channel.declareQueue のonSuccessハンドラの引数でメッセージ数は見れるようだ。
phpからこれを expect で叩く仕組みを作ればいいだろうか?

じゃあひとまず C++ で作ってみよう。



.PHONY: all clean

all: tamesi37a1.exe

tamesi37a1_msgq.o: tamesi37a1_msgq.cpp tamesi37a1_msgq.hpp
        g++ -std=c++11 -c tamesi37a1_msgq.cpp -o tamesi37a1_msgq.o -lev -lamqpcpp -pthread

tamesi37a1.o: tamesi37a1.cpp tamesi37a1_msgq.hpp
        g++ -std=c++11 -c tamesi37a1.cpp -o tamesi37a1.o -lev -lamqpcpp -pthread

tamesi37a1.exe: tamesi37a1.o tamesi37a1_msgq.o
        g++ -std=c++11 tamesi37a1.o tamesi37a1_msgq.o -o tamesi37a1.exe -lev -lamqpcpp -pthread

        rm -f tamesi37a1.d tamesi37a1.o tamesi37a1_msgq.o tamesi37a1.exe tamesi37a1.out.log tamesi37a1.err.log


// This program for Ubuntu 16.04.

// #pragma once

#include <string>
#include <mutex>

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

namespace msgq
    static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
    static std::string exchange_name = "myexchange";
    static std::string routing_key = "";

    // AMQP-CPPでの実装 :
    // AMQP::durable=[1] RabbitMQが止まってもキューを残す
    // AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
    // AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
    // AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
    typedef int LifeSpan_t;

    class Settings
        std::string name_queue_;
        std::mutex _mutex_queueName;
        msgq::LifeSpan_t lifeSpan_queue_;
        std::mutex _mutex_lifeSpan;


        void setQueueName(std::string value);
        std::string getQueueName();
        void setQueueLifeSpan(msgq::LifeSpan_t value);
        msgq::LifeSpan_t getQueueLifeSpan();

        bool parseCommandlineArgs(int argc, char* argv[]);

        std::string Dump();

    // 接続はシングルトンにします
    static struct ev_loop* pLoop_ev = ev_loop_new();
    static AMQP::LibEvHandler* pHandler_ev = new AMQP::LibEvHandler(pLoop_ev);

    static AMQP::TcpConnection* pConnection_ev = nullptr;
    static AMQP::TcpConnection* getConnection()
        if (nullptr == pConnection_ev) {
            pConnection_ev = new AMQP::TcpConnection(pHandler_ev, ADDRESS);
        return pConnection_ev;
    static void closeChannel();
    static void closeConnection()
        if (nullptr != pConnection_ev) {
            // チャンネルにもヌルのフラグを入れる

            pConnection_ev = nullptr;

    // チャンネルはシングルトンにします。
    static AMQP::TcpChannel* pChannel_ev = nullptr;
    static AMQP::TcpChannel* getChannel()
        if (nullptr == pChannel_ev)
            pChannel_ev = new AMQP::TcpChannel(getConnection());
        return pChannel_ev;
    static void closeChannel()
        if (nullptr != pChannel_ev) {
            pChannel_ev = nullptr;

#endif // #ifndef TAMESI37A1_MSGQ_HPP


// This program for Ubuntu 16.04.

// #include "stdafx.h"
#include "tamesi37a1_msgq.hpp"

#include <iostream> // std::cout
#include <sstream> // std::ostringstream

namespace msgq
    static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
        if ("durable" == lifeSpan) {
            return AMQP::durable;
        else if ("autodelete" == lifeSpan) {
            return AMQP::autodelete;
        else if ("passive" == lifeSpan) {
            return AMQP::passive;
        else if ("exclusive" == lifeSpan) {
            return AMQP::exclusive;
        else {
            std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";

        name_queue_ = "";
        lifeSpan_queue_ = (LifeSpan_t)0;

    void Settings::setQueueName(std::string value) {
        std::lock_guard<std::mutex> lock(_mutex_queueName);
        name_queue_ = value;
    std::string Settings::getQueueName() {
        std::lock_guard<std::mutex> lock(_mutex_queueName);
        return name_queue_;
    void Settings::setQueueLifeSpan(LifeSpan_t value) {
        std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
        lifeSpan_queue_ = value;
    LifeSpan_t Settings::getQueueLifeSpan() {
        std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
        return lifeSpan_queue_;

    bool Settings::parseCommandlineArgs(int argc, char* argv[])
        std::string name_queue = "";
        LifeSpan_t lifeSpan_queue = 0;

        if (0<argc)
            // プログラム名を省き、コマンドライン引数だけをつなげる。
            std::string cmdArg;
            for (int i = 1; i < argc; ++i)
                cmdArg += std::string(argv[i]);
                if (i < argc) {
                    cmdArg += " ";
            std::istringstream data(cmdArg);

            // 与件
            // 「--queue 1112 durable autodelete」
            // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

            // 受け皿
            // std::string name_queue = "";
            // LifeSpan_t lifeSpan_queue = 0;

            // 記憶
            int m0 = -1; // queue index.
            int m1 = -1; // lifespans index.

                         // 解析器
            std::string a;
            while (data >> a) {

                if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
                    if ("--queue" == a) { m0 = 0;   m1 = -1; }
                    else { break; }
                else if (
                    ">" == a.substr(0, 1)//先頭の1文字が「>」の場合
                    || "<" == a.substr(0, 1)//「<」
                    || ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
                    ) {
                    // 標準入力、標準出力、標準エラーを無視
                else if (0 == m0) { // キュー名
                    if (-1 == m1) {
                        name_queue = a;
                        lifeSpan_queue = 0;
                    else {
                        lifeSpan_queue |= LifeSpanString_To_Int(a);
                    // 欲しい内容がくるまでスルー

        if ("" != name_queue) {
            return true;

        std::cerr << "コマンドライン引数の「--queue 送信先キュー名 寿命」を漏れなく指定してください。" << std::endl
            << "例: --queue 1113 durable autodelete" << std::endl
            << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
            << "name_queue=[" << name_queue << "]" << std::endl;
        return false;

    std::string Settings::Dump()
        static std::ostringstream sb;
        sb << "Dump" << std::endl
            << "    name_queue     =[" << getQueueName() << "]" << std::endl
            << "    lifeSpan_queue =[" << getQueueLifeSpan() << "]" << std::endl
            << "    ----" << std::endl;
        return sb.str();



// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : This program count messages in queue.
//                                          : Please settings queue to command line argument.
//         : Compile   : Command            : make
//                                : Explain : Please, read Makefile.
//         : Execute   : Command            : ./tamesi37a1.exe --queue 1112 durable autodelete
//                                          : Run on the foreground.
//                                : Explain :     ./tamesi37a1.exe         Executable file
//                                          :     --queue                  queue settings section
//                                          :     1112                     queue name                  (string ok)
//                                          :     durable                  queue life span             (multiple)
//                                          :     autodelete               queue life span
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : FAQ     : Failures case: Message conflict when starting multiple processes.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
// Referrences :
//             : AMQP-CPP : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//                                   : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//                                   : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//                                   : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//                                   : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//                                   : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//             : This     : Web site : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html

#include <string> // std::string
#include <iostream> // std::cout

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

#include "tamesi37a1_msgq.hpp"

int main(int argc, char* argv[])
    msgq::Settings settings;

    // 引数の解析
    if (!settings.parseCommandlineArgs(argc, argv))

    std::string         qn      = settings.getQueueName();
    msgq::LifeSpan_t    life    = settings.getQueueLifeSpan();

    AMQP::TcpChannel* pChannel = msgq::getChannel();

    // 指定したキューの残りメッセージ数を返す
    auto callback = [](const std::string &name, int msgcount, int consumercount) {
        std::cout << "queueName = \"" << name << "\"" << std::endl
            << "messages = " << msgcount << std::endl
            << "nconsumers = " << consumercount << std::endl;
        // 接続を切って、このイベントハンドラから抜けることで、ev_runループから抜ける

    // ループ
    ev_run(msgq::pLoop_ev, 0);

    return 0;


#!/usr/bin/expect --
# コマンドライン引数の例
# --queue 1112 durable autodelete

# カレント・ディレクトリを設定
cd /home/★user/★project/cpp_service/t37a1.d

# Expectをタイムアウトさせない
set timeout -1

spawn -noecho ./tamesi37a1.exe ${argv}

expect -re "queueName = "
expect -re "messages = "
expect -re "consumers = "


// 処理時間計測

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// URLの例
// http://★サイト/tamesi26a5.php?sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves

// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( '' === $QMsg )
    $QMsg = 'sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves';

// busy確認

exec('/usr/bin/expect /home/★user/★project/expect_service/tamesi37a1.expect --queue 1112 durable autodelete',$o);
foreach($o as $v)
    if(0===strpos($v,'messages = '))
        $messages = intval( substr($v,11) );
        if( 0 < $messages ){
            die( 'error = "busy"' . "\n" . 'busy = ' . $messages . "\n" );
    //    echo 'v = ' . $v . "\n";

// > isready

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
// queue_declare(
//        $queue       = '',
//        $passive     = false,
//        $durable     = false,
//        $exclusive   = false,
//        $auto_delete = true,
//        $nowait      = false,
//        $arguments   = null,
//        $ticket      = null
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'isready' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する

// < readyok

// 接続は使いまわさず、やり直す。
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
// queue_declare(
//        $queue       = '',
//        $passive     = false,
//        $durable     = false,
//        $exclusive   = false,
//        $auto_delete = true,
//        $nowait      = false,
//        $arguments   = null,
//        $ticket      = null
$channel->queue_declare($QName, false, true, false, false);

// キューを開いてしまえば、あとは無限ループ!

// デキューしたタイミングですることをここに書く
$callback = function($msg) {
    if('readyok' !== rtrim($msg->body))
        die( "error = \"no readyok.\"\n" . $msg->body . "\n" );
$channel->basic_consume($QName, '', false, true, false, false, $callback);

// ここでブロックしながらデキューする。強制終了するまで続く
// wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
//$timeoutSeconds = 2;
//$channel->wait(null, false, timeoutSeconds);


// > usinewgame

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'usinewgame' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する

// > position ~

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'position ' . $QMsg . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する

// > go

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'go' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する

// < bestmove ~
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
$channel->queue_declare($QName, false, true, false, false);

$callback = function($msg) {
    echo $msg->body . "\n";
$channel->basic_consume($QName, '', false, true, false, false, $callback);

// ここでブロックしながらデキューする。強制終了するまで続く


// 処理時間表示
echo '#&lt;br /&gt;<br />' . "\ntime = " . (microtime(true) - $time);



// (2017-03-22 Modify begin) 標準入出力は全て変更
// (2017-03-22 Modify end)



    // (2017-03-22 Modify begin) 標準入出力は全て変更
    if (nyugyokuWin)
        SERVIO_BEGIN << "bestmove win" << SERVIO_END_FLUSH(searcher, "NyugyokuWin");
    else if (!bestThread->rootMoves[0].pv[0])
        SERVIO_BEGIN << "bestmove resign" << SERVIO_END_FLUSH(searcher, "Resign");
    else {
        if (bestThread->rootMoves[0].pv.size() > 1 || bestThread->rootMoves[0].extractPonderFromTT(pos)) {
            // ポンダー付きの場合
            SERVIO_BEGIN << "bestmove " << bestThread->rootMoves[0].pv[0].toUSI() << " ponder " << bestThread->rootMoves[0].pv[1].toUSI() << SERVIO_END_FLUSH(searcher, "BestmoveAndPonder");
        else {
            SERVIO_BEGIN << "bestmove " << bestThread->rootMoves[0].pv[0].toUSI() << SERVIO_END_FLUSH(searcher, "Bestmove");
    // (2017-03-22 Modify end)

こんな風に、標準出力ストリームがあるところは 全て置き換えた、あるいは見た目上変わりはないが、マクロの中身が変わったと考えていい。


標準出力の改造より、標準入力の改造の方が 大がかりだ。


void Searcher::doUSICommandLoop(int argc, char* argv[]) {
    // 引数の解析
    std::string             name_enqueue = "";
    service_io::LifeSpan_t  lifeSpan_enqueue = 0;
    std::string             name_dequeue = "";
    service_io::LifeSpan_t  lifeSpan_dequeue = 0;
        if (!service_io::parseArgs(argc, argv, name_enqueue, lifeSpan_enqueue, name_dequeue, lifeSpan_dequeue))
        thisptr->serviceIo2.SetQueueName    (service_io::ENQUEUE_INDEX, name_enqueue);
        thisptr->serviceIo2.SetQueueLifeSpan(service_io::ENQUEUE_INDEX, lifeSpan_enqueue);
        thisptr->serviceIo2.SetQueueName    (service_io::DEQUEUE_INDEX, name_dequeue);
        thisptr->serviceIo2.SetQueueLifeSpan(service_io::DEQUEUE_INDEX, lifeSpan_dequeue);

    // デキューの常時監視をスタート
    auto th1 = std::thread([] { service_io::workConsume(*thisptr); });

    bool dialogue = false;
    // 「./apery」のように引数無しで打鍵して実行された場合に限り、なんども標準入力を待つ。
    if (argc == 1)
        dialogue = true;
    bool evalTableIsRead = false;
    Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);

    std::string cmd;
    std::string token;

    // プログラム名を省き、コマンドライン引数だけをつなげる。
    for (int i = 1; i < argc; ++i)
        cmd += std::string(argv[i]) + " ";

    do {
        // 文字が打たれたらその文字を、空文字を打たれたら "quit" 扱いとする。
        if (dialogue)
            cmd = service_io::dequeue();

        // 半角スペース区切りの最初の1トークンを拾う
        std::istringstream ssCmd(cmd);
        ssCmd >> std::skipws >> token;

        if (token == "--enqueue" || token == "--dequeue") {
            dialogue = true;
            std::cerr << "Command line parameter error : \"--msgqueue\" option for UBUNTU.";
//      else if (token == "tu") {
//#if UBUNTU
//          // tu と飛んできたら ka と返す
//          service_io::oss << "ka";
//          service_io::flushMessage();
//          std::cerr << "Command error : \"tu\" command for UBUNTU.";
//      }
        else if (token == "quit" || token == "stop" || token == "ponderhit" || token == "gameover") {
            if (token != "ponderhit" || signals.stopOnPonderHit) {
                signals.stop = true;
                limits.ponder = false;
            if (token == "ponderhit" && limits.moveTime != 0)
                limits.moveTime += timeManager.elapsed();
        else if (token == "go") go(pos, ssCmd);
        else if (token == "position") setPosition(pos, ssCmd);
        else if (token == "usinewgame"); // isready で準備は出来たので、対局開始時に特にする事はない。
        else if (token == "usi")
            SERVIO_BEGIN << "id name " << std::string(options["Engine_Name"])
                << "\nid author Hiraoka Takuya"
                << "\n" << options
                << "\nusiok" << SERVIO_END_FLUSH(thisptr, "Usi");
        else if (token == "isready") { // 対局開始前の準備。
            threads.main()->previousScore = ScoreInfinite;
            if (!evalTableIsRead) {
                // 一時オブジェクトを生成して Evaluator::init() を呼んだ直後にオブジェクトを破棄する。
                // 評価関数の次元下げをしたデータを格納する分のメモリが無駄な為、
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
            SERVIO_BEGIN << "readyok" << SERVIO_END_FLUSH(thisptr, "Isready");
        else if (token == "setoption") setOption(ssCmd);
        else if (token == "write_eval") { // 対局で使う為の評価関数バイナリをファイルに書き出す。
            if (!evalTableIsRead)
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
#if defined LEARN
        else if (token == "l") {
            auto learner = std::unique_ptr<Learner>(new Learner);
            learner->learn(pos, ssCmd);
        else if (token == "make_teacher") {
            if (!evalTableIsRead) {
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
        else if (token == "use_teacher") {
            if (!evalTableIsRead) {
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
            use_teacher(pos, ssCmd);
        else if (token == "check_teacher") {
        else if (token == "print") printEvalTable(SQ88, f_gold + SQ78, f_gold, false);
#if !defined MINIMUL
        // 以下、デバッグ用
        else if (token == "bench") {
            if (!evalTableIsRead) {
                std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
                evalTableIsRead = true;
        else if (token == "key") { SYNCCOUT << pos.getKey() << SYNCENDL; }
        else if (token == "tosfen") { SYNCCOUT << pos.toSFEN() << SYNCENDL; }
        else if (token == "eval") { SYNCCOUT << evaluateUnUseDiff(pos) / FVScale << SYNCENDL; }
        else if (token == "d") { pos.print(); }
        else if (token == "s") { measureGenerateMoves(pos); }
        else if (token == "t") { SYNCCOUT << pos.mateMoveIn1Ply().toCSA() << SYNCENDL; }
        else if (token == "b") { makeBook(pos, ssCmd); }
        else { SYNCCOUT << "unknown command: [" << cmd << "]" << SYNCENDL; }
    } while (token != "quit" && dialogue);

    // th1 スレッドの終了を待つ



メッセージの受信元をデキュー と呼ぶことにした。


という部分で ブラウザから送られてくるメッセージを取得している。

また apery はコマンドライン引数を 単にコマンドと解釈してループの中で解析するので、
コマンドライン引数「--enqueue ~略~ --dequeue ~略~」を使いたいときに困った。
そこで 改造を加えたが、これが悪影響を与えていないものかどうか。

グローバル変数 searcher

コンピュータ将棋はマルチスレッドで動いているので、グローバル変数を用意しても、別スレッドからは見えないので グローバルには利用できない。

そこで、どのスレッドからも利用できるグローバル変数のように利用されている Searcher 構造体に、メッセージ・キュー名等の、グローバルに利用したいデータを追加した。


// (2017-03-20 Add begin)
#ifdef UBUNTU
class ServiceIo2
    std::string name_queues_[2];// = { "", "" };
    std::mutex _mutex_queueName;
    service_io::LifeSpan_t lifeSpan_queues_[2];// = { (service_io::LifeSpan_t)0, (service_io::LifeSpan_t)0 };
    std::mutex _mutex_lifeSpan;

    void SetQueueName(int index, std::string value);
    std::string GetQueueName(int index);
    void SetQueueLifeSpan(int index, service_io::LifeSpan_t value);
    service_io::LifeSpan_t GetQueueLifeSpan(int index);
// (2017-03-20 Add end)


    // (2017-03-20 Add begin)
#ifdef UBUNTU
    STATIC ServiceIo2 serviceIo2;
    // (2017-03-20 Add end)


// (2017-03-20 Add begin)
namespace service_io
#ifdef UBUNTU
    // メッセージキューにエンキュー
    // 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
    static int flushMessage(Searcher* pSearcher, std::string hint)
        std::string message = service_io::oss.str();
        oss.str(std::string());// 空文字列を入れてクリアーする

        std::string name_enqueue = pSearcher->serviceIo2.GetQueueName(ENQUEUE_INDEX);
        LifeSpan_t lifeSpan_enqueue = pSearcher->serviceIo2.GetQueueLifeSpan(ENQUEUE_INDEX);
        if ("" == name_enqueue)
            std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[" << name_enqueue << "] ENQUEUE_INDEX=[" << ENQUEUE_INDEX << "] lifeSpan=[" << lifeSpan_enqueue << "] hint=[" << hint << "]" << "\n";

        // チャンネルを宣言
        getChannel(ENQUEUE_INDEX)->declareQueue(name_enqueue, lifeSpan_enqueue)
            .onError([=](const char* errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_enqueue << "] lifeSpan=[" << lifeSpan_enqueue << "]" << "\n";

        // チャンネルを束縛
        getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_enqueue, routing_key)
            // ev_run から1回だけ呼び出される。コミットを開始してこのハンドラから抜けること。コミットの成功時に接続を切ることでev_runから抜ける。
            .onSuccess([=]() {//&message

                              // トランザクション開始
            // パブリッシュ開始
            if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

                // コミットを開始
                    .onSuccess([=]() {
                    // コミット成功

                    // 接続を切ったあと、トランザクションのコミットが実行されるようだ。
                    .onError([=](const char *errMsg) {//&message
                    std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
            else {
                std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;

        }).onError([](const char* errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;

        // ここで、バインドキューのオンサクセスの処理へ。
        ev_run(pLoop_ev[ENQUEUE_INDEX], 0);

        return 0;

    // メッセージ・キューの監視を開始
    static void workConsume(Searcher& searcher)
        std::string name_dequeue = searcher.serviceIo2.GetQueueName(DEQUEUE_INDEX);
        LifeSpan_t lifeSpan_dequeue = searcher.serviceIo2.GetQueueLifeSpan(DEQUEUE_INDEX);

        // I will go to the front of the box named "1111".
        getChannel(DEQUEUE_INDEX)->declareQueue(name_dequeue, lifeSpan_dequeue);

        // I look inside the box.
        auto errorCb = [&lifeSpan_dequeue](const char *errMsg) {
            std::cerr << getTimeslipNow() << "(>_<)cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan_dequeue=[" << lifeSpan_dequeue << "]" << std::endl;
        auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {
            std::string message(amqpMessage.body(), amqpMessage.bodySize());



        // I will keep on forever.

        // I will not come here.
    static int flushMessage(Searcher* pSearcher, std::string hint) {
        std::cout << IOLock << service_io::oss.str() << std::endl << IOUnlock;
        service_io::oss.str(std::string());// 空文字列を入れてクリアーする
        return 0;
// (2017-03-20 Add end)


// (2017-03-20 Add begin)
#ifdef UBUNTU
ServiceIo2 Searcher::serviceIo2;
// (2017-03-20 Add end)

init( )

// (2017-03-20 Add begin)
#ifdef UBUNTU
    serviceIo2.name_queues_[service_io::ENQUEUE_INDEX] = "";
    serviceIo2.name_queues_[service_io::DEQUEUE_INDEX] = "";
    serviceIo2.lifeSpan_queues_[service_io::ENQUEUE_INDEX] = (service_io::LifeSpan_t)0;
    serviceIo2.lifeSpan_queues_[service_io::DEQUEUE_INDEX] = (service_io::LifeSpan_t)0;
// (2017-03-20 Add end)


// (2017-03-20 Add begin)
#ifdef UBUNTU
void ServiceIo2::SetQueueName(int index, std::string value) {
    std::lock_guard<std::mutex> lock(_mutex_queueName);
    name_queues_[index] = value;
std::string ServiceIo2::GetQueueName(int index) {
    std::lock_guard<std::mutex> lock(_mutex_queueName);
    return name_queues_[index];
void ServiceIo2::SetQueueLifeSpan(int index, service_io::LifeSpan_t value) {
    std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
    lifeSpan_queues_[index] = value;
service_io::LifeSpan_t ServiceIo2::GetQueueLifeSpan(int index) {
    std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
    return lifeSpan_queues_[index];
// (2017-03-20 Add end)



// OS      : Windows 10
// IDE     : Visual Studio 2015
//           Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
// OS      : Ubuntu 16.04

// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//                     : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Program : this
//         : Compile   : Command  : make profgen_sse
//                     :          : mv apery ../bin/apery
//         : Execute   : Command  : ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null &
// 対局
// ====
//     usi
//         usiok
//     isready
//         readyok
//     usinewgame
//     position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
//     go
//         例) bestmove 2g2f ponder 1c1d
//     quit


#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include "common.hpp"

// #define UBUNTU

// 入出力を、サービスIOでラッピングします
#define SERVIO_BEGIN service_io::oss
#define SERVIO_END_CLEAN std::endl, service_io::cleanMessage()
#define SERVIO_END_FLUSH(pSearcher,hint) std::endl, service_io::flushMessage(pSearcher,hint)

namespace service_io
    // 文字列ストリーム出力を、文字列に置換するもの
    static std::ostringstream oss;

#ifdef UBUNTU
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : rabbitmq-server
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// service_io.hpp

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
    // 時計はずれているが、現在時刻が返ってくる
    static std::string getTimeslipNow()
        // どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
        std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(

        long long hours = ms.count() / 1000 / 60 / 60 % 24;
        long long minutes = ms.count() / 1000 / 60 % 60;
        long long seconds = ms.count() / 1000 % 60;
        long long milliseconds = ms.count() % 1000;

        std::ostringstream oss;
        oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
        return oss.str();

    // AMQP-CPPでの実装 :
    // AMQP::durable=[1] RabbitMQが止まってもキューを残す
    // AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
    // AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
    // AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
    typedef int LifeSpan_t;

    // 0 : enqueue用設定
    // 1 : dequeue用設定
    static const int ENQUEUE_INDEX = 0;
    static const int DEQUEUE_INDEX = 1;
    static const int NUM_INDEX = 2;

    static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
        if ("durable" == lifeSpan) {
            return AMQP::durable;
        else if ("autodelete" == lifeSpan) {
            return AMQP::autodelete;
        else if ("passive" == lifeSpan) {
            return AMQP::passive;
        else if ("exclusive" == lifeSpan) {
            return AMQP::exclusive;
        else {
            std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";

    static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
    static std::string exchange_name = "myexchange";
    static std::string routing_key = "";

    // 接続はシングルトンにします
    static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
    static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

    static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
    static AMQP::TcpConnection* getConnection(int index)
        if (nullptr == pConnection_ev[index]) {
            pConnection_ev[index] = new AMQP::TcpConnection(pHandler_ev[index], ADDRESS);
        return pConnection_ev[index];
    static void closeChannel(int index);
    static void closeConnection(int index)
        if (nullptr != pConnection_ev[index]) {
            // チャンネルにもヌルのフラグを入れる

            pConnection_ev[index] = nullptr;

    // チャンネルはシングルトンにします。
    static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
    static AMQP::TcpChannel* getChannel(int index)
        if (nullptr == pChannel_ev[index])
            pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
        return pChannel_ev[index];
    static void closeChannel(int index)
        if (nullptr != pChannel_ev[index]) {
            pChannel_ev[index] = nullptr;

    /// <summary>
    /// 回転式バッファー。
    /// これはメイン・スレッドに置く。
    /// デキューのスレッドでエンキューすることはできない。
    /// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
    /// </summary>
    namespace rotationBuffer
        const int bufferSize = 1024;
        static std::string buffer[bufferSize] = {};
        static int bufferCursors[2] = { 0, 0 };
        static int busy = 0;
        const int PUT_INDEX = 0;
        const int GET_INDEX = 1;

        static std::mutex _mutex;
        // 返り値: 追加しなかったとき 偽、それ以外のとき 真。
        static bool push_inner(std::string message)
            std::lock_guard<std::mutex> lock(_mutex);

            if (busy + 1 == bufferSize || bufferSize * 99 / 100 < busy) {
                // busy
                return false;

            buffer[bufferCursors[PUT_INDEX]] = message;

            if (!(bufferCursors[PUT_INDEX] < bufferSize))
                bufferCursors[PUT_INDEX] = 0;

            return true;
        static std::string pop_inner()
            std::lock_guard<std::mutex> lock(_mutex);

            if ("" != buffer[bufferCursors[GET_INDEX]])
                std::string message = buffer[bufferCursors[GET_INDEX]];

                buffer[bufferCursors[GET_INDEX]] = "";

                if (!(bufferCursors[GET_INDEX] < bufferSize))
                    bufferCursors[GET_INDEX] = 0;

                return message;
            // buffer is empty
            return "";

        // 追加に成功するまで、ブロックする
        static void push_block(std::string message)
            while (!rotationBuffer::push_inner(message))
                // busy
        // 何か取れるまで、ブロックする
        static std::string pop_block()
            std::string message;
            while ("" == (message = rotationBuffer::pop_inner()))
                // buffer is empty
            return message;

    static int cleanMessage() {
        service_io::oss.str(std::string());// 空文字列を入れてクリアーする
        return 0;

    // service_io::workConsume() を別スレッドで走らせておくこと。
    static std::string dequeue() {
        return rotationBuffer::pop_block();

    static bool parseArgs(int argc, char* argv[], std::string& name_enqueue, LifeSpan_t& lifeSpan_enqueue, std::string& name_dequeue, LifeSpan_t& lifeSpan_dequeue)
        name_enqueue = "";
        name_dequeue = "";

        if (0<argc)
            // プログラム名を省き、コマンドライン引数だけをつなげる。
            std::string cmdArg;
            for (int i = 1; i < argc; ++i)
                cmdArg += std::string(argv[i]);
                if (i < argc) {
                    cmdArg += " ";
            std::istringstream data(cmdArg);

            // 与件
            // 「--enqueue 1113 durable --dequeue 1112 durable autodelete」
            // 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

            // 受け皿
            // 参照渡しの引数に返す

            // 記憶
            int m0 = -1; // enqueue or dequeue index.
            int m1 = -1; // lifespans index.

                         // 解析器
            std::string a;
            while (data >> a) {

                if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
                    if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
                    else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
                    else { break; }
                else if (
                    ">" == a.substr(0, 1)//先頭の1文字が「>」の場合
                    || "<" == a.substr(0, 1)//「<」
                    || ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
                    ) {
                    // 標準入力、標準出力、標準エラーを無視
                else if (0 == m0) { // エンキュー先キュー名
                    if (-1 == m1) {
                        name_enqueue = a;
                        lifeSpan_enqueue = 0;
                    else {
                        lifeSpan_enqueue |= LifeSpanString_To_Int(a);
                else if (1 == m0) { // デキュー元キュー名
                    if (-1 == m1) {
                        name_dequeue = a;
                        lifeSpan_dequeue = 0;
                    else {
                        lifeSpan_dequeue |= LifeSpanString_To_Int(a);
                    // 欲しい内容がくるまでスルー

            if ("" == name_enqueue || "" == name_dequeue) {
                goto gt_CommandlineParameterError;
            goto gt_CommandlineParameterError;
        return true;

        std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
            << "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
            << "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
            << "name_enqueue=[" << name_enqueue << "]" << std::endl
            << "name_dequeue=[" << name_dequeue << "]" << std::endl;
        return false;


// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
    static int cleanMessage() {
        service_io::oss.str(std::string());// 空文字列を入れてクリアーする
        return 0;

    static std::string dequeue() {
        std::string cmd;
        if (!std::getline(std::cin, cmd))
            cmd = "quit";
        return cmd;

class ServiceIo

#endif // #ifndef APERY_SERVICE_IO_HPP


//#include "stdafx.h"
#include "service_io.hpp"



C++言語を使い慣れない間に 改造した。



