前回の記事 : http://qiita.com/muzudho1/items/4de318c07521718857bb
バックグラウンドで動く浮かむ瀬はできたのだった。
次は PHP と 浮かむ瀬 をプロセス間通信させたい。
PHP でのプロセス間通信のやり方は 前に書いたのだった。
http://qiita.com/muzudho1/items/5a36ecdcd9b0ba444577
送る方
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
{
$QMsg = "hello, world";
}
// 別のクエリーも送れることを説明
echo '(^q^)' . $QMsg . '<br />';
echo 'これを1回クリックしろだぜ☆m9(^~^)!<br />';
echo '<a href="http://★Webサイト/tamesi14.php?ClickSitanDaze">http://★Webサイト/tamesi14.php?ClickSitanDaze</a><br />';
// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
// queue_declare(
// $queue = '',
// $passive = false,
// $durable = false,
// $exclusive = false,
// $auto_delete = true,
// $nowait = false,
// $arguments = null,
// $ticket = null
$channel->queue_declare($QKey, false, false, false, false);
// エンキューする
$msgObj = new AMQPMessage( $QMsg );
$channel->basic_publish($msgObj, '', $QKey);
echo "対局プログラムの近くに届けておくぜ☆(^▽^)v ['" . $QMsg . "']\n";
// リソースを開放する
$channel->close();
$connection->close();
受け取る方
<?php
// デキュー用
// サーバーの外部からはアクセスできないようにすること。
//
// このプログラムは、コマンドラインで実行する
// php loop6.php
// あるいは
// nohup /usr/bin/php /home/★user/★project/php_service/loop6.php > /home/★user/★project/php_log/loop6.out.log 2> /home/★user/★project/php_log/loop6.err.log < /dev/null &
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// プロセス間通信の前準備をするぜ☆(^~^)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
$channel->queue_declare($QKey, false, false, false, false);
// キューを開いてしまえば、あとは無限ループ!
// デキューしたタイミングですることをここに書く
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($QKey, '', false, true, false, false, $callback);
// 無限ループ
while(count($channel->callbacks))
{
// ここでブロックしながらデキューする
$channel->wait();
}
// ここに来ない
// $channel->close();
// $connection->close();
無限ループからどうやって抜けんの?
「RabbitMQにPHPクライアント(php-amqplib)からメッセージを送信、受信する」(Symfoware)
http://symfoware.blog68.fc2.com/blog-entry-1490.html
うーむ。
「How on PHP cancel consumer in RabbitMQ?」(stack overflow)
http://stackoverflow.com/questions/24017760/how-on-php-cancel-consumer-in-rabbitmq
「PHP RabbitMQ setTimeout or other option to stop waiting for queue」(stack overflow)
http://stackoverflow.com/questions/33930923/php-rabbitmq-settimeout-or-other-option-to-stop-waiting-for-queue
また usi が効かない
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > usi
04:17:55 594 34_cs (P)___ enqueue begin [usi]
04:17:55 594 34_cs (P)___ channel.open begin
04:17:55 594 34_cs (P)___ connection.open begin
04:17:55 607 34_cs (P)___ connection.open end
04:17:55 609 34_cs (P)___ channel.open end
04:17:55 615 34_cs (P)___ enqueue end
04:17:55 615 34_cs (P)___ connection.close begin
04:17:55 615 34_cs (P)___ channel.close begin
04:17:55 622 34_cs (P)___ channel.close end
04:17:55 627 34_cs (P)___ connection.close end
Enqueue? > usi
04:18:05 465 34_cs (P)___ enqueue begin [usi]
04:18:05 465 34_cs (P)___ channel.open begin
04:18:05 465 34_cs (P)___ connection.open begin
04:18:05 475 34_cs (P)___ connection.open end
04:18:05 478 34_cs (P)___ channel.open end
04:18:05 478 34_cs (P)___ enqueue end
04:18:05 478 34_cs (P)___ connection.close begin
04:18:05 478 34_cs (P)___ channel.close begin
04:18:05 480 34_cs (P)___ channel.close end
04:18:05 480 34_cs (P)___ connection.close end
プロセスを調べるか。
# ps aux | grep php
root 6091 0.0 0.0 311040 324 ? Ss Mar07 4:28 php-fpm: master process (/etc/php/7.0/fpm/php-fpm.conf)
www-data 6094 0.0 0.0 311040 0 ? S Mar07 0:00 php-fpm: pool www
www-data 6095 0.0 0.0 311040 0 ? S Mar07 0:00 php-fpm: pool www
root 19662 0.0 0.0 12936 988 pts/0 S+ 04:20 0:00 grep --color=auto php
これが PHP の受信ループだったりしないんだろうか?
# kill 6095
# kill 6094
終了するときは[Ctrl]+[C]キーを押せだぜ☆(^~^)
エンキューするときはメッセージを打ち込んで[Enter]キーを押せだぜ☆(^◇^)
ex.) hello, world
Enqueue? > usi
04:21:47 062 34_cs (P)___ enqueue begin [usi]
04:21:47 062 34_cs (P)___ channel.open begin
04:21:47 062 34_cs (P)___ connection.open begin
04:21:47 071 34_cs (P)___ connection.open end
04:21:47 072 34_cs (P)___ channel.open end
04:21:47 077 34_cs (P)___ enqueue end
04:21:47 077 34_cs (P)___ connection.close begin
04:21:47 077 34_cs (P)___ channel.close begin
04:21:47 081 34_cs (P)___ channel.close end
04:21:47 087 34_cs (P)___ connection.close end
Enqueue? > 04:21:47 095 34_cs <-------- [interrupt!] Dequeue(^q^) id name ukamuse_SDT4
id author Hiraoka Takuya
option name Best_Book_Move type check default false
option name Book_File type string default book/20150503/book.bin
option name Byoyomi_Margin type spin default 500 min 0 max 2147483647
option name Clear_Hash type button
option name Draw_Ply type spin default 256 min 1 max 2147483647
option name Engine_Name type string default ukamuse_SDT4
option name Max_Book_Ply type spin default 32767 min 0 max 32767
option name Max_Random_Score_Diff type spin default 0 min 0 max 32600
option name Max_Random_Score_Diff_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Ply type spin default 32767 min 0 max 32767
option name Min_Book_Score type spin default -180 min -32601 max 32601
option name Minimum_Thinking_Time type spin default 20 min 0 max 2147483647
option name Move_Overhead type spin default 30 min 0 max 5000
option name MultiPV type spin default 1 min 1 max 594
option name OwnBook type check default true
option name Slow_Mover type spin default 89 min 1 max 1000
option name Slow_Mover_10 type spin default 10 min 1 max 1000
option name Slow_Mover_16 type spin default 20 min 1 max 1000
option name Slow_Mover_20 type spin default 40 min 1 max 1000
option name Threads type spin default 2 min 1 max 256
option name Time_Margin type spin default 4500 min 0 max 2147483647
option name USI_Hash type spin default 256 min 1 max 1048576
option name USI_Ponder type check default true
usiok
あっ、効いた。
isready
position や go を送っても反応がなくなったときは、
isready を送ってみると 反応が返ってくることがある。
初回より後の isready は早い。
position、go 以外に何かステップがあっただろうか?
> isready
< readyok
> usinewgame
> position ~
> go
のサイクルだと回る。なんで isready が要るのかは分からない。
セマフォを作ろう
(セマフォ (semaphore))「「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典」
http://wa3.i-3-i.info/word13357.html
ブラウザで多人数同時アクセスが起こるわけだが、とりあえず段階的に
- 浮かむ瀬にアクセスできるのは 同時に1人
- それ以外の人は エラーで 追い払われる
という単純な実装を目指そう。
現状では
- 浮かむ瀬にアクセスできる人数に上限は特にない。RabbitMQがくたばるまで
- アクセスしてもブラウザは待機中になる
- 指し手が返ってきても、それが自分の局面の指し手か分からない(混線)
といった感じになっている。
どうやってセマフォを作るのか?
RabbitMQを用いたプロセス間通信が 逆に弱点になるケースとして、ロック/アンロック 形式は向いていない。
なにしろ、キュー なのだ。キューは「しばらくお待ちください」という原理だ。
強いて言えば 3人並んでいるからまだ大丈夫、と100人ぐらいが思っていて同時にアクセスして 103人の行列ができる、というのが 工夫の要らない使い方だ。
「RabbitMQ Tutorials」(RabbitMQ)
http://www.rabbitmq.com/getstarted.html
上のサイトの図を見て分かるように、「P」は1つで「C」は1~複数だ。赤いチューブは 待ち行列で、「P」はいくつでもメッセージを送ってください、という思想だ。「P」を追い返す思想は 向いていない。
だから、ロック/アンロックを用いて 入場数制限をするための セマフォ は別途用意する必要がある。
とりあえず同時アクセスは そんなに起こらないだろう、という想定にする
RabbitMQ で、キューに溜まっているメッセージ数を調べる方法はあるだろうか。
また、キューに溜まっているのはメッセージ数であって、接続者すうではない。1手指すために isready、usinewgame、position、go の4メッセージを蓄えることになる。ホントは4メッセージも要らないんだが動きが怪しかったのでこの4メッセージを使っている。
ところで PHP側のソースコード
貼り付け忘れていたので。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//------------------------------------------------------------------------------
// URLの例
//------------------------------------------------------------------------------
//
// http://★サイト/tamesi26a4.php?sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
//
// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
{
// 指定が無ければ平手初期局面、先手
$QMsg = "sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves";
}
//------------------------------------------------------------------------------
// > isready
//------------------------------------------------------------------------------
// キューを選ぶ
// $connection 接続
// $channel チャンネル
// $QName キュー名
// queue_declare(
// $queue = '' , キュー名
// $passive = false, キューの存在確認をするだけ
// $durable = false, ブローカーが停止してもキューは残す
// $exclusive = false, この接続の間だけメッセージを残す
// $auto_delete = true , コンシューマがいなくなるとメッセージを消す
// $nowait = false,
// $arguments = null ,
// $ticket = null
// )
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
// メッセージをエンキューする
$msgObj = new AMQPMessage( 'isready' . "\n" );
$channel->basic_publish($msgObj, '', $QName);
// リソースを開放する
$channel ->close();
$connection->close();
//------------------------------------------------------------------------------
// < readyok
//------------------------------------------------------------------------------
// 接続は使いまわさず、やり直す。
// queue_declare(
// $queue = '' ,
// $passive = false,
// $durable = false,
// $exclusive = false,
// $auto_delete = true ,
// $nowait = false,
// $arguments = null ,
// $ticket = null
// )
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
$channel->queue_declare($QName, false, true, false, false);
$callback = function($msg) {
// メッセージをデキューしたタイミングですることをここに書く
echo $msg->body . "\n";
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);
// ここで、メッセージをデキューするまでブロック
$channel->wait();
$channel ->close();
$connection->close();
//------------------------------------------------------------------------------
// > usinewgame
//------------------------------------------------------------------------------
// あとはメッセージの送受信の繰り返し
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
$msgObj = new AMQPMessage( 'usinewgame' . "\n" );
$channel->basic_publish($msgObj, '', $QName);
$channel ->close();
$connection->close();
//------------------------------------------------------------------------------
// > position ~
//------------------------------------------------------------------------------
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
$msgObj = new AMQPMessage( 'position ' . $QMsg . "\n" );
$channel->basic_publish($msgObj, '', $QName);
$channel ->close();
$connection->close();
//------------------------------------------------------------------------------
// > go
//------------------------------------------------------------------------------
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
$msgObj = new AMQPMessage( 'go' . "\n" );
$channel->basic_publish($msgObj, '', $QName);
$channel ->close();
$connection->close();
//------------------------------------------------------------------------------
// < bestmove ~
//------------------------------------------------------------------------------
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
$channel->queue_declare($QName, false, true, false, false);
$callback = function($msg) {
echo $msg->body . "\n";
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);
$channel->wait();
$channel->close();
$connection->close();
「php-amqplib」(php-amqplib/php-amqplib)
https://github.com/php-amqplib/php-amqplib
ここから 欲しいメソッドを探すのがむずかしい。
クライアント側の関数から 何件のメッセージが溜まっているか 見えないんじゃないか?
仮に PHP のAPIではキューのメッセージ件数が見れないとしたら、何言語、何スクリプトだったら見えるんだ?
「[C#][PHP][C++] AMQP-CPP の使い方(RabbitMQ、プロセス間通信、メッセージキュー)」(Qiita)
http://qiita.com/muzudho1/items/5a36ecdcd9b0ba444577
C++なら channel.declareQueue のonSuccessハンドラの引数でメッセージ数は見れるようだ。
phpからこれを expect で叩く仕組みを作ればいいだろうか?
じゃあひとまず C++ で作ってみよう。
作った
Makefile
.PHONY: all clean
all: tamesi37a1.exe
tamesi37a1_msgq.o: tamesi37a1_msgq.cpp tamesi37a1_msgq.hpp
g++ -std=c++11 -c tamesi37a1_msgq.cpp -o tamesi37a1_msgq.o -lev -lamqpcpp -pthread
tamesi37a1.o: tamesi37a1.cpp tamesi37a1_msgq.hpp
g++ -std=c++11 -c tamesi37a1.cpp -o tamesi37a1.o -lev -lamqpcpp -pthread
tamesi37a1.exe: tamesi37a1.o tamesi37a1_msgq.o
g++ -std=c++11 tamesi37a1.o tamesi37a1_msgq.o -o tamesi37a1.exe -lev -lamqpcpp -pthread
clean:
rm -f tamesi37a1.d tamesi37a1.o tamesi37a1_msgq.o tamesi37a1.exe tamesi37a1.out.log tamesi37a1.err.log
tamesi37a1_msgq.hpp
// This program for Ubuntu 16.04.
#ifndef TAMESI37A1_MSGQ_HPP
#define TAMESI37A1_MSGQ_HPP
// #pragma once
#include <string>
#include <mutex>
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
namespace msgq
{
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
static std::string exchange_name = "myexchange";
static std::string routing_key = "";
// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;
class Settings
{
std::string name_queue_;
std::mutex _mutex_queueName;
msgq::LifeSpan_t lifeSpan_queue_;
std::mutex _mutex_lifeSpan;
public:
Settings();
~Settings();
void setQueueName(std::string value);
std::string getQueueName();
void setQueueLifeSpan(msgq::LifeSpan_t value);
msgq::LifeSpan_t getQueueLifeSpan();
bool parseCommandlineArgs(int argc, char* argv[]);
std::string Dump();
};
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev = ev_loop_new();
static AMQP::LibEvHandler* pHandler_ev = new AMQP::LibEvHandler(pLoop_ev);
static AMQP::TcpConnection* pConnection_ev = nullptr;
static AMQP::TcpConnection* getConnection()
{
if (nullptr == pConnection_ev) {
pConnection_ev = new AMQP::TcpConnection(pHandler_ev, ADDRESS);
}
return pConnection_ev;
}
static void closeChannel();
static void closeConnection()
{
if (nullptr != pConnection_ev) {
// チャンネルにもヌルのフラグを入れる
closeChannel();
pConnection_ev->close();
pConnection_ev = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev = nullptr;
static AMQP::TcpChannel* getChannel()
{
if (nullptr == pChannel_ev)
{
pChannel_ev = new AMQP::TcpChannel(getConnection());
}
return pChannel_ev;
}
static void closeChannel()
{
if (nullptr != pChannel_ev) {
pChannel_ev->close();
pChannel_ev = nullptr;
}
}
}
#endif // #ifndef TAMESI37A1_MSGQ_HPP
tamesi37a1_msgq.cpp
// This program for Ubuntu 16.04.
// #include "stdafx.h"
#include "tamesi37a1_msgq.hpp"
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
namespace msgq
{
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
if ("durable" == lifeSpan) {
return AMQP::durable;
}
else if ("autodelete" == lifeSpan) {
return AMQP::autodelete;
}
else if ("passive" == lifeSpan) {
return AMQP::passive;
}
else if ("exclusive" == lifeSpan) {
return AMQP::exclusive;
}
else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(11);
}
}
Settings::Settings()
{
name_queue_ = "";
lifeSpan_queue_ = (LifeSpan_t)0;
}
Settings::~Settings()
{
}
void Settings::setQueueName(std::string value) {
std::lock_guard<std::mutex> lock(_mutex_queueName);
name_queue_ = value;
}
std::string Settings::getQueueName() {
std::lock_guard<std::mutex> lock(_mutex_queueName);
return name_queue_;
}
void Settings::setQueueLifeSpan(LifeSpan_t value) {
std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
lifeSpan_queue_ = value;
}
LifeSpan_t Settings::getQueueLifeSpan() {
std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
return lifeSpan_queue_;
}
bool Settings::parseCommandlineArgs(int argc, char* argv[])
{
std::string name_queue = "";
LifeSpan_t lifeSpan_queue = 0;
if (0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::istringstream data(cmdArg);
// 与件
// 「--queue 1112 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
// std::string name_queue = "";
// LifeSpan_t lifeSpan_queue = 0;
// 記憶
int m0 = -1; // queue index.
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
if ("--queue" == a) { m0 = 0; m1 = -1; }
else { break; }
}
else if (
">" == a.substr(0, 1)//先頭の1文字が「>」の場合
|| "<" == a.substr(0, 1)//「<」
|| ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
) {
// 標準入力、標準出力、標準エラーを無視
break;
}
else if (0 == m0) { // キュー名
if (-1 == m1) {
name_queue = a;
lifeSpan_queue = 0;
m1++;
}
else {
lifeSpan_queue |= LifeSpanString_To_Int(a);
}
}
else
{
// 欲しい内容がくるまでスルー
}
}
}
if ("" != name_queue) {
setQueueName(name_queue);
setQueueLifeSpan(lifeSpan_queue);
return true;
}
std::cerr << "コマンドライン引数の「--queue 送信先キュー名 寿命」を漏れなく指定してください。" << std::endl
<< "例: --queue 1113 durable autodelete" << std::endl
<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
<< "name_queue=[" << name_queue << "]" << std::endl;
return false;
}
std::string Settings::Dump()
{
static std::ostringstream sb;
sb << "Dump" << std::endl
<< " name_queue =[" << getQueueName() << "]" << std::endl
<< " lifeSpan_queue =[" << getQueueLifeSpan() << "]" << std::endl
<< " ----" << std::endl;
return sb.str();
}
}
tamesi37a1.cpp
//--------------------------------------------------------------------------------
// OS : Windows10 : It not work. This program for Ubuntu 16.04.
//
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// : // Count messages in queues.
// Library : AMQP-CPP
//
// Program : this
// : Author : Satoshi TAKAHASHI (Handle. Muzudho)
// : License : MIT License https://opensource.org/licenses/MIT
// : Explain : This program count messages in queue.
// : Please settings queue to command line argument.
// : Compile : Command : make
// : Explain : Please, read Makefile.
// : Execute : Command : ./tamesi37a1.exe --queue 1112 durable autodelete
// : Run on the foreground.
// : Explain : ./tamesi37a1.exe Executable file
// : --queue queue settings section
// : 1112 queue name (string ok)
// : durable queue life span (multiple)
// : autodelete queue life span
// : Explain : command line argument of life span of queue (Compositable)
// : durable The queue exists, Even if the RabbitMQ service closed.
// : autodelete The queue delete when consumers to be nothing.
// : passive Just check what queue exists. Don't use read/write to queue.
// : exclusive The queue delete when connection closed. The queue exists is this connection only.
// : FAQ : Failures case: Message conflict when starting multiple processes.
// : Example : Windows 10 (This program not work) default queue life span. durable
// : Ubuntu 16.04 default queue life span. durable autodelete
// : Stop : Typing : [Ctrl]+[C]
//
// Referrences :
// : AMQP-CPP : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
// : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
// : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
// : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
// : AMQP-CPP (docsforge.com) http://docsforge.com/11/AMQP-CPP/docs/latest/namespace-AMQP/class-TcpConnection/
// : This : Web site : ラムダ式(C++11) (cpprefjp - C++日本語リファレンス) https ://cpprefjp.github.io/lang/cpp11/lambda_expressions.html
//
//--------------------------------------------------------------------------------
#include <string> // std::string
#include <iostream> // std::cout
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include "tamesi37a1_msgq.hpp"
int main(int argc, char* argv[])
{
msgq::Settings settings;
// 引数の解析
if (!settings.parseCommandlineArgs(argc, argv))
{
exit(12);
}
std::string qn = settings.getQueueName();
msgq::LifeSpan_t life = settings.getQueueLifeSpan();
AMQP::TcpChannel* pChannel = msgq::getChannel();
//----------------------------------------
// 指定したキューの残りメッセージ数を返す
//----------------------------------------
auto callback = [](const std::string &name, int msgcount, int consumercount) {
std::cout << "queueName = \"" << name << "\"" << std::endl
<< "messages = " << msgcount << std::endl
<< "nconsumers = " << consumercount << std::endl;
// 接続を切って、このイベントハンドラから抜けることで、ev_runループから抜ける
msgq::closeConnection();
};
pChannel->declareQueue(qn,life)
.onSuccess(callback);
//----------------------------------------
// ループ
//----------------------------------------
ev_run(msgq::pLoop_ev, 0);
return 0;
}
tamesi37a1.expect
#!/usr/bin/expect --
# コマンドライン引数の例
# --queue 1112 durable autodelete
# カレント・ディレクトリを設定
cd /home/★user/★project/cpp_service/t37a1.d
# Expectをタイムアウトさせない
set timeout -1
spawn -noecho ./tamesi37a1.exe ${argv}
expect -re "queueName = "
expect -re "messages = "
expect -re "consumers = "
tamesi26a5.php
<?php
// 処理時間計測
$time=microtime(true);
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//------------------------------------------------------------------------------
// URLの例
//------------------------------------------------------------------------------
//
// http://★サイト/tamesi26a5.php?sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
//
// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( '' === $QMsg )
{
$QMsg = 'sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves';
}
//------------------------------------------------------------------------------
// busy確認
//------------------------------------------------------------------------------
exec('/usr/bin/expect /home/★user/★project/expect_service/tamesi37a1.expect --queue 1112 durable autodelete',$o);
foreach($o as $v)
{
if(0===strpos($v,'messages = '))
{
$messages = intval( substr($v,11) );
if( 0 < $messages ){
die( 'error = "busy"' . "\n" . 'busy = ' . $messages . "\n" );
}
}
//else
//{
// echo 'v = ' . $v . "\n";
//}
}
//------------------------------------------------------------------------------
// > isready
//------------------------------------------------------------------------------
// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
// queue_declare(
// $queue = '',
// $passive = false,
// $durable = false,
// $exclusive = false,
// $auto_delete = true,
// $nowait = false,
// $arguments = null,
// $ticket = null
$channel->queue_declare($QName, false, true, false, true);
// エンキューする
$msgObj = new AMQPMessage( 'isready' . "\n" );
$channel->basic_publish($msgObj, '', $QName);
// リソースを開放する
$channel->close();
$connection->close();
//------------------------------------------------------------------------------
// < readyok
//------------------------------------------------------------------------------
// 接続は使いまわさず、やり直す。
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
// queue_declare(
// $queue = '',
// $passive = false,
// $durable = false,
// $exclusive = false,
// $auto_delete = true,
// $nowait = false,
// $arguments = null,
// $ticket = null
$channel->queue_declare($QName, false, true, false, false);
// キューを開いてしまえば、あとは無限ループ!
// デキューしたタイミングですることをここに書く
$callback = function($msg) {
if('readyok' !== rtrim($msg->body))
{
die( "error = \"no readyok.\"\n" . $msg->body . "\n" );
}
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);
// ここでブロックしながらデキューする。強制終了するまで続く
// wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
//$timeoutSeconds = 2;
//$channel->wait(null, false, timeoutSeconds);
$channel->wait();
$channel->close();
$connection->close();
//------------------------------------------------------------------------------
// > usinewgame
//------------------------------------------------------------------------------
// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
// エンキューする
$msgObj = new AMQPMessage( 'usinewgame' . "\n" );
$channel->basic_publish($msgObj, '', $QName);
// リソースを開放する
$channel->close();
$connection->close();
//------------------------------------------------------------------------------
// > position ~
//------------------------------------------------------------------------------
// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
// エンキューする
$msgObj = new AMQPMessage( 'position ' . $QMsg . "\n" );
$channel->basic_publish($msgObj, '', $QName);
// リソースを開放する
$channel->close();
$connection->close();
//------------------------------------------------------------------------------
// > go
//------------------------------------------------------------------------------
// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1112';
$channel->queue_declare($QName, false, true, false, true);
// エンキューする
$msgObj = new AMQPMessage( 'go' . "\n" );
$channel->basic_publish($msgObj, '', $QName);
// リソースを開放する
$channel->close();
$connection->close();
//------------------------------------------------------------------------------
// < bestmove ~
//------------------------------------------------------------------------------
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QName = '1113';
$channel->queue_declare($QName, false, true, false, false);
$callback = function($msg) {
echo $msg->body . "\n";
};
$channel->basic_consume($QName, '', false, true, false, false, $callback);
// ここでブロックしながらデキューする。強制終了するまで続く
$channel->wait();
$channel->close();
$connection->close();
//------------------------------------------------------------------------------
// 処理時間表示
//------------------------------------------------------------------------------
echo '#<br /><br />' . "\ntime = " . (microtime(true) - $time);
浮かむ瀬をどのように改造したのか
common.hpp
// (2017-03-22 Modify begin) 標準入出力は全て変更
#define SYNCCOUT SERVIO_BEGIN
#define SYNCENDL SERVIO_END_CLEAN
// (2017-03-22 Modify end)
標準入出力は全て変更した。量が多いので1つ1つは見ていけないが……。
search.cpp
// (2017-03-22 Modify begin) 標準入出力は全て変更
if (nyugyokuWin)
{
SERVIO_BEGIN << "bestmove win" << SERVIO_END_FLUSH(searcher, "NyugyokuWin");
}
else if (!bestThread->rootMoves[0].pv[0])
{
SERVIO_BEGIN << "bestmove resign" << SERVIO_END_FLUSH(searcher, "Resign");
}
else {
if (bestThread->rootMoves[0].pv.size() > 1 || bestThread->rootMoves[0].extractPonderFromTT(pos)) {
// ポンダー付きの場合
SERVIO_BEGIN << "bestmove " << bestThread->rootMoves[0].pv[0].toUSI() << " ponder " << bestThread->rootMoves[0].pv[1].toUSI() << SERVIO_END_FLUSH(searcher, "BestmoveAndPonder");
}
else {
SERVIO_BEGIN << "bestmove " << bestThread->rootMoves[0].pv[0].toUSI() << SERVIO_END_FLUSH(searcher, "Bestmove");
}
}
// (2017-03-22 Modify end)
こんな風に、標準出力ストリームがあるところは 全て置き換えた、あるいは見た目上変わりはないが、マクロの中身が変わったと考えていい。
USIループも変えた
標準出力の改造より、標準入力の改造の方が 大がかりだ。
usi.cpp
void Searcher::doUSICommandLoop(int argc, char* argv[]) {
#if UBUNTU
// 引数の解析
std::string name_enqueue = "";
service_io::LifeSpan_t lifeSpan_enqueue = 0;
std::string name_dequeue = "";
service_io::LifeSpan_t lifeSpan_dequeue = 0;
{
if (!service_io::parseArgs(argc, argv, name_enqueue, lifeSpan_enqueue, name_dequeue, lifeSpan_dequeue))
{
exit(12);
}
thisptr->serviceIo2.SetQueueName (service_io::ENQUEUE_INDEX, name_enqueue);
thisptr->serviceIo2.SetQueueLifeSpan(service_io::ENQUEUE_INDEX, lifeSpan_enqueue);
thisptr->serviceIo2.SetQueueName (service_io::DEQUEUE_INDEX, name_dequeue);
thisptr->serviceIo2.SetQueueLifeSpan(service_io::DEQUEUE_INDEX, lifeSpan_dequeue);
}
// デキューの常時監視をスタート
auto th1 = std::thread([] { service_io::workConsume(*thisptr); });
#endif
bool dialogue = false;
// 「./apery」のように引数無しで打鍵して実行された場合に限り、なんども標準入力を待つ。
if (argc == 1)
{
dialogue = true;
}
bool evalTableIsRead = false;
Position pos(DefaultStartPositionSFEN, threads.main(), thisptr);
std::string cmd;
std::string token;
// プログラム名を省き、コマンドライン引数だけをつなげる。
for (int i = 1; i < argc; ++i)
{
cmd += std::string(argv[i]) + " ";
}
do {
// 文字が打たれたらその文字を、空文字を打たれたら "quit" 扱いとする。
if (dialogue)
{
cmd = service_io::dequeue();
}
// 半角スペース区切りの最初の1トークンを拾う
std::istringstream ssCmd(cmd);
ssCmd >> std::skipws >> token;
if (token == "--enqueue" || token == "--dequeue") {
#if UBUNTU
dialogue = true;
#else
std::cerr << "Command line parameter error : \"--msgqueue\" option for UBUNTU.";
#endif
}
// else if (token == "tu") {
//#if UBUNTU
// // tu と飛んできたら ka と返す
// service_io::oss << "ka";
// service_io::flushMessage();
//#else
// std::cerr << "Command error : \"tu\" command for UBUNTU.";
//#endif
// }
else if (token == "quit" || token == "stop" || token == "ponderhit" || token == "gameover") {
if (token != "ponderhit" || signals.stopOnPonderHit) {
signals.stop = true;
threads.main()->startSearching(true);
}
else
limits.ponder = false;
if (token == "ponderhit" && limits.moveTime != 0)
limits.moveTime += timeManager.elapsed();
}
else if (token == "go") go(pos, ssCmd);
else if (token == "position") setPosition(pos, ssCmd);
else if (token == "usinewgame"); // isready で準備は出来たので、対局開始時に特にする事はない。
else if (token == "usi")
{
SERVIO_BEGIN << "id name " << std::string(options["Engine_Name"])
<< "\nid author Hiraoka Takuya"
<< "\n" << options
<< "\nusiok" << SERVIO_END_FLUSH(thisptr, "Usi");
}
else if (token == "isready") { // 対局開始前の準備。
tt.clear();
threads.main()->previousScore = ScoreInfinite;
if (!evalTableIsRead) {
// 一時オブジェクトを生成して Evaluator::init() を呼んだ直後にオブジェクトを破棄する。
// 評価関数の次元下げをしたデータを格納する分のメモリが無駄な為、
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
SERVIO_BEGIN << "readyok" << SERVIO_END_FLUSH(thisptr, "Isready");
}
else if (token == "setoption") setOption(ssCmd);
else if (token == "write_eval") { // 対局で使う為の評価関数バイナリをファイルに書き出す。
if (!evalTableIsRead)
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
Evaluator::writeSynthesized(Evaluator::evalDir);
}
#if defined LEARN
else if (token == "l") {
auto learner = std::unique_ptr<Learner>(new Learner);
learner->learn(pos, ssCmd);
}
else if (token == "make_teacher") {
if (!evalTableIsRead) {
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
make_teacher(ssCmd);
}
else if (token == "use_teacher") {
if (!evalTableIsRead) {
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
use_teacher(pos, ssCmd);
}
else if (token == "check_teacher") {
check_teacher(ssCmd);
}
else if (token == "print") printEvalTable(SQ88, f_gold + SQ78, f_gold, false);
#endif
#if !defined MINIMUL
// 以下、デバッグ用
else if (token == "bench") {
if (!evalTableIsRead) {
std::unique_ptr<Evaluator>(new Evaluator)->init(Evaluator::evalDir, true);
evalTableIsRead = true;
}
benchmark(pos);
}
else if (token == "key") { SYNCCOUT << pos.getKey() << SYNCENDL; }
else if (token == "tosfen") { SYNCCOUT << pos.toSFEN() << SYNCENDL; }
else if (token == "eval") { SYNCCOUT << evaluateUnUseDiff(pos) / FVScale << SYNCENDL; }
else if (token == "d") { pos.print(); }
else if (token == "s") { measureGenerateMoves(pos); }
else if (token == "t") { SYNCCOUT << pos.mateMoveIn1Ply().toCSA() << SYNCENDL; }
else if (token == "b") { makeBook(pos, ssCmd); }
#endif
else { SYNCCOUT << "unknown command: [" << cmd << "]" << SYNCENDL; }
} while (token != "quit" && dialogue);
#if UBUNTU
// th1 スレッドの終了を待つ
th1.join();
#endif
threads.main()->waitForSearchFinished();
}
バックグラウンドに常駐させる前提なので、標準入出力は塞ぎ、通信はメッセージ・キューを用いたプロセス間通信に置き換えた。
メッセージの送信先をエンキュー、
メッセージの受信元をデキュー と呼ぶことにした。
service_io::dequeue();
という部分で ブラウザから送られてくるメッセージを取得している。
また apery はコマンドライン引数を 単にコマンドと解釈してループの中で解析するので、
コマンドライン引数「--enqueue ~略~ --dequeue ~略~」を使いたいときに困った。
そこで 改造を加えたが、これが悪影響を与えていないものかどうか。
グローバル変数 searcher
コンピュータ将棋はマルチスレッドで動いているので、グローバル変数を用意しても、別スレッドからは見えないので グローバルには利用できない。
そこで、どのスレッドからも利用できるグローバル変数のように利用されている Searcher 構造体に、メッセージ・キュー名等の、グローバルに利用したいデータを追加した。
search.hpp
// (2017-03-20 Add begin)
#ifdef UBUNTU
class ServiceIo2
{
public:
std::string name_queues_[2];// = { "", "" };
std::mutex _mutex_queueName;
service_io::LifeSpan_t lifeSpan_queues_[2];// = { (service_io::LifeSpan_t)0, (service_io::LifeSpan_t)0 };
std::mutex _mutex_lifeSpan;
void SetQueueName(int index, std::string value);
std::string GetQueueName(int index);
void SetQueueLifeSpan(int index, service_io::LifeSpan_t value);
service_io::LifeSpan_t GetQueueLifeSpan(int index);
};
#endif
// (2017-03-20 Add end)
構造体の中
// (2017-03-20 Add begin)
#ifdef UBUNTU
public:
STATIC ServiceIo2 serviceIo2;
#endif
// (2017-03-20 Add end)
ファイルの末尾
// (2017-03-20 Add begin)
namespace service_io
{
#ifdef UBUNTU
// メッセージキューにエンキュー
// 出力ストリームのカンマ区切り「,,,;」で使われ、何か数字を返す必要があって必然性はない数字 0 を返している
static int flushMessage(Searcher* pSearcher, std::string hint)
{
std::string message = service_io::oss.str();
oss.str(std::string());// 空文字列を入れてクリアーする
std::string name_enqueue = pSearcher->serviceIo2.GetQueueName(ENQUEUE_INDEX);
LifeSpan_t lifeSpan_enqueue = pSearcher->serviceIo2.GetQueueLifeSpan(ENQUEUE_INDEX);
if ("" == name_enqueue)
{
std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_ queue name is empty=[" << name_enqueue << "] ENQUEUE_INDEX=[" << ENQUEUE_INDEX << "] lifeSpan=[" << lifeSpan_enqueue << "] hint=[" << hint << "]" << "\n";
exit(1);
}
// チャンネルを宣言
getChannel(ENQUEUE_INDEX)->declareQueue(name_enqueue, lifeSpan_enqueue)
.onError([=](const char* errMsg) {
std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.SM_.declareQueue.onError [" << errMsg << "] queue name=[" << name_enqueue << "] lifeSpan=[" << lifeSpan_enqueue << "]" << "\n";
});
// チャンネルを束縛
getChannel(ENQUEUE_INDEX)->bindQueue(exchange_name, name_enqueue, routing_key)
// ev_run から1回だけ呼び出される。コミットを開始してこのハンドラから抜けること。コミットの成功時に接続を切ることでev_runから抜ける。
.onSuccess([=]() {//&message
// トランザクション開始
getChannel(ENQUEUE_INDEX)->startTransaction();
// パブリッシュ開始
if (getChannel(ENQUEUE_INDEX)->publish(exchange_name, routing_key, message.c_str(), message.size())) {
// コミットを開始
getChannel(ENQUEUE_INDEX)->commitTransaction()
.onSuccess([=]() {
// コミット成功
// 接続を切ったあと、トランザクションのコミットが実行されるようだ。
closeConnection(ENQUEUE_INDEX);
})
.onError([=](const char *errMsg) {//&message
std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ CT.onError failuerCommit errMsg=[" << errMsg << "] message=[" << message << "]" << std::endl;
});
}
else {
std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.BQ_.OS_ else failuer message=[" << message << "]" << std::endl;
getChannel(ENQUEUE_INDEX)->rollbackTransaction();
}
}).onError([](const char* errMsg) {
std::cerr << getTimeslipNow() << "(>_<)ukamuse_sdt4_child4 (P)___ WKP.bindQueue.onError [" << errMsg << "]" << std::endl;
});
// ここで、バインドキューのオンサクセスの処理へ。
ev_run(pLoop_ev[ENQUEUE_INDEX], 0);
return 0;
}
// メッセージ・キューの監視を開始
static void workConsume(Searcher& searcher)
{
std::string name_dequeue = searcher.serviceIo2.GetQueueName(DEQUEUE_INDEX);
LifeSpan_t lifeSpan_dequeue = searcher.serviceIo2.GetQueueLifeSpan(DEQUEUE_INDEX);
// I will go to the front of the box named "1111".
getChannel(DEQUEUE_INDEX)->declareQueue(name_dequeue, lifeSpan_dequeue);
// I look inside the box.
auto errorCb = [&lifeSpan_dequeue](const char *errMsg) {
std::cerr << getTimeslipNow() << "(>_<)cpp ___(C) CNS.onError My ID watching failed [" << errMsg << "] lifeSpan_dequeue=[" << lifeSpan_dequeue << "]" << std::endl;
};
auto messageCb = [](const AMQP::Message &amqpMessage, uint64_t deliveryTag, bool redelivered) {
std::string message(amqpMessage.body(), amqpMessage.bodySize());
rotationBuffer::push_block(message);
getChannel(DEQUEUE_INDEX)->ack(deliveryTag);
};
getChannel(DEQUEUE_INDEX)->consume(name_dequeue)
.onReceived(messageCb)
.onError(errorCb);
// I will keep on forever.
ev_run(pLoop_ev[DEQUEUE_INDEX]);
// I will not come here.
return;
}
#else
static int flushMessage(Searcher* pSearcher, std::string hint) {
std::cout << IOLock << service_io::oss.str() << std::endl << IOUnlock;
service_io::oss.str(std::string());// 空文字列を入れてクリアーする
return 0;
}
#endif
}
// (2017-03-20 Add end)
search.cpp
// (2017-03-20 Add begin)
#ifdef UBUNTU
ServiceIo2 Searcher::serviceIo2;
#endif
// (2017-03-20 Add end)
init( )
// (2017-03-20 Add begin)
#ifdef UBUNTU
serviceIo2.name_queues_[service_io::ENQUEUE_INDEX] = "";
serviceIo2.name_queues_[service_io::DEQUEUE_INDEX] = "";
serviceIo2.lifeSpan_queues_[service_io::ENQUEUE_INDEX] = (service_io::LifeSpan_t)0;
serviceIo2.lifeSpan_queues_[service_io::DEQUEUE_INDEX] = (service_io::LifeSpan_t)0;
#endif
// (2017-03-20 Add end)
ファイル末尾
// (2017-03-20 Add begin)
#ifdef UBUNTU
void ServiceIo2::SetQueueName(int index, std::string value) {
std::lock_guard<std::mutex> lock(_mutex_queueName);
name_queues_[index] = value;
}
std::string ServiceIo2::GetQueueName(int index) {
std::lock_guard<std::mutex> lock(_mutex_queueName);
return name_queues_[index];
}
void ServiceIo2::SetQueueLifeSpan(int index, service_io::LifeSpan_t value) {
std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
lifeSpan_queues_[index] = value;
}
service_io::LifeSpan_t ServiceIo2::GetQueueLifeSpan(int index) {
std::lock_guard<std::mutex> lock(_mutex_lifeSpan);
return lifeSpan_queues_[index];
}
#endif
// (2017-03-20 Add end)
また、ファイルも2つ追加した。
service_io.hpp
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Service : RabbitMQ
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Web site : RabbitMQ管理コマンド(rabbitmqctl)使い方 (Qiita) http://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
// : Web site : amqpを使ってRabbitMQのキューを操作する (Qiita) http://qiita.com/tamikura@github/items/a268afa51c5537ca4fe6
// : Manual : Command : man rabbitmqctl
// : Start : Command : (1) rabbitmq-server
// : (2) [Ctrl] + [Z]
// : (3) bg (Job Number)
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : Command : rabbitmqctl list_queues
// : // Count messages in queues.
//
// Program : this
// : Compile : Command : make profgen_sse
// : : mv apery ../bin/apery
// : Execute : Command : ./apery --enqueue 1113 durable --dequeue 1112 durable autodelete > ./apery.out.log 2> ./apery.err.log < /dev/null &
// 対局
// ====
// usi
// usiok
// isready
// readyok
// usinewgame
// position sfen lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1 moves
// go
// 例) bestmove 2g2f ponder 1c1d
// quit
#ifndef APERY_SERVICE_IO_HPP
#define APERY_SERVICE_IO_HPP
#include <string> // std::string
#include <iostream> // std::cout
#include <sstream> // std::ostringstream
#include "common.hpp"
// #define UBUNTU
// 入出力を、サービスIOでラッピングします
#define SERVIO_BEGIN service_io::oss
#define SERVIO_END_CLEAN std::endl, service_io::cleanMessage()
#define SERVIO_END_FLUSH(pSearcher,hint) std::endl, service_io::flushMessage(pSearcher,hint)
namespace service_io
{
// 文字列ストリーム出力を、文字列に置換するもの
static std::ostringstream oss;
}
#ifdef UBUNTU
// OS : Ubuntu 16.04
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
//
// service_io.hpp
// プロセス間通信用
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
// 時計はずれているが、現在時刻が返ってくる
static std::string getTimeslipNow()
{
// どこか外国の 1969年12月31日19時ぐらいから数えたミリ秒だろうか?
std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(
std::chrono::system_clock::now().time_since_epoch()
);
long long hours = ms.count() / 1000 / 60 / 60 % 24;
long long minutes = ms.count() / 1000 / 60 % 60;
long long seconds = ms.count() / 1000 % 60;
long long milliseconds = ms.count() % 1000;
std::ostringstream oss;
oss << std::setfill('0') << std::setw(2) << hours << ":" << std::setfill('0') << std::setw(2) << minutes << ":" << std::setfill('0') << std::setw(2) << seconds << "'" << std::setfill('0') << std::setw(3) << milliseconds << " ";
return oss.str();
}
// AMQP-CPPでの実装 :
// AMQP::durable=[1] RabbitMQが止まってもキューを残す
// AMQP::autodelete=[2] コンシューマーが1人も接続していなかったら消す
// AMQP::passive=[8] キューが存在するかどうかチェックするだけ。中身見ない時これ
// AMQP::exclusive=[512] この接続でだけ使える。この接続が切れたら消す
typedef int LifeSpan_t;
// 0 : enqueue用設定
// 1 : dequeue用設定
static const int ENQUEUE_INDEX = 0;
static const int DEQUEUE_INDEX = 1;
static const int NUM_INDEX = 2;
static LifeSpan_t LifeSpanString_To_Int(std::string lifeSpan)
{
if ("durable" == lifeSpan) {
return AMQP::durable;
}
else if ("autodelete" == lifeSpan) {
return AMQP::autodelete;
}
else if ("passive" == lifeSpan) {
return AMQP::passive;
}
else if ("exclusive" == lifeSpan) {
return AMQP::exclusive;
}
else {
std::cerr << "未対応のlifeSpan [" << lifeSpan << "]";
exit(11);
}
}
static AMQP::Address ADDRESS{ "amqp://localhost:5672" };
static std::string exchange_name = "myexchange";
static std::string routing_key = "";
// 接続はシングルトンにします
static struct ev_loop* pLoop_ev[2] = { ev_loop_new(), ev_loop_new() };
static AMQP::LibEvHandler* pHandler_ev[2] = { new AMQP::LibEvHandler(pLoop_ev[ENQUEUE_INDEX]), new AMQP::LibEvHandler(pLoop_ev[DEQUEUE_INDEX]) };
static AMQP::TcpConnection* pConnection_ev[2] = { nullptr,nullptr };
static AMQP::TcpConnection* getConnection(int index)
{
if (nullptr == pConnection_ev[index]) {
pConnection_ev[index] = new AMQP::TcpConnection(pHandler_ev[index], ADDRESS);
}
return pConnection_ev[index];
}
static void closeChannel(int index);
static void closeConnection(int index)
{
if (nullptr != pConnection_ev[index]) {
// チャンネルにもヌルのフラグを入れる
closeChannel(index);
pConnection_ev[index]->close();
pConnection_ev[index] = nullptr;
}
}
// チャンネルはシングルトンにします。
static AMQP::TcpChannel* pChannel_ev[2] = { nullptr,nullptr };
static AMQP::TcpChannel* getChannel(int index)
{
if (nullptr == pChannel_ev[index])
{
pChannel_ev[index] = new AMQP::TcpChannel(getConnection(index));
}
return pChannel_ev[index];
}
static void closeChannel(int index)
{
if (nullptr != pChannel_ev[index]) {
pChannel_ev[index]->close();
pChannel_ev[index] = nullptr;
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
namespace rotationBuffer
{
const int bufferSize = 1024;
static std::string buffer[bufferSize] = {};
static int bufferCursors[2] = { 0, 0 };
static int busy = 0;
const int PUT_INDEX = 0;
const int GET_INDEX = 1;
static std::mutex _mutex;
// 返り値: 追加しなかったとき 偽、それ以外のとき 真。
static bool push_inner(std::string message)
{
std::lock_guard<std::mutex> lock(_mutex);
if (busy + 1 == bufferSize || bufferSize * 99 / 100 < busy) {
// busy
return false;
}
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
busy++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
return true;
}
static std::string pop_inner()
{
std::lock_guard<std::mutex> lock(_mutex);
if ("" != buffer[bufferCursors[GET_INDEX]])
{
std::string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = "";
bufferCursors[GET_INDEX]++;
busy--;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
// buffer is empty
return "";
}
// 追加に成功するまで、ブロックする
static void push_block(std::string message)
{
while (!rotationBuffer::push_inner(message))
{
// busy
std::this_thread::sleep_for(std::chrono::milliseconds(40));
}
}
// 何か取れるまで、ブロックする
static std::string pop_block()
{
std::string message;
while ("" == (message = rotationBuffer::pop_inner()))
{
// buffer is empty
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
return message;
}
}
static int cleanMessage() {
service_io::oss.str(std::string());// 空文字列を入れてクリアーする
return 0;
}
// service_io::workConsume() を別スレッドで走らせておくこと。
static std::string dequeue() {
return rotationBuffer::pop_block();
}
static bool parseArgs(int argc, char* argv[], std::string& name_enqueue, LifeSpan_t& lifeSpan_enqueue, std::string& name_dequeue, LifeSpan_t& lifeSpan_dequeue)
{
name_enqueue = "";
name_dequeue = "";
if (0<argc)
{
// プログラム名を省き、コマンドライン引数だけをつなげる。
std::string cmdArg;
for (int i = 1; i < argc; ++i)
{
cmdArg += std::string(argv[i]);
if (i < argc) {
cmdArg += " ";
}
}
std::istringstream data(cmdArg);
// 与件
// 「--enqueue 1113 durable --dequeue 1112 durable autodelete」
// 寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」
// 受け皿
// 参照渡しの引数に返す
// 記憶
int m0 = -1; // enqueue or dequeue index.
int m1 = -1; // lifespans index.
// 解析器
std::string a;
while (data >> a) {
if ("--" == a.substr(0, 2)) {//先頭の2文字が「--」の場合
if ("--enqueue" == a) { m0 = ENQUEUE_INDEX; m1 = -1; }
else if ("--dequeue" == a) { m0 = DEQUEUE_INDEX; m1 = -1; }
else { break; }
}
else if (
">" == a.substr(0, 1)//先頭の1文字が「>」の場合
|| "<" == a.substr(0, 1)//「<」
|| ">" == a.substr(1, 1)//先頭から2文字目が「>」の場合
) {
// 標準入力、標準出力、標準エラーを無視
break;
}
else if (0 == m0) { // エンキュー先キュー名
if (-1 == m1) {
name_enqueue = a;
lifeSpan_enqueue = 0;
m1++;
}
else {
lifeSpan_enqueue |= LifeSpanString_To_Int(a);
}
}
else if (1 == m0) { // デキュー元キュー名
if (-1 == m1) {
name_dequeue = a;
lifeSpan_dequeue = 0;
m1++;
}
else {
lifeSpan_dequeue |= LifeSpanString_To_Int(a);
}
}
else
{
// 欲しい内容がくるまでスルー
}
}
if ("" == name_enqueue || "" == name_dequeue) {
goto gt_CommandlineParameterError;
}
}
else
{
goto gt_CommandlineParameterError;
}
return true;
gt_CommandlineParameterError:
std::cerr << getTimeslipNow() << "コマンドライン引数の「--enqueue 送信先キュー名 寿命」「--dequeue 受信元キュー名 寿命」を漏れなく指定してください。" << std::endl
<< "例: --enqueue 1117 durable autodelete --dequeue 1116 durable autodelete" << std::endl
<< "寿命は可変個数設定可能「durable」「autodelete」「passive」「exclusive」" << std::endl
<< "name_enqueue=[" << name_enqueue << "]" << std::endl
<< "name_dequeue=[" << name_dequeue << "]" << std::endl;
return false;
}
}
#else
// バックグラウンド・プロセスに切り替えて使用する可能性も考慮した入出力API
namespace service_io
{
static int cleanMessage() {
service_io::oss.str(std::string());// 空文字列を入れてクリアーする
return 0;
}
static std::string dequeue() {
std::string cmd;
if (!std::getline(std::cin, cmd))
{
cmd = "quit";
}
return cmd;
}
}
#endif
class ServiceIo
{
public:
ServiceIo();
~ServiceIo();
};
#endif // #ifndef APERY_SERVICE_IO_HPP
service_io.cpp
//#include "stdafx.h"
#include "service_io.hpp"
ServiceIo::ServiceIo()
{
}
ServiceIo::~ServiceIo()
{
}
C++言語を使い慣れない間に 改造した。