関連記事 : http://qiita.com/muzudho1/items/aaedf39c171738358003
雑に説明する。
Ubuntu16.04 で プロセスA が プロセスB に文字列を送ろう、というものだ。
From | To | Result |
---|---|---|
C++ | C# | OK |
C# | C++ | OK |
C# | C# | OK |
C# | PHP | OK |
PHP | C++ | OK |
PHP | C# | OK |
PHP | PHP | OK |
C++、C#、PHPのキューに入れる、出すは単体テストできたので、多分、組み合わせは全部いけるんじゃないか。 |
RabbitMQ をインストールする
「Installing on Debian / Ubuntu」 (RabbitMQ)
https://www.rabbitmq.com/install-debian.html
Ubuntu でコマンドを打っていけばいい。
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt-get update
sudo apt-get install rabbitmq-server
gpg --keyserver pgpkeys.mit.edu --recv-key 7638D0442B90D010
gpg -a --export 7638D0442B90D010 | sudo apt-key add -
echo 'deb http://ftp.debian.org/debian wheezy-backports main' | sudo tee /etc/apt/sources.list.d/wheezy_backports.list
wget -O- https://packages.erlang-solutions.com/debian/erlang_solutions.asc | sudo apt-key add -
echo 'deb https://packages.erlang-solutions.com/debian wheezy contrib' | sudo tee /etc/apt/sources.list.d/esl.list
sudo apt-get update
sudo apt-get install init-system-helpers socat esl-erlang
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt-get update
sudo apt-get install rabbitmq-server
RabbitMQ を起動する
rabbitmq-server
備考: 停止したいときは 「rabbitmqctl stop」だ。
キーボードの [Ctrl]+[Z] キーを押して止める
bg 1
数字の 1 の部分は rabbitmq-server のジョブ番号を入れること。
これで RabbitMQ サーバーは起ち上がった。プロセス間通信の準備はオッケーだ。
キューの中身の数
文字列の送り方は このあと説明するが、
文字列を送れたかどうかは、次のコマンドで確認できる。
rabbitmqctl list_queues
サンプル・プログラム(C#)
文字を送るのと、受け取るのを テストできるプログラム。わたしが てきとー に書いたので これは模範としないこと。
//--------------------------------------------------------------------------------
// コマンドライン例
//
// # コンパイル
// mcs /r:RabbitMQ.Client.dll -define:UBUNTU MsgQueue.cs
//
// # キューの中身の数を調べる
// rabbitmqctl list_queues
//--------------------------------------------------------------------------------
//
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace UsagiMQ
{
/// <summary>
/// 参照 : 「QueueDeclare」 http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : 「EventingBasicConsumer」 https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : 「BasicConsume」 https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// </summary>
class Program
{
public const string
HOST_NAME = "localhost",
QUEUE_NAME = "1111";
public static ConnectionFactory GetFactory()
{
if(null== m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null==m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
}
return m_connection_;
}
static IConnection m_connection_;
public static IModel GetChannel()
{
if (null==m_channel_)
{
m_channel_ = GetConnection().CreateModel();
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
m_channel_.QueueDeclare(QUEUE_NAME, false, false, false, false, false, null);
#else
m_channel_.QueueDeclare(QUEUE_NAME, false, false, false, null);
#endif
}
return m_channel_;
}
static IModel m_channel_;
public static EventingBasicConsumer GetConsumer()
{
if (null == m_consumer_)
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumer_ = new EventingBasicConsumer();
#else
m_consumer_ = new EventingBasicConsumer(GetChannel());
#endif
}
return m_consumer_;
}
static EventingBasicConsumer m_consumer_;
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetReceiveHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetReceiveHandler()
#endif
{
if( null== m_eventHandler_ )
{
#if UBUNTU
m_eventHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_eventHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [interrupt!] Dequeue(^q^) {0}", message);
});
}
return m_eventHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_eventHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_eventHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection()
{
if (null != m_connection_)
{
m_connection_.Close();
m_connection_ = null;
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel()
{
if (null != m_channel_)
{
m_channel_.Close();
m_channel_ = null;
}
}
static void Main(string[] args)
{
for (;;)
{
Console.WriteLine(@"選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
");
int category = 0;
for (;;)
{
string line = Console.ReadLine();
switch (line)
{
case "1": category = 1; goto gt_EndLoop1;
case "2": category = 2; goto gt_EndLoop1;
case "quit": goto gt_Quit;
default: break;
}
}
gt_EndLoop1:
;
switch (category)
{
case 1:
{
Console.WriteLine(@"メッセージを書いて[Enter]キーを押せだぜ☆(^~^)");
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
}
break;
case 2: Dequeue(); break;
}
}
gt_Quit:
// 対応するオープンは無いが、ちゃんと閉じないと、プロセスが終わってくれない。
CloseConnection();
}
static void Enqueue(string message)
{
IModel channel = GetChannel();
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", QUEUE_NAME, null, body);
Console.WriteLine(" Enqueue(^q^) {0}", message);
//Console.WriteLine(" Press [enter] to menu.");
//Console.ReadLine();
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel();
}
static void Dequeue()
{
IModel channel = GetChannel();
EventingBasicConsumer consumer = GetConsumer();
// 受信できたときに割り込んでくる処理
consumer.Received += GetReceiveHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( QUEUE_NAME, true, "", null, consumer);
#else
channel.BasicConsume(QUEUE_NAME, true, consumer);
#endif
Console.WriteLine(" Please, wait dequeue... Or Press any key to menu.");
Console.ReadKey();//.ReadLine();
// 割込み処理の削除
consumer.Received -= GetReceiveHandler();
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel();
}
}
}
mono でコンパイルするなら 例えば次のように書く。
mcs /r:RabbitMQ.Client.dll -define:UBUNTU MsgQueue.cs
サンプル・プログラム(C#) エンキューのみ
文字を送るのだけを テストできるプログラム。わたしが てきとー に書いたので これは模範としないこと。
//--------------------------------------------------------------------------------
// コマンドライン例
//
// # コンパイル
// mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi30a2.cs
//
// # キューの中身の数を調べる
// rabbitmqctl list_queues
//--------------------------------------------------------------------------------
//
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace UsagiMQ
{
/// <summary>
/// 参照 : 「QueueDeclare」 http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : 「EventingBasicConsumer」 https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : 「BasicConsume」 https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// </summary>
class Program
{
public const string
HOST_NAME = "localhost",
QUEUE_NAME = "1111";
public static ConnectionFactory GetFactory()
{
if(null== m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null==m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
}
return m_connection_;
}
static IConnection m_connection_;
public static IModel GetChannel()
{
if (null==m_channel_)
{
m_channel_ = GetConnection().CreateModel();
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
m_channel_.QueueDeclare(QUEUE_NAME, false, false, false, false, false, null);
#else
m_channel_.QueueDeclare(QUEUE_NAME, false, false, false, null);
#endif
}
return m_channel_;
}
static IModel m_channel_;
public static EventingBasicConsumer GetConsumer()
{
if (null == m_consumer_)
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumer_ = new EventingBasicConsumer();
#else
m_consumer_ = new EventingBasicConsumer(GetChannel());
#endif
}
return m_consumer_;
}
static EventingBasicConsumer m_consumer_;
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection()
{
if (null != m_connection_)
{
m_connection_.Close();
m_connection_ = null;
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel()
{
if (null != m_channel_)
{
m_channel_.Close();
m_channel_ = null;
}
}
static void Main(string[] args)
{
for (;;)
{
Console.WriteLine(@"選べだぜ☆(^~^)
1 : メッセージ エンキュー
quit: 終了
");
int category = 0;
for (;;)
{
string line = Console.ReadLine();
switch (line)
{
case "1": category = 1; goto gt_EndLoop1;
case "quit": goto gt_Quit;
default: break;
}
}
gt_EndLoop1:
;
switch (category)
{
case 1:
{
Console.WriteLine(@"メッセージを書いて[Enter]キーを押せだぜ☆(^~^)");
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
}
break;
}
}
gt_Quit:
// 対応するオープンは無いが、ちゃんと閉じないと、プロセスが終わってくれない。
CloseConnection();
}
static void Enqueue(string message)
{
IModel channel = GetChannel();
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", QUEUE_NAME, null, body);
Console.WriteLine(" Enqueue(^q^) {0}", message);
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel();
}
}
}
PHPでも送れる
「php-amqplib/php-amqplib」 (Git Hub)
https://github.com/php-amqplib/php-amqplib
「Apparently missing bcmath dependency」 (php-amqplib/php-amqplib)
https://github.com/php-amqplib/php-amqplib/issues/312
PhpAmqpLib ライブラリ、bcmath ライブラリをインストールするには composer を使う。composerの説明は省く。
文字を送るテストができるプログラム。わたしが てきとー に書いたので これは模範としないこと。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// GETクエリ文字列を取得
$QMsg = urldecode($_SERVER['QUERY_STRING']);
if( "" === $QMsg )
{
$QMsg = "hello, world";
}
// 別のクエリーも送れることを説明
echo '(^q^)' . $QMsg . '<br />';
echo 'これを1回クリックしろだぜ☆m9(^~^)!<br />';
echo '<a href="http://★Webサイト/tamesi14.php?ClickSitanDaze">http://★Webサイト/tamesi14.php?ClickSitanDaze</a><br />';
// キューを開く
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
// queue_declare(
// $queue = '',
// $passive = false,
// $durable = false,
// $exclusive = false,
// $auto_delete = true,
// $nowait = false,
// $arguments = null,
// $ticket = null
$channel->queue_declare($QKey, false, false, false, false);
// $channel->queue_declare($QKey, false, true, false, true);
// エンキューする
$msgObj = new AMQPMessage( $QMsg );
$channel->basic_publish($msgObj, '', $QKey);
echo "対局プログラムの近くに届けておくぜ☆(^▽^)v ['" . $QMsg . "']\n";
// リソースを開放する
$channel->close();
$connection->close();
PHP でも受け取れる
loop8.php
<?php
// デキュー用
// サーバーの外部からはアクセスできないようにすること。
//
// このプログラムは、コマンドラインで実行する
// php loop6.php
// あるいは
// nohup /usr/bin/php /home/★user/shogi/php_service/loop6.php > /home/★user/shogi/php_log/loop6.out.log 2> /home/★user/shogi/php_log/loop6.err.log < /dev/null &
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// プロセス間通信の前準備をするぜ☆(^~^)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$QKey = '1111';
$channel->queue_declare($QKey, false, false, false, false);
// キューを開いてしまえば、あとは無限ループ!
// デキューしたタイミングですることをここに書く
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($QKey, '', false, true, false, false, $callback);
// 無限ループ
while(count($channel->callbacks))
{
// ここでブロックしながらデキューする
$channel->wait();
}
// ここに来ない
// $channel->close();
// $connection->close();
C++で受け取る
AMQP-CPP を Git Hub から落としてくる。
AMQP-CPP
https://github.com/CopernicaMarketingSoftware/AMQP-CPP
Makefile があるディレクトリで
# make
# make install
みたいな感じでコンパイルすると、/usr フォルダーの中にいろいろ入れてくれるようだ。
evライブラリも入れる
sudo apt-get update
sudo apt-get install libev-dev
受取りサンプルプログラム(C++)
文字を受け取るのを テストできるプログラム。わたしが てきとー に書いたので これは模範としないこと。
tamesi29a10_main.cpp
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
int main()
{
// access to the event loop
auto *loop = EV_DEFAULT;
// handler for libev (so we don't have to implement AMQP::TcpHandler!)
AMQP::LibEvHandler handler(loop);
// make a connection
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/"));
// we need a channel too
AMQP::TcpChannel channel(&connection);
// // create a temporary queue
// channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
//
// // report the name of the temporary queue
// std::cout << "declared queue " << name << std::endl;
//
// // now we can close the connection
// connection.close();
// });
// create a custom callback
auto callback = [](const std::string &name, int msgcount, int consumercount) {
// @todo add your own implementation
// report the name of the temporary queue
std::cout << "(^q^)declared queue " << name << std::endl;
// now we can close the connection
// connection.close();
};
// declare the queue, and install the callback that is called on success
channel.declareQueue("1111").onSuccess(callback);
// callback function that is called when the consume operation starts
auto startCb = [](const std::string &consumertag) {
// std::cout << "(^q^)consume operation started" << std::endl;
std::cout << "(^q^)consume operation started [" << consumertag << "]" << std::endl;
};
// callback function that is called when the consume operation failed
auto errorCb = [](const char *message) {
// std::cout << "(^q^)consume operation failed" << std::endl;
std::cout << "(^q^)consume operation failed [" << &message << "]" << std::endl;
};
// callback operation when a message was received
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
// std::cout << "(^q^)message received" << std::endl;
// std::cout << "(^q^)message received exchange=[" << &message.exchange() << "] routingkey=[" << &message.routingkey() << "]" << std::endl;
// std::cout << "(^q^)message received body=[" << message.body() << "] bodySize=[" << message.bodySize() << "]" << std::endl;
std::string myString(message.body(), message.bodySize());
std::cout << "(^q^)message received myString=[" << myString << "]" << std::endl;
// acknowledge the message
channel.ack(deliveryTag);
};
// start consuming from the queue, and install the callbacks
channel.consume("1111")
.onReceived(messageCb)
.onSuccess(startCb)
.onError(errorCb);
// run the loop
ev_run(loop, 0);
// done
return 0;
}
コンパイルするときは次のように叩く。わたしが調べて てきとー に書いたので これは模範としないこと。
# g++ -std=c++11 tamesi29a10_main.cpp -lev -lamqpcpp -pthread -o tamesi29a10_main.exe
送りつけサンプルプログラム(C++)
文字を送るのを テストできるプログラム。わたしが てきとー に書いたので これは模範としないこと。
publish.cpp
// OS : Ubuntu 16.04
// Server : Sakura VPS (Rental)
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Directory : Command : cd /usr/include/amqpcpp/
// : Compile : Command : g++ -std=c++11 publish.cpp -o publish.exe -lev -lamqpcpp -pthread
// : Execute : Command : ./publish.exe
//
// publish.cpp
// Reffered source code : how to publish messages #109 https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/109
// Reffered source code : https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/master/tests/myconnection.cpp
#include <iostream>
#include <string>
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
int main(int argc, char* argv[]) {
auto* loop = EV_DEFAULT;
AMQP::LibEvHandler handler{loop};
AMQP::Address address{"amqp://localhost:5672"};
AMQP::TcpConnection connection{&handler, address};
AMQP::TcpChannel channel{&connection};
std::string exchange_name = "myexchange";
std::string queue_name = "1111";
std::string routing_key = "";
std::cout << "channel.connected: " << channel.connected() << "\n";
channel.declareQueue(queue_name)
.onSuccess([](const std::string& name, uint32_t messages, uint32_t consumers) {
std::cout << "declared queue " << name << " (messages: " << messages << ", consumers: " << consumers << ")\n";
})
.onError([](const char* msg) {
std::cout << "error declaring queue: " << msg << "\n";
});
channel.onReady([]() {
std::cout << "onReady\n";
});
channel.bindQueue(exchange_name, queue_name, routing_key)
.onSuccess([&connection,&channel,&exchange_name,&routing_key](){
std::cout << "bindQueue.onSuccess\n";
std::string msg = "hello, japan";
if (channel.publish(exchange_name, routing_key, msg.c_str(), msg.size())) {
std::cout << "publish ok\n";
} else {
std::cout << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
// We will monitor until the connection is lost. Execute channel.declareQueue( ... ).
ev_run(loop);
return 0;
}
/*
channel.connected: 1
onReady
declared queue 1111 (messages: 0, consumers: 0)
bindQueue.onSuccess
publish ok
*/