1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

コンピュータ将棋ソフトとの対局サーバーを立てよう<その18>

Last updated at Posted at 2017-03-20

前回の記事 : http://qiita.com/muzudho1/items/4de318c07521718857bb

バックグラウンドで動く浮かむ瀬はできたのだった。

次は PHP と 浮かむ瀬 をプロセス間通信させたい。

PHP でのプロセス間通信のやり方は 前に書いたのだった。
http://qiita.com/muzudho1/items/5a36ecdcd9b0ba444577

送る方

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
{
    $QMsg = "hello, world";
}

// 別のクエリーも送れることを説明
echo '(^q^)' . $QMsg . '<br />';
echo 'これを1回クリックしろだぜ☆m9(^~^)!<br />';
echo '<a href="http://★Webサイト/tamesi14.php?ClickSitanDaze">http://★Webサイト/tamesi14.php?ClickSitanDaze</a><br />';

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
// queue_declare(
//        $queue       = '',
//        $passive     = false,
//        $durable     = false,
//        $exclusive   = false,
//        $auto_delete = true,
//        $nowait      = false,
//        $arguments   = null,
//        $ticket      = null
$channel->queue_declare($QKey, false, false, false, false);

// エンキューする
$msgObj = new AMQPMessage( $QMsg );
$channel->basic_publish($msgObj, '', $QKey);

echo "対局プログラムの近くに届けておくぜ☆(^▽^)v ['" . $QMsg . "']\n";

// リソースを開放する
$channel->close();
$connection->close();

受け取る方

<?php
// デキュー用
// サーバーの外部からはアクセスできないようにすること。
//
// このプログラムは、コマンドラインで実行する
// php loop6.php
// あるいは
// nohup /usr/bin/php /home/★user/★project/php_service/loop6.php > /home/★user/★project/php_log/loop6.out.log 2> /home/★user/★project/php_log/loop6.err.log < /dev/null &

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// プロセス間通信の前準備をするぜ☆(^~^)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
$channel->queue_declare($QKey, false, false, false, false);

// キューを開いてしまえば、あとは無限ループ!

// デキューしたタイミングですることをここに書く
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($QKey, '', false, true, false, false, $callback);

// 無限ループ
while(count($channel->callbacks))
{
    // ここでブロックしながらデキューする
    $channel->wait();
}
// ここに来ない
// $channel->close();
// $connection->close();

無限ループからどうやって抜けんの?

「RabbitMQにPHPクライアント(php-amqplib)からメッセージを送信、受信する」(Symfoware)
http://symfoware.blog68.fc2.com/blog-entry-1490.html

うーむ。

「How on PHP cancel consumer in RabbitMQ?」(stack overflow)
http://stackoverflow.com/questions/24017760/how-on-php-cancel-consumer-in-rabbitmq

「PHP RabbitMQ setTimeout or other option to stop waiting for queue」(stack overflow)
http://stackoverflow.com/questions/33930923/php-rabbitmq-settimeout-or-other-option-to-stop-waiting-for-queue

また usi が効かない

終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > usi
04:17:55 594 34_cs (P)___ enqueue begin [usi]
04:17:55 594 34_cs (P)___ channel.open begin
04:17:55 594 34_cs (P)___ connection.open begin
04:17:55 607 34_cs (P)___ connection.open end
04:17:55 609 34_cs (P)___ channel.open end
04:17:55 615 34_cs (P)___ enqueue end
04:17:55 615 34_cs (P)___ connection.close begin
04:17:55 615 34_cs (P)___ channel.close begin
04:17:55 622 34_cs (P)___ channel.close end
04:17:55 627 34_cs (P)___ connection.close end
Enqueue? > usi
04:18:05 465 34_cs (P)___ enqueue begin [usi]
04:18:05 465 34_cs (P)___ channel.open begin
04:18:05 465 34_cs (P)___ connection.open begin
04:18:05 475 34_cs (P)___ connection.open end
04:18:05 478 34_cs (P)___ channel.open end
04:18:05 478 34_cs (P)___ enqueue end
04:18:05 478 34_cs (P)___ connection.close begin
04:18:05 478 34_cs (P)___ channel.close begin
04:18:05 480 34_cs (P)___ channel.close end
04:18:05 480 34_cs (P)___ connection.close end

プロセスを調べるか。

# ps aux | grep php
root      6091  0.0  0.0 311040   324 ?        Ss   Mar07   4:28 php-fpm: master process (/etc/php/7.0/fpm/php-fpm.conf)
www-data  6094  0.0  0.0 311040     0 ?        S    Mar07   0:00 php-fpm: pool www
www-data  6095  0.0  0.0 311040     0 ?        S    Mar07   0:00 php-fpm: pool www
root     19662  0.0  0.0  12936   988 pts/0    S+   04:20   0:00 grep --color=auto php

これが PHP の受信ループだったりしないんだろうか?

# kill 6095
# kill 6094
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > usi
04:21:47 062 34_cs (P)___ enqueue begin [usi]
04:21:47 062 34_cs (P)___ channel.open begin
04:21:47 062 34_cs (P)___ connection.open begin
04:21:47 071 34_cs (P)___ connection.open end
04:21:47 072 34_cs (P)___ channel.open end
04:21:47 077 34_cs (P)___ enqueue end
04:21:47 077 34_cs (P)___ connection.close begin
04:21:47 077 34_cs (P)___ channel.close begin
04:21:47 081 34_cs (P)___ channel.close end
04:21:47 087 34_cs (P)___ connection.close end
Enqueue? > 04:21:47 095 34_cs <-------- [interrupt!] Dequeue(^q^) id name ukamuse_SDT4
id author Hiraoka Takuya

option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true
usiok

あっ、効いた。

isready

position や go を送っても反応がなくなったときは、
isready を送ってみると 反応が返ってくることがある。

初回より後の isready は早い。

position、go 以外に何かステップがあっただろうか?

> isready
< readyok
> usinewgame
> position ~
> go 

のサイクルだと回る。なんで isready が要るのかは分からない。

セマフォを作ろう

(セマフォ (semaphore))「「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典」
http://wa3.i-3-i.info/word13357.html

ブラウザで多人数同時アクセスが起こるわけだが、とりあえず段階的に

  • 浮かむ瀬にアクセスできるのは 同時に1人
  • それ以外の人は エラーで 追い払われる

という単純な実装を目指そう。

現状では

  • 浮かむ瀬にアクセスできる人数に上限は特にない。RabbitMQがくたばるまで
  • アクセスしてもブラウザは待機中になる
  • 指し手が返ってきても、それが自分の局面の指し手か分からない(混線)

といった感じになっている。

どうやってセマフォを作るのか?

RabbitMQを用いたプロセス間通信が 逆に弱点になるケースとして、ロック/アンロック 形式は向いていない。

なにしろ、キュー なのだ。キューは「しばらくお待ちください」という原理だ。

強いて言えば 3人並んでいるからまだ大丈夫、と100人ぐらいが思っていて同時にアクセスして 103人の行列ができる、というのが 工夫の要らない使い方だ。

「RabbitMQ Tutorials」(RabbitMQ)
http://www.rabbitmq.com/getstarted.html

上のサイトの図を見て分かるように、「P」は1つで「C」は1~複数だ。赤いチューブは 待ち行列で、「P」はいくつでもメッセージを送ってください、という思想だ。「P」を追い返す思想は 向いていない。

だから、ロック/アンロックを用いて 入場数制限をするための セマフォ は別途用意する必要がある。

とりあえず同時アクセスは そんなに起こらないだろう、という想定にする

RabbitMQ で、キューに溜まっているメッセージ数を調べる方法はあるだろうか。
また、キューに溜まっているのはメッセージ数であって、接続者すうではない。1手指すために isready、usinewgame、position、go の4メッセージを蓄えることになる。ホントは4メッセージも要らないんだが動きが怪しかったのでこの4メッセージを使っている。

ところで PHP側のソースコード

貼り付け忘れていたので。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//------------------------------------------------------------------------------
// URLの例
//------------------------------------------------------------------------------
//
// http://★サイト/tamesi26a4.php?sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
//

// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
{
    // 指定が無ければ平手初期局面、先手
    $QMsg = "sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves";
}

//------------------------------------------------------------------------------
// > isready
//------------------------------------------------------------------------------

// キューを選ぶ
//    $connection 接続
//    $channel    チャンネル
//    $QName      キュー名
//    queue_declare(
//        $queue       = ''   , キュー名
//        $passive     = false, キューの存在確認をするだけ
//        $durable     = false, ブローカーが停止してもキューは残す
//        $exclusive   = false, この接続の間だけメッセージを残す
//        $auto_delete = true , コンシューマがいなくなるとメッセージを消す
//        $nowait      = false, 
//        $arguments   = null , 
//        $ticket      = null   
//    )
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

// メッセージをエンキューする
$msgObj = new AMQPMessage( 'isready' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する
$channel   ->close();
$connection->close();

//------------------------------------------------------------------------------
// < readyok
//------------------------------------------------------------------------------

// 接続は使いまわさず、やり直す。
// queue_declare(
//     $queue       = ''   ,
//     $passive     = false,
//     $durable     = false,
//     $exclusive   = false,
//     $auto_delete = true ,
//     $nowait      = false,
//     $arguments   = null ,
//     $ticket      = null  
// )
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1113';
$channel->queue_declare($QName, false, true, false, false);

$callback = function($msg) {
    // メッセージをデキューしたタイミングですることをここに書く
    echo $msg->body . "\n";
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);

// ここで、メッセージをデキューするまでブロック
$channel->wait();

$channel   ->close();
$connection->close();

//------------------------------------------------------------------------------
// > usinewgame
//------------------------------------------------------------------------------

// あとはメッセージの送受信の繰り返し
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

$msgObj = new AMQPMessage( 'usinewgame' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

$channel   ->close();
$connection->close();

//------------------------------------------------------------------------------
// > position ~
//------------------------------------------------------------------------------

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

$msgObj = new AMQPMessage( 'position ' . $QMsg . "\n" );
$channel->basic_publish($msgObj, '', $QName);

$channel   ->close();
$connection->close();

//------------------------------------------------------------------------------
// > go
//------------------------------------------------------------------------------

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1112';
$channel->queue_declare($QName, false, true, false, true);

$msgObj = new AMQPMessage( 'go' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

$channel   ->close();
$connection->close();

//------------------------------------------------------------------------------
// < bestmove ~
//------------------------------------------------------------------------------

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
$QName      = '1113';
$channel->queue_declare($QName, false, true, false, false);

$callback = function($msg) {
    echo $msg->body . "\n";
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);

$channel->wait();

$channel->close();
$connection->close();

「php-amqplib」(php-amqplib/php-amqplib)
https://github.com/php-amqplib/php-amqplib

ここから 欲しいメソッドを探すのがむずかしい。

クライアント側の関数から 何件のメッセージが溜まっているか 見えないんじゃないか?

仮に PHP のAPIではキューのメッセージ件数が見れないとしたら、何言語、何スクリプトだったら見えるんだ?

「[C#][PHP][C++] AMQP-CPP の使い方(RabbitMQ、プロセス間通信、メッセージキュー)」(Qiita)
http://qiita.com/muzudho1/items/5a36ecdcd9b0ba444577

C++なら channel.declareQueue のonSuccessハンドラの引数でメッセージ数は見れるようだ。
phpからこれを expect で叩く仕組みを作ればいいだろうか?

じゃあひとまず C++ で作ってみよう。

作った

Makefile

.PHONY: all clean

all: tamesi37a1.exe

tamesi37a1_msgq.o: tamesi37a1_msgq.cpp tamesi37a1_msgq.hpp
        g++ -std=c++11 -c tamesi37a1_msgq.cpp -o tamesi37a1_msgq.o -lev -lamqpcpp -pthread

tamesi37a1.o: tamesi37a1.cpp tamesi37a1_msgq.hpp
        g++ -std=c++11 -c tamesi37a1.cpp -o tamesi37a1.o -lev -lamqpcpp -pthread

tamesi37a1.exe: tamesi37a1.o tamesi37a1_msgq.o
        g++ -std=c++11 tamesi37a1.o tamesi37a1_msgq.o -o tamesi37a1.exe -lev -lamqpcpp -pthread

clean:
        rm -f tamesi37a1.d tamesi37a1.o tamesi37a1_msgq.o tamesi37a1.exe tamesi37a1.out.log tamesi37a1.err.log

tamesi37a1_msgq.hpp

// This program for Ubuntu 16.04.

#ifndef TAMESI37A1_MSGQ_HPP
#define TAMESI37A1_MSGQ_HPP
// #pragma once

#include <string>
#include <mutex>

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

namespace msgq
{
	static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
	static std::string exchange_name = "myexchange";
	static std::string routing_key = "";

	// AMQP-CPPでの実装 :
	// AMQP::durable=[1] RabbitMQが止まってもキューを残す
	// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
	// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
	// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
	typedef int LifeSpan_t;

	class Settings
	{
		std::string name_queue_;
		std::mutex _mutex_queueName;
		msgq::LifeSpan_t lifeSpan_queue_;
		std::mutex _mutex_lifeSpan;

	public:
		Settings();
		~Settings();

		void setQueueName(std::string value);
		std::string getQueueName();
		void setQueueLifeSpan(msgq::LifeSpan_t value);
		msgq::LifeSpan_t getQueueLifeSpan();

		bool parseCommandlineArgs(int argc, char* argv[]);

		std::string Dump();
	};

	// 接続はシングルトンにします
	static struct ev_loop* pLoop_ev = ev_loop_new();
	static AMQP::LibEvHandler* pHandler_ev = new AMQP::LibEvHandler(pLoop_ev);

	static AMQP::TcpConnection* pConnection_ev = nullptr;
	static AMQP::TcpConnection* getConnection()
	{
		if (nullptr == pConnection_ev) {
			pConnection_ev = new AMQP::TcpConnection(pHandler_ev, ADDRESS);
		}
		return pConnection_ev;
	}
	static void closeChannel();
	static void closeConnection()
	{
		if (nullptr != pConnection_ev) {
			// チャンネルにもヌルのフラグを入れる
			closeChannel();

			pConnection_ev->close();
			pConnection_ev = nullptr;
		}
	}

	// チャンネルはシングルトンにします。
	static AMQP::TcpChannel* pChannel_ev = nullptr;
	static AMQP::TcpChannel* getChannel()
	{
		if (nullptr == pChannel_ev)
		{
			pChannel_ev = new AMQP::TcpChannel(getConnection());
		}
		return pChannel_ev;
	}
	static void closeChannel()
	{
		if (nullptr != pChannel_ev) {
			pChannel_ev->close();
			pChannel_ev = nullptr;
		}
	}
}

#endif // #ifndef TAMESI37A1_MSGQ_HPP

tamesi37a1_msgq.cpp

// This program for Ubuntu 16.04.

// #include "stdafx.h"
#include "tamesi37a1_msgq.hpp"

#include <iostream> // std::cout
#include <sstream> // std::ostringstream

namespace msgq
{
	static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
	{
		if ("durable" == lifeSpan) {
			return AMQP::durable;
		}
		else if ("autodelete" == lifeSpan) {
			return AMQP::autodelete;
		}
		else if ("passive" == lifeSpan) {
			return AMQP::passive;
		}
		else if ("exclusive" == lifeSpan) {
			return AMQP::exclusive;
		}
		else {
			std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
			exit(11);
		}
	}

	Settings::Settings()
	{
		name_queue_ = "";
		lifeSpan_queue_ = (LifeSpan_t)0;
	}
	Settings::~Settings()
	{
	}

	void Settings::setQueueName(std::string value) {
		std::lock_guard<std::mutex> lock(_mutex_queueName);
		name_queue_ = value;
	}
	std::string Settings::getQueueName() {
		std::lock_guard<std::mutex> lock(_mutex_queueName);
		return name_queue_;
	}
	void Settings::setQueueLifeSpan(LifeSpan_t value) {
		std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
		lifeSpan_queue_ = value;
	}
	LifeSpan_t Settings::getQueueLifeSpan() {
		std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
		return lifeSpan_queue_;
	}

	bool Settings::parseCommandlineArgs(int argc, char* argv[])
	{
		std::string name_queue = "";
		LifeSpan_t lifeSpan_queue = 0;

		if (0<argc)
		{
			// プログラム名を省き、コマンドライン引数だけをつなげる。
			std::string cmdArg;
			for (int i = 1; i < argc; ++i)
			{
				cmdArg += std::string(argv[i]);
				if (i < argc) {
					cmdArg += " ";
				}
			}
			std::istringstream data(cmdArg);

			// 与件
			// 「--queue 1112 durable autodelete」
			// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

			// 受け皿
			// std::string name_queue = "";
			// LifeSpan_t lifeSpan_queue = 0;

			// 記憶
			int m0 = -1; // queue index.
			int m1 = -1; // lifespans index.

						 // 解析器
			std::string a;
			while (data >> a) {

				if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
					if ("--queue" == a) { m0 = 0;	m1 = -1; }
					else { break; }
				}
				else if (
					">" == a.substr(0, 1)//先頭の1文字が「>」の場合
					|| "<" == a.substr(0, 1)//「<」
					|| ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
					) {
					// 標準入力、標準出力、標準エラーを無視
					break;
				}
				else if (0 == m0) { // キュー名
					if (-1 == m1) {
						name_queue = a;
						lifeSpan_queue = 0;
						m1++;
					}
					else {
						lifeSpan_queue |= LifeSpanString_To_Int(a);
					}
				}
				else
				{
					// 欲しい内容がくるまでスルー
				}
			}
		}

		if ("" != name_queue) {
			setQueueName(name_queue);
			setQueueLifeSpan(lifeSpan_queue);
			return true;
		}

		std::cerr << "コマンドライン引数の「--queue 送信先キュー名 寿命」を漏れなく指定してください。" << std::endl
			<< "例: --queue 1113 durable autodelete" << std::endl
			<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
			<< "name_queue=[" << name_queue << "]" << std::endl;
		return false;
	}

	std::string Settings::Dump()
	{
		static std::ostringstream sb;
		sb << "Dump" << std::endl
			<< "    name_queue     =[" << getQueueName() << "]" << std::endl
			<< "    lifeSpan_queue =[" << getQueueLifeSpan() << "]" << std::endl
			<< "    ----" << std::endl;
		return sb.str();
	}

}

tamesi37a1.cpp

//--------------------------------------------------------------------------------
// OS      : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
// Library : AMQP-CPP
//
// Program : this
//         : Author                         : Satoshi TAKAHASHI (Handle. Muzudho)
//         : License                        : MIT License https://opensource.org/licenses/MIT
//         : Explain                        : This program count messages in queue.
//                                          : Please settings queue to command line argument.
//         : Compile   : Command            : make
//                                : Explain : Please, read Makefile.
//         : Execute   : Command            : ./tamesi37a1.exe --queue 1112 durable autodelete
//                                          : Run on the foreground.
//                                : Explain :     ./tamesi37a1.exe         Executable file
//                                          :     --queue                  queue settings section
//                                          :     1112                     queue name                  (string ok)
//                                          :     durable                  queue life span             (multiple)
//                                          :     autodelete               queue life span
//                                : Explain : command line argument of life span of queue (Compositable)
//                                          :     durable     The queue exists, Even if the RabbitMQ service closed.
//                                          :     autodelete  The queue delete when consumers to be nothing.
//                                          :     passive     Just check what queue exists. Don't use read/write to queue.
//                                          :     exclusive   The queue delete when connection closed. The queue exists is this connection only.
//                                : FAQ     : Failures case: Message conflict when starting multiple processes.
//                                : Example : Windows 10   (This program not work) default queue life span.  durable
//                                          : Ubuntu 16.04                         default queue life span.  durable autodelete
//         : Stop      : Typing   : [Ctrl]+[C]
//
// Referrences :
//             : AMQP-CPP : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//                                   : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
//                                   : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
//                                   : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
//                                   : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
//                                   : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
//             : This     : Web site : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//
//--------------------------------------------------------------------------------

#include <string> // std::string
#include <iostream> // std::cout

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

#include "tamesi37a1_msgq.hpp"

int main(int argc, char* argv[])
{
	msgq::Settings settings;

	// 引数の解析
	if (!settings.parseCommandlineArgs(argc, argv))
	{
		exit(12);
	}

	std::string			qn		= settings.getQueueName();
	msgq::LifeSpan_t	life	= settings.getQueueLifeSpan();

	AMQP::TcpChannel* pChannel = msgq::getChannel();

	//----------------------------------------
	// 指定したキューの残りメッセージ数を返す
	//----------------------------------------
	auto callback = [](const std::string &name, int msgcount, int consumercount) {
		std::cout << "queueName = \"" << name << "\"" << std::endl
			<< "messages = " << msgcount << std::endl
			<< "nconsumers = " << consumercount << std::endl;
		// 接続を切って、このイベントハンドラから抜けることで、ev_runループから抜ける
		msgq::closeConnection();
	};
	pChannel->declareQueue(qn,life)
		.onSuccess(callback);

	//----------------------------------------
	// ループ
	//----------------------------------------
	ev_run(msgq::pLoop_ev, 0);

	return 0;
}

tamesi37a1.expect

#!/usr/bin/expect --
# コマンドライン引数の例
# --queue 1112 durable autodelete

# カレント・ディレクトリを設定
cd /home/★user/★project/cpp_service/t37a1.d

# Expectをタイムアウトさせない
set timeout -1

spawn -noecho ./tamesi37a1.exe ${argv}

expect -re "queueName = "
expect -re "messages = "
expect -re "consumers = "

tamesi26a5.php

<?php
// 処理時間計測
$time=microtime(true);

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//------------------------------------------------------------------------------
// URLの例
//------------------------------------------------------------------------------
//
// http://★サイト/tamesi26a5.php?sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
//

// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( '' === $QMsg )
{
    $QMsg = 'sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves';
}

//------------------------------------------------------------------------------
// busy確認
//------------------------------------------------------------------------------

exec('/usr/bin/expect /home/★user/★project/expect_service/tamesi37a1.expect --queue 1112 durable autodelete',$o);
foreach($o as $v)
{
    if(0===strpos($v,'messages = '))
    {
        $messages = intval( substr($v,11) );
        if( 0 < $messages ){
            die( 'error = "busy"' . "\n" . 'busy = ' . $messages . "\n" );
        }
    }
    //else
    //{
    //    echo 'v = ' . $v . "\n";
    //}
}

//------------------------------------------------------------------------------
// > isready
//------------------------------------------------------------------------------

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
// queue_declare(
//        $queue       = '',
//        $passive     = false,
//        $durable     = false,
//        $exclusive   = false,
//        $auto_delete = true,
//        $nowait      = false,
//        $arguments   = null,
//        $ticket      = null
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'isready' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する
$channel->close();
$connection->close();

//------------------------------------------------------------------------------
// < readyok
//------------------------------------------------------------------------------

// 接続は使いまわさず、やり直す。
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
// queue_declare(
//        $queue       = '',
//        $passive     = false,
//        $durable     = false,
//        $exclusive   = false,
//        $auto_delete = true,
//        $nowait      = false,
//        $arguments   = null,
//        $ticket      = null
$channel->queue_declare($QName, false, true, false, false);

// キューを開いてしまえば、あとは無限ループ!

// デキューしたタイミングですることをここに書く
$callback = function($msg) {
    if('readyok' !== rtrim($msg->body))
    {
        die( "error = \"no readyok.\"\n" . $msg->body . "\n" );
    }
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);

// ここでブロックしながらデキューする。強制終了するまで続く
// wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
//$timeoutSeconds = 2;
//$channel->wait(null, false, timeoutSeconds);
$channel->wait();

$channel->close();
$connection->close();

//------------------------------------------------------------------------------
// > usinewgame
//------------------------------------------------------------------------------

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'usinewgame' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する
$channel->close();
$connection->close();

//------------------------------------------------------------------------------
// > position ~
//------------------------------------------------------------------------------

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'position ' . $QMsg . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する
$channel->close();
$connection->close();

//------------------------------------------------------------------------------
// > go
//------------------------------------------------------------------------------

// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);

// エンキューする
$msgObj = new AMQPMessage( 'go' . "\n" );
$channel->basic_publish($msgObj, '', $QName);

// リソースを開放する
$channel->close();
$connection->close();

//------------------------------------------------------------------------------
// < bestmove ~
//------------------------------------------------------------------------------
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
$channel->queue_declare($QName, false, true, false, false);

$callback = function($msg) {
    echo $msg->body . "\n";
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);

// ここでブロックしながらデキューする。強制終了するまで続く
$channel->wait();

$channel->close();
$connection->close();

//------------------------------------------------------------------------------
// 処理時間表示
//------------------------------------------------------------------------------
echo '#&lt;br /&gt;<br />' . "\ntime = " . (microtime(true) - $time);

浮かむ瀬をどのように改造したのか

common.hpp

// (2017-03-22 Modify begin) 標準入出力は全て変更
#define SYNCCOUT SERVIO_BEGIN
#define SYNCENDL SERVIO_END_CLEAN
// (2017-03-22 Modify end)

標準入出力は全て変更した。量が多いので1つ1つは見ていけないが……。

search.cpp

	// (2017-03-22 Modify begin) 標準入出力は全て変更
	if (nyugyokuWin)
	{
		SERVIO_BEGIN << "bestmove win" << SERVIO_END_FLUSH(searcher, "NyugyokuWin");
	}
    else if (!bestThread->rootMoves[0].pv[0])
	{
		SERVIO_BEGIN << "bestmove resign" << SERVIO_END_FLUSH(searcher, "Resign");
	}
    else {
		if (bestThread->rootMoves[0].pv.size() > 1 || bestThread->rootMoves[0].extractPonderFromTT(pos)) {
			// ポンダー付きの場合
			SERVIO_BEGIN << "bestmove " << bestThread->rootMoves[0].pv[0].toUSI() << " ponder " << bestThread->rootMoves[0].pv[1].toUSI() << SERVIO_END_FLUSH(searcher, "BestmoveAndPonder");
		}
		else {
			SERVIO_BEGIN << "bestmove " << bestThread->rootMoves[0].pv[0].toUSI() << SERVIO_END_FLUSH(searcher, "Bestmove");
		}
    }
	// (2017-03-22 Modify end)

こんな風に、標準出力ストリームがあるところは 全て置き換えた、あるいは見た目上変わりはないが、マクロの中身が変わったと考えていい。

USIループも変えた

標準出力の改造より、標準入力の改造の方が 大がかりだ。

usi.cpp

void Searcher::doUSICommandLoop(int argc, char* argv[]) {
#if UBUNTU
	// 引数の解析
	std::string             name_enqueue = "";
	service_io::LifeSpan_t  lifeSpan_enqueue = 0;
	std::string             name_dequeue = "";
	service_io::LifeSpan_t  lifeSpan_dequeue = 0;
	{
		if (!service_io::parseArgs(argc, argv, name_enqueue, lifeSpan_enqueue, name_dequeue, lifeSpan_dequeue))
		{
			exit(12);
		}
		thisptr->serviceIo2.SetQueueName    (service_io::ENQUEUE_INDEX, name_enqueue);
		thisptr->serviceIo2.SetQueueLifeSpan(service_io::ENQUEUE_INDEX, lifeSpan_enqueue);
		thisptr->serviceIo2.SetQueueName    (service_io::DEQUEUE_INDEX, name_dequeue);
		thisptr->serviceIo2.SetQueueLifeSpan(service_io::DEQUEUE_INDEX, lifeSpan_dequeue);
	}

	// デキューの常時監視をスタート
	auto th1 = std::thread([] { service_io::workConsume(*thisptr); });
#endif

	bool dialogue = false;
	// 「./apery」のように引数無しで打鍵して実行された場合に限り、なんども標準入力を待つ。
	if (argc == 1)
	{
		dialogue = true;
	}
	bool evalTableIsRead = false;
	Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);

	std::string cmd;
	std::string token;

	// プログラム名を省き、コマンドライン引数だけをつなげる。
	for (int i = 1; i < argc; ++i)
	{
		cmd += std::string(argv[i]) + " ";
	}

	do {
		// 文字が打たれたらその文字を、空文字を打たれたら "quit" 扱いとする。
		if (dialogue)
		{
			cmd = service_io::dequeue();
		}

		// 半角スペース区切りの最初の1トークンを拾う
		std::istringstream ssCmd(cmd);
		ssCmd >> std::skipws >> token;

		if (token == "--enqueue" || token == "--dequeue") {
#if UBUNTU
			dialogue = true;
#else
			std::cerr << "Command line parameter error : \"--msgqueue\" option for UBUNTU.";
#endif
		}
//		else if (token == "tu") {
//#if UBUNTU
//			// tu と飛んできたら ka と返す
//			service_io::oss << "ka";
//			service_io::flushMessage();
//#else
//			std::cerr << "Command error : \"tu\" command for UBUNTU.";
//#endif
//		}
		else if (token == "quit" || token == "stop" || token == "ponderhit" || token == "gameover") {
			if (token != "ponderhit" || signals.stopOnPonderHit) {
				signals.stop = true;
				threads.main()->startSearching(true);
			}
			else
				limits.ponder = false;
			if (token == "ponderhit" && limits.moveTime != 0)
				limits.moveTime += timeManager.elapsed();
		}
		else if (token == "go") go(pos, ssCmd);
		else if (token == "position") setPosition(pos, ssCmd);
		else if (token == "usinewgame"); // isready で準備は出来たので、対局開始時に特にする事はない。
		else if (token == "usi")
		{
			SERVIO_BEGIN << "id name " << std::string(options["Engine_Name"])
				<< "\nid author Hiraoka Takuya"
				<< "\n" << options
				<< "\nusiok" << SERVIO_END_FLUSH(thisptr, "Usi");
		}
		else if (token == "isready") { // 対局開始前の準備。
			tt.clear();
			threads.main()->previousScore = ScoreInfinite;
			if (!evalTableIsRead) {
				// 一時オブジェクトを生成して Evaluator::init() を呼んだ直後にオブジェクトを破棄する。
				// 評価関数の次元下げをしたデータを格納する分のメモリが無駄な為、
				std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
				evalTableIsRead = true;
			}
			SERVIO_BEGIN << "readyok" << SERVIO_END_FLUSH(thisptr, "Isready");
		}
		else if (token == "setoption") setOption(ssCmd);
		else if (token == "write_eval") { // 対局で使う為の評価関数バイナリをファイルに書き出す。
			if (!evalTableIsRead)
				std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
			Evaluator::writeSynthesized(Evaluator::evalDir);
		}
#if defined LEARN
		else if (token == "l") {
			auto learner = std::unique_ptr<Learner>(new Learner);
			learner->learn(pos, ssCmd);
		}
		else if (token == "make_teacher") {
			if (!evalTableIsRead) {
				std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
				evalTableIsRead = true;
			}
			make_teacher(ssCmd);
		}
		else if (token == "use_teacher") {
			if (!evalTableIsRead) {
				std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
				evalTableIsRead = true;
			}
			use_teacher(pos, ssCmd);
		}
		else if (token == "check_teacher") {
			check_teacher(ssCmd);
		}
		else if (token == "print") printEvalTable(SQ88, f_gold + SQ78, f_gold, false);
#endif
#if !defined MINIMUL
		// 以下、デバッグ用
		else if (token == "bench") {
			if (!evalTableIsRead) {
				std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
				evalTableIsRead = true;
			}
			benchmark(pos);
		}
		else if (token == "key") { SYNCCOUT << pos.getKey() << SYNCENDL; }
		else if (token == "tosfen") { SYNCCOUT << pos.toSFEN() << SYNCENDL; }
		else if (token == "eval") { SYNCCOUT << evaluateUnUseDiff(pos) / FVScale << SYNCENDL; }
		else if (token == "d") { pos.print(); }
		else if (token == "s") { measureGenerateMoves(pos); }
		else if (token == "t") { SYNCCOUT << pos.mateMoveIn1Ply().toCSA() << SYNCENDL; }
		else if (token == "b") { makeBook(pos, ssCmd); }
#endif
		else { SYNCCOUT << "unknown command: [" << cmd << "]" << SYNCENDL; }
	} while (token != "quit" && dialogue);

#if UBUNTU
	// th1 スレッドの終了を待つ
	th1.join();
#endif

	threads.main()->waitForSearchFinished();
}

バックグラウンドに常駐させる前提なので、標準入出力は塞ぎ、通信はメッセージ・キューを用いたプロセス間通信に置き換えた。

メッセージの送信先をエンキュー、
メッセージの受信元をデキュー と呼ぶことにした。

service_io::dequeue();

という部分で ブラウザから送られてくるメッセージを取得している。

また apery はコマンドライン引数を 単にコマンドと解釈してループの中で解析するので、
コマンドライン引数「--enqueue ~略~ --dequeue ~略~」を使いたいときに困った。
そこで 改造を加えたが、これが悪影響を与えていないものかどうか。

グローバル変数 searcher

コンピュータ将棋はマルチスレッドで動いているので、グローバル変数を用意しても、別スレッドからは見えないので グローバルには利用できない。

そこで、どのスレッドからも利用できるグローバル変数のように利用されている Searcher 構造体に、メッセージ・キュー名等の、グローバルに利用したいデータを追加した。

search.hpp

// (2017-03-20 Add begin)
#ifdef UBUNTU
class ServiceIo2
{
public:
	std::string name_queues_[2];// = { "", "" };
	std::mutex _mutex_queueName;
	service_io::LifeSpan_t lifeSpan_queues_[2];// = { (service_io::LifeSpan_t)0, (service_io::LifeSpan_t)0 };
	std::mutex _mutex_lifeSpan;

	void SetQueueName(int index, std::string value);
	std::string GetQueueName(int index);
	void SetQueueLifeSpan(int index, service_io::LifeSpan_t value);
	service_io::LifeSpan_t GetQueueLifeSpan(int index);
};
#endif
// (2017-03-20 Add end)

構造体の中

	// (2017-03-20 Add begin)
#ifdef UBUNTU
public:
	STATIC ServiceIo2 serviceIo2;
#endif
	// (2017-03-20 Add end)

ファイルの末尾

// (2017-03-20 Add begin)
namespace service_io
{
#ifdef UBUNTU
	// メッセージキューにエンキュー
	// 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
	static int flushMessage(Searcher* pSearcher, std::string hint)
	{
		std::string message = service_io::oss.str();
		oss.str(std::string());// 空文字列を入れてクリアーする

		std::string name_enqueue = pSearcher->serviceIo2.GetQueueName(ENQUEUE_INDEX);
		LifeSpan_t lifeSpan_enqueue = pSearcher->serviceIo2.GetQueueLifeSpan(ENQUEUE_INDEX);
		if ("" == name_enqueue)
		{
			std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[" << name_enqueue << "] ENQUEUE_INDEX=[" << ENQUEUE_INDEX << "] lifeSpan=[" << lifeSpan_enqueue << "] hint=[" << hint << "]" << "\n";
			exit(1);
		}

		// チャンネルを宣言
		getChannel(ENQUEUE_INDEX)->declareQueue(name_enqueue, lifeSpan_enqueue)
			.onError([=](const char* errMsg) {
			std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_enqueue << "] lifeSpan=[" << lifeSpan_enqueue << "]" << "\n";
		});

		// チャンネルを束縛
		getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_enqueue, routing_key)
			// ev_run から1回だけ呼び出される。コミットを開始してこのハンドラから抜けること。コミットの成功時に接続を切ることでev_runから抜ける。
			.onSuccess([=]() {//&message

							  // トランザクション開始
			getChannel(ENQUEUE_INDEX)->startTransaction();
			// パブリッシュ開始
			if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {

				// コミットを開始
				getChannel(ENQUEUE_INDEX)->commitTransaction()
					.onSuccess([=]() {
					// コミット成功

					// 接続を切ったあと、トランザクションのコミットが実行されるようだ。
					closeConnection(ENQUEUE_INDEX);
				})
					.onError([=](const char *errMsg) {//&message
					std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
				});
			}
			else {
				std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
				getChannel(ENQUEUE_INDEX)->rollbackTransaction();
			}

		}).onError([](const char* errMsg) {
			std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
		});

		// ここで、バインドキューのオンサクセスの処理へ。
		ev_run(pLoop_ev[ENQUEUE_INDEX], 0);

		return 0;
	}

	// メッセージ・キューの監視を開始
	static void workConsume(Searcher& searcher)
	{
		std::string name_dequeue = searcher.serviceIo2.GetQueueName(DEQUEUE_INDEX);
		LifeSpan_t lifeSpan_dequeue = searcher.serviceIo2.GetQueueLifeSpan(DEQUEUE_INDEX);

		// I will go to the front of the box named "1111".
		getChannel(DEQUEUE_INDEX)->declareQueue(name_dequeue, lifeSpan_dequeue);

		// I look inside the box.
		auto errorCb = [&lifeSpan_dequeue](const char *errMsg) {
			std::cerr << getTimeslipNow() << "(>_<)cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan_dequeue=[" << lifeSpan_dequeue << "]" << std::endl;
		};
		auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {
			std::string message(amqpMessage.body(), amqpMessage.bodySize());

			rotationBuffer::push_block(message);

			getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
		};
		getChannel(DEQUEUE_INDEX)->consume(name_dequeue)
			.onReceived(messageCb)
			.onError(errorCb);

		// I will keep on forever.
		ev_run(pLoop_ev[DEQUEUE_INDEX]);

		// I will not come here.
		return;
	}
#else
	static int flushMessage(Searcher* pSearcher, std::string hint) {
		std::cout << IOLock << service_io::oss.str() << std::endl << IOUnlock;
		service_io::oss.str(std::string());// 空文字列を入れてクリアーする
		return 0;
	}
#endif
}
// (2017-03-20 Add end)

search.cpp

// (2017-03-20 Add begin)
#ifdef UBUNTU
ServiceIo2 Searcher::serviceIo2;
#endif
// (2017-03-20 Add end)

init( )

// (2017-03-20 Add begin)
#ifdef UBUNTU
	serviceIo2.name_queues_[service_io::ENQUEUE_INDEX] = "";
	serviceIo2.name_queues_[service_io::DEQUEUE_INDEX] = "";
	serviceIo2.lifeSpan_queues_[service_io::ENQUEUE_INDEX] = (service_io::LifeSpan_t)0;
	serviceIo2.lifeSpan_queues_[service_io::DEQUEUE_INDEX] = (service_io::LifeSpan_t)0;
#endif
// (2017-03-20 Add end)

ファイル末尾

// (2017-03-20 Add begin)
#ifdef UBUNTU
void ServiceIo2::SetQueueName(int index, std::string value) {
	std::lock_guard<std::mutex> lock(_mutex_queueName);
	name_queues_[index] = value;
}
std::string ServiceIo2::GetQueueName(int index) {
	std::lock_guard<std::mutex> lock(_mutex_queueName);
	return name_queues_[index];
}
void ServiceIo2::SetQueueLifeSpan(int index, service_io::LifeSpan_t value) {
	std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
	lifeSpan_queues_[index] = value;
}
service_io::LifeSpan_t ServiceIo2::GetQueueLifeSpan(int index) {
	std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
	return lifeSpan_queues_[index];
}
#endif
// (2017-03-20 Add end)

また、ファイルも2つ追加した。

service_io.hpp

// OS      : Windows 10
// IDE     : Visual Studio 2015
//           Install : NuGet   : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS      : Ubuntu 16.04

// Service : RabbitMQ
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//                     : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
//                     : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : (1) rabbitmq-server
//                                : (2) [Ctrl] + [Z]
//                                : (3) bg (Job Number)
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//                     : Command  : rabbitmqctl list_queues
//                                : // Count messages in queues.
//
// Program : this
//         : Compile   : Command  : make profgen_sse
//                     :          : mv apery ../bin/apery
//         : Execute   : Command  : ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null &
// 対局
// ====
//     usi
//         usiok
//     isready
//         readyok
//     usinewgame
//     position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
//     go
//         例) bestmove 2g2f ponder 1c1d
//     quit

#ifndef APERY_SERVICE_IO_HPP
#define APERY_SERVICE_IO_HPP

#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include "common.hpp"


// #define UBUNTU


// 入出力を、サービスIOでラッピングします
#define SERVIO_BEGIN service_io::oss
#define SERVIO_END_CLEAN std::endl, service_io::cleanMessage()
#define SERVIO_END_FLUSH(pSearcher,hint) std::endl, service_io::flushMessage(pSearcher,hint)

namespace service_io
{
	// 文字列ストリーム出力を、文字列に置換するもの
	static std::ostringstream oss;
}


#ifdef UBUNTU
// OS      : Ubuntu 16.04
// Library : libev
//         : Install   : Command  : sudo apt-get update
//                                : sudo apt-get install libev-dev
// Service : RabbitMQ
//         : Reference : Web site : Top page http://www.rabbitmq.com/
//         : Install   : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
//         : Manual    : Command  : man rabbitmqctl
//         : Start     : Command  : rabbitmq-server
//         : Stop      : Command  : rabbitmqctl stop
//         : Check     : Command  : rabbitmqctl status
//         :           : Command  : rabbitmqctl list_queues
// Library : AMQP-CPP
//         : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//
// service_io.hpp

// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
	// 時計はずれているが、現在時刻が返ってくる
	static std::string getTimeslipNow()
	{
		// どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
		std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
			std::chrono::system_clock::now().time_since_epoch()
			);

		long long hours = ms.count() / 1000 / 60 / 60 % 24;
		long long minutes = ms.count() / 1000 / 60 % 60;
		long long seconds = ms.count() / 1000 % 60;
		long long milliseconds = ms.count() % 1000;

		std::ostringstream oss;
		oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
		return oss.str();
	}

	// AMQP-CPPでの実装 :
	// AMQP::durable=[1] RabbitMQが止まってもキューを残す
	// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
	// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
	// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
	typedef int LifeSpan_t;

	// 0 : enqueue用設定
	// 1 : dequeue用設定
	static const int ENQUEUE_INDEX = 0;
	static const int DEQUEUE_INDEX = 1;
	static const int NUM_INDEX = 2;

	static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
	{
		if ("durable" == lifeSpan) {
			return AMQP::durable;
		}
		else if ("autodelete" == lifeSpan) {
			return AMQP::autodelete;
		}
		else if ("passive" == lifeSpan) {
			return AMQP::passive;
		}
		else if ("exclusive" == lifeSpan) {
			return AMQP::exclusive;
		}
		else {
			std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
			exit(11);
		}
	}

	static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
	static std::string exchange_name = "myexchange";
	static std::string routing_key = "";

	// 接続はシングルトンにします
	static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
	static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };

	static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
	static AMQP::TcpConnection* getConnection(int index)
	{
		if (nullptr == pConnection_ev[index]) {
			pConnection_ev[index] = new AMQP::TcpConnection(pHandler_ev[index], ADDRESS);
		}
		return pConnection_ev[index];
	}
	static void closeChannel(int index);
	static void closeConnection(int index)
	{
		if (nullptr != pConnection_ev[index]) {
			// チャンネルにもヌルのフラグを入れる
			closeChannel(index);

			pConnection_ev[index]->close();
			pConnection_ev[index] = nullptr;
		}
	}

	// チャンネルはシングルトンにします。
	static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
	static AMQP::TcpChannel* getChannel(int index)
	{
		if (nullptr == pChannel_ev[index])
		{
			pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
		}
		return pChannel_ev[index];
	}
	static void closeChannel(int index)
	{
		if (nullptr != pChannel_ev[index]) {
			pChannel_ev[index]->close();
			pChannel_ev[index] = nullptr;
		}
	}

	/// <summary>
	/// 回転式バッファー。
	/// これはメイン・スレッドに置く。
	/// デキューのスレッドでエンキューすることはできない。
	/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
	/// </summary>
	namespace rotationBuffer
	{
		const int bufferSize = 1024;
		static std::string buffer[bufferSize] = {};
		static int bufferCursors[2] = { 0, 0 };
		static int busy = 0;
		const int PUT_INDEX = 0;
		const int GET_INDEX = 1;

		static std::mutex _mutex;
		// 返り値: 追加しなかったとき 偽、それ以外のとき 真。
		static bool push_inner(std::string message)
		{
			std::lock_guard<std::mutex> lock(_mutex);

			if (busy + 1 == bufferSize || bufferSize * 99 / 100 < busy) {
				// busy
				return false;
			}

			buffer[bufferCursors[PUT_INDEX]] = message;
			bufferCursors[PUT_INDEX]++;
			busy++;

			if (!(bufferCursors[PUT_INDEX] < bufferSize))
			{
				bufferCursors[PUT_INDEX] = 0;
			}

			return true;
		}
		static std::string pop_inner()
		{
			std::lock_guard<std::mutex> lock(_mutex);

			if ("" != buffer[bufferCursors[GET_INDEX]])
			{
				std::string message = buffer[bufferCursors[GET_INDEX]];

				buffer[bufferCursors[GET_INDEX]] = "";
				bufferCursors[GET_INDEX]++;
				busy--;

				if (!(bufferCursors[GET_INDEX] < bufferSize))
				{
					bufferCursors[GET_INDEX] = 0;
				}

				return message;
			}
			// buffer is empty
			return "";
		}

		// 追加に成功するまで、ブロックする
		static void push_block(std::string message)
		{
			while (!rotationBuffer::push_inner(message))
			{
				// busy
				std::this_thread::sleep_for(std::chrono::milliseconds(40));
			}
		}
		// 何か取れるまで、ブロックする
		static std::string pop_block()
		{
			std::string message;
			while ("" == (message = rotationBuffer::pop_inner()))
			{
				// buffer is empty
				std::this_thread::sleep_for(std::chrono::milliseconds(20));
			}
			return message;
		}
	}

	static int cleanMessage() {
		service_io::oss.str(std::string());// 空文字列を入れてクリアーする
		return 0;
	}

	// service_io::workConsume() を別スレッドで走らせておくこと。
	static std::string dequeue() {
		return rotationBuffer::pop_block();
	}

	static bool parseArgs(int argc, char* argv[], std::string& name_enqueue, LifeSpan_t& lifeSpan_enqueue, std::string& name_dequeue, LifeSpan_t& lifeSpan_dequeue)
	{
		name_enqueue = "";
		name_dequeue = "";

		if (0<argc)
		{
			// プログラム名を省き、コマンドライン引数だけをつなげる。
			std::string cmdArg;
			for (int i = 1; i < argc; ++i)
			{
				cmdArg += std::string(argv[i]);
				if (i < argc) {
					cmdArg += " ";
				}
			}
			std::istringstream data(cmdArg);

			// 与件
			// 「--enqueue 1113 durable --dequeue 1112 durable autodelete」
			// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」

			// 受け皿
			// 参照渡しの引数に返す

			// 記憶
			int m0 = -1; // enqueue or dequeue index.
			int m1 = -1; // lifespans index.

						 // 解析器
			std::string a;
			while (data >> a) {

				if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
					if ("--enqueue" == a) { m0 = ENQUEUE_INDEX;	m1 = -1; }
					else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
					else { break; }
				}
				else if (
					">" == a.substr(0, 1)//先頭の1文字が「>」の場合
					|| "<" == a.substr(0, 1)//「<」
					|| ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
					) {
					// 標準入力、標準出力、標準エラーを無視
					break;
				}
				else if (0 == m0) { // エンキュー先キュー名
					if (-1 == m1) {
						name_enqueue = a;
						lifeSpan_enqueue = 0;
						m1++;
					}
					else {
						lifeSpan_enqueue |= LifeSpanString_To_Int(a);
					}
				}
				else if (1 == m0) { // デキュー元キュー名
					if (-1 == m1) {
						name_dequeue = a;
						lifeSpan_dequeue = 0;
						m1++;
					}
					else {
						lifeSpan_dequeue |= LifeSpanString_To_Int(a);
					}
				}
				else
				{
					// 欲しい内容がくるまでスルー
				}
			}

			if ("" == name_enqueue || "" == name_dequeue) {
				goto gt_CommandlineParameterError;
			}
		}
		else
		{
			goto gt_CommandlineParameterError;
		}
		return true;

	gt_CommandlineParameterError:
		std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
			<< "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
			<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
			<< "name_enqueue=[" << name_enqueue << "]" << std::endl
			<< "name_dequeue=[" << name_dequeue << "]" << std::endl;
		return false;
	}
}

#else

// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
	static int cleanMessage() {
		service_io::oss.str(std::string());// 空文字列を入れてクリアーする
		return 0;
	}

	static std::string dequeue() {
		std::string cmd;
		if (!std::getline(std::cin, cmd))
		{
			cmd = "quit";
		}
		return cmd;
	}
}
#endif

class ServiceIo
{
public:
	ServiceIo();
	~ServiceIo();
};

#endif // #ifndef APERY_SERVICE_IO_HPP

service_io.cpp

//#include "stdafx.h"
#include "service_io.hpp"

ServiceIo::ServiceIo()
{
}

ServiceIo::~ServiceIo()
{
}

C++言語を使い慣れない間に 改造した。

記事が長くなったので次へ

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?