前回の記事 : http://qiita.com/muzudho1/items/1432040086218e14ebe2
標準入出力をキーボード、画面に向けて開けていると、バックグラウンドに持っていったとき 認証否認で落ちるのだろう、と目星をつけたのが 記事<その7> だった。
テストケース作成
a1 : (バックグラウンドに回さない)キーボード入力で、メッセージキュー「1112」にメッセージを送れるものとする。常時 メッセージキュー「1113」を監視し、メッセージが入っていれば 画面に出すものとする。
このプログラムは 必要になったら実行し、ログアウトしたら消えているものと想定する。
a2 : (バックグラウンドに回す)常時 メッセージキュー「1112」を監視し、メッセージが入っていれば その文字列に「daze」と付けて メッセージキュー「1113」にエンキューする。(現状、C++でエンキューする方法は分からない。ファイルで代用する)
このプログラムは ログアウトしても稼働しているものとする。
これで a2 が常時 稼働していて、a1 でアクセスできるのなら よしとする。
tamesi33a1_cs.cs
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi33a1_cs.cs
// : Command : chmod 755 tamesi33a1_cs.cs
// Execute : Command : // フォアグラウンドで実行する
// : ./tamesi33a1_cs.exe
// Check : Command : // キューの中身の数を調べる
// : rabbitmqctl list_queues
//--------------------------------------------------------------------------------
// tamesi33a1_cs.cs
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.ObjectModel;
using System.Text;
namespace UsagiMQ
{
/// <summary>
/// メッセージを エンキュー、デキューします。
/// エンキューは「1112」、デキューは「1113」キューに向けて行います。
///
/// 参照 : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
/// </summary>
class Program
{
public const string HOST_NAME = "localhost";
public static readonly ReadOnlyCollection<string> QUEUE_NAMES =
Array.AsReadOnly(new string[] { "1112", "1113" });
public const int ENQUEUE_INDEX = 0;
public const int DEQUEUE_INDEX = 1;
public static ConnectionFactory GetFactory()
{
if (null == m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null == m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
}
return m_connection_;
}
static IConnection m_connection_;
public static IModel GetChannel(int index)
{
if (null == m_channels_[index])
{
m_channels_[index] = GetConnection().CreateModel();
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], false, false, false, false, false, null);
#else
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], false, false, false, null);
#endif
}
return m_channels_[index];
}
static IModel[] m_channels_ = new IModel[2];
public static EventingBasicConsumer GetConsumer(int index)
{
if (null == m_consumers_[index])
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumers_[index] = new EventingBasicConsumer();
#else
m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
}
return m_consumers_[index];
}
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[2];
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetReceiveHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetReceiveHandler()
#endif
{
if (null == m_eventHandler_)
{
#if UBUNTU
m_eventHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_eventHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [interrupt!] Dequeue(^q^) {0}", message);
});
}
return m_eventHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_eventHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_eventHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseConnection()
{
if (null != m_connection_)
{
m_connection_.Close();
m_connection_ = null;
}
}
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel(int index)
{
if (null != m_channels_[index])
{
m_channels_[index].Close();
m_channels_[index] = null;
}
}
static void Main(string[] args)
{
for (;;)
{
Console.WriteLine(@"選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
");
int category = 0;
for (;;)
{
string line = Console.ReadLine();
switch (line)
{
case "1": category = 1; goto gt_EndLoop1;
case "2": category = 2; goto gt_EndLoop1;
case "quit": goto gt_Quit;
default: break;
}
}
gt_EndLoop1:
;
switch (category)
{
case 1:
{
Console.WriteLine(@"メッセージを書いて[Enter]キーを押せだぜ☆(^~^)");
// "Hello World!" などを入力
string line = Console.ReadLine();
Enqueue(line);
}
break;
case 2: Dequeue(); break;
}
}
gt_Quit:
// 対応するオープンは無いが、ちゃんと閉じないと、プロセスが終わってくれない。
CloseConnection();
}
static void Enqueue(string message)
{
IModel channel = GetChannel(ENQUEUE_INDEX);
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", QUEUE_NAMES[ENQUEUE_INDEX], null, body);
Console.WriteLine(" Enqueue(^q^) {0}", message);
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(ENQUEUE_INDEX);
}
static void Dequeue()
{
IModel channel = GetChannel(DEQUEUE_INDEX);
EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);
// 受信できたときに割り込んでくる処理
consumer.Received += GetReceiveHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, "", null, consumer);
#else
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, consumer);
#endif
Console.WriteLine(" Please, wait dequeue... Or Press any key to menu.");
Console.ReadKey();
// 割込み処理の削除
consumer.Received -= GetReceiveHandler();
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(DEQUEUE_INDEX);
}
}
}
tamesi33a2_cs.cs
// OS : Windows 10
// IDE : Visual Studio 2015
// Install : NuGet : Install-Package RabbitMQ.Client -Version 4.1.1
//
// OS : Ubuntu 16.04
// Compile : Command : mcs /r:RabbitMQ.Client.dll -define:UBUNTU tamesi33a2_cs.cs
// : Command : chmod 755 tamesi33a2_cs.cs
// Execute : Command : // バックグラウンドで実行する
// : nohup ./tamesi33a2_cs.exe > ./tamesi33a2_cs.out.log 2> ./tamesi33a2_cs.err.log < /dev/null &
// Check : : // tamesi33a1_cs.exe を使う
//--------------------------------------------------------------------------------
// tamesi33a2_cs.cs
// Ubuntu の RabbitMQ はソースのバージョンが古いのか、API が異なった。
// #define UBUNTU
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.ObjectModel;
using System.Text;
using System.Threading;
namespace UsagiMQ
{
/// <summary>
/// メッセージの末尾に daze を付けます。
/// デキューは「1112」、エンキューは「1113」キューに向けて行います。
/// 標準入出力は、キーボード、画面には接続しないようにします。
///
/// 参照 : QueueDeclare http://docs.spring.io/spring-amqp-net/docs/1.0.x/api/html/Spring.Messaging.Amqp.Rabbit~Spring.Messaging.Amqp.Rabbit.Connection.CachedModel~QueueDeclare(String,Boolean,Boolean,Boolean,Boolean,Boolean,IDictionary).html
/// 参照 : EventingBasicConsumer https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.Events.EventingBasicConsumer.html
/// 参照 : BasicConsume https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.IModel.html#method-M:RabbitMQ.Client.IModel.BasicConsume(System.UInt16,System.String,System.Boolean,System.Collections.IDictionary,RabbitMQ.Client.IBasicConsumer)
/// 参照 : C#でconstな配列を実現する (もっとクールにプログラミング) http://pgnote.net/?p=885
/// </summary>
class Program
{
public const string HOST_NAME = "localhost";
public static readonly ReadOnlyCollection<string> QUEUE_NAMES =
Array.AsReadOnly(new string[] { "1113", "1112" });
public const int ENQUEUE_INDEX = 0;
public const int DEQUEUE_INDEX = 1;
// 回転式バッファー
public static RotationBuffer rollingBuffer = new RotationBuffer();
public static ConnectionFactory GetFactory()
{
if (null == m_factory_)
{
m_factory_ = new ConnectionFactory() { HostName = HOST_NAME };
}
return m_factory_;
}
static ConnectionFactory m_factory_;
public static IConnection GetConnection()
{
if (null == m_connection_)
{
m_connection_ = GetFactory().CreateConnection();
}
return m_connection_;
}
static IConnection m_connection_;
public static bool IsConnected()
{
if (null != m_connection_)
{
return m_connection_.IsOpen;
}
return false;
}
public static IModel GetChannel(int index)
{
if (null == m_channels_[index])
{
m_channels_[index] = GetConnection().CreateModel();
#if UBUNTU
// Ubuntuでは何故か Spring.Messaging.Amqp.Rabbit の引数 7 つのやつになっている。
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], false, false, false, false, false, null);
#else
m_channels_[index].QueueDeclare(QUEUE_NAMES[index], false, false, false, null);
#endif
}
return m_channels_[index];
}
static IModel[] m_channels_ = new IModel[2];
public static EventingBasicConsumer GetConsumer(int index)
{
if (null == m_consumers_[index])
{
#if UBUNTU
// Ubuntuでは何故か v1.4.0 の引数が 0 個のやつになっている。調べたが引数が1個~6個のものは無かった。
m_consumers_[index] = new EventingBasicConsumer();
#else
m_consumers_[index] = new EventingBasicConsumer(GetChannel(index));
#endif
}
return m_consumers_[index];
}
static EventingBasicConsumer[] m_consumers_ = new EventingBasicConsumer[2];
/// <summary>
/// 受信できたときに割り込んでくる処理
/// </summary>
#if UBUNTU
public static BasicDeliverEventHandler GetReceiveHandler()
#else
public static EventHandler<BasicDeliverEventArgs> GetReceiveHandler()
#endif
{
if (null == m_eventHandler_)
{
#if UBUNTU
m_eventHandler_ = new BasicDeliverEventHandler((model, ea) =>
#else
m_eventHandler_ = new EventHandler<BasicDeliverEventArgs>((model, ea) =>
#endif
{
byte[] body = ea.Body;
string message = Encoding.UTF8.GetString(body);
// 末尾に daze を付ける。
message += "daze";
// 回転式バッファーに入れる
rollingBuffer.PutMessage(message);
});
}
return m_eventHandler_;
}
#if UBUNTU
static BasicDeliverEventHandler m_eventHandler_;
#else
static EventHandler<BasicDeliverEventArgs> m_eventHandler_;
#endif
/// <summary>
/// 対応するオープンは無いけれど、開けたら閉める、を完璧に対応する必要がある。
/// </summary>
static void CloseChannel(int index)
{
if (null != m_channels_[index])
{
m_channels_[index].Close();
m_channels_[index] = null;
}
}
static void Main(string[] args)
{
// 常時監視をスタート
StartDequeue();
// 無限ループ
while (IsConnected())
{
string message = rollingBuffer.GetMessage();
if (null != message)
{
// エンキューする
Enqueue(message);
}
Thread.Sleep(20);
}
// このプログラムは、自分では接続を切らない。
}
static void Enqueue(string message)
{
IModel channel = GetChannel(ENQUEUE_INDEX);
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", QUEUE_NAMES[ENQUEUE_INDEX], null, body);
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
CloseChannel(ENQUEUE_INDEX);
}
static void StartDequeue()
{
IModel channel = GetChannel(DEQUEUE_INDEX);
EventingBasicConsumer consumer = GetConsumer(DEQUEUE_INDEX);
// 受信できたときに割り込んでくる処理
consumer.Received += GetReceiveHandler();
#if UBUNTU
// Ubuntuでは何故か引数が 5 個のやつになっている。
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, "", null, consumer);
#else
channel.BasicConsume( QUEUE_NAMES[DEQUEUE_INDEX], true, consumer);
#endif
// 割込み処理の削除
// consumer.Received -= GetReceiveHandler();
// 対応するオープンは無いが、ちゃんと閉じないと、レシーブしてくれない。
// CloseChannel(DEQUEUE_INDEX);
}
}
/// <summary>
/// 回転式バッファー。
/// これはメイン・スレッドに置く。
/// デキューのスレッドでエンキューすることはできない。
/// デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
/// </summary>
public class RotationBuffer
{
public const int bufferSize = 100;
public string[] buffer = new string[bufferSize];
public int[] bufferCursors = new int[] { 0, 0 };
public const int PUT_INDEX = 0;
public const int GET_INDEX = 1;
public void PutMessage(string message)
{
buffer[bufferCursors[PUT_INDEX]] = message;
bufferCursors[PUT_INDEX]++;
if (!(bufferCursors[PUT_INDEX] < bufferSize))
{
bufferCursors[PUT_INDEX] = 0;
}
}
public string GetMessage()
{
if (null != buffer[bufferCursors[GET_INDEX]])
{
string message = buffer[bufferCursors[GET_INDEX]];
buffer[bufferCursors[GET_INDEX]] = null;
bufferCursors[GET_INDEX]++;
if (!(bufferCursors[GET_INDEX] < bufferSize))
{
bufferCursors[GET_INDEX] = 0;
}
return message;
}
return null;
}
}
}
画面とキーボードに一度も接続しないプログラムなら バックグラウンドに常駐できるんだろうか?
nohup ./tamesi33a2_cs.exe > ./tamesi33a2_cs.out.log 2> ./tamesi33a2_cs.err.log < /dev/null &
で走らせてログアウトし、
# ps aux | grep tamesi33
root 9044 3.0 2.3 341852 23828 ? Sl 13:47 0:05 /usr/bin/cli . tamesi33a2_cs.exe
root 9137 0.0 0.0 12936 984 pts/6 S+ 13:50 0:00 grep --color=auto tamesi33
ログインし直しても プロセスは残っているようだ。このプロセスにアクセスできれば 話しは早いんだが。
プロセス間通信か、それがだめならTCPでポートを使ってソケット通信か。
デキューのスレッドで、エンキューはできない。
回転式バッファーで仲介した。
回転式バッファーはメイン・スレッドに置く。
デキュー処理は、回転式バッファーを仲介にしてエンキュー処理にメッセージを渡す。
これで 1112→1113 の流れはうまくいった。
# nohup ./tamesi33a2_cs.exe > ./tamesi33a2_cs.out.log 2> ./tamesi33a2_cs.err.log < /dev/null &
[1] 11288
root@tk2-217-18401:/home/★user/shogi/csharp_service# jobs
[1]+ Running nohup ./tamesi33a2_cs.exe > ./tamesi33a2_cs.out.log 2> ./tamesi33a2_cs.err.log < /dev/null &
root@tk2-217-18401:/home/★user/shogi/csharp_service# ./tamesi33a1_cs.exe 選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
dorayaki
Enqueue(^q^) dorayaki
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
kamome
Enqueue(^q^) kamome
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
sirasu
Enqueue(^q^) sirasu
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
quit
root@tk2-217-18401:/home/★user/shogi/csharp_service# rabbitmqctl list_queues Listing queues ...
1112 0
1111 0
1113 3
myqueue 0
root@tk2-217-18401:/home/★user/shogi/csharp_service# ./tamesi33a1_cs.exe
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
2
Please, wait dequeue... Or Press any key to menu.
[interrupt!] Dequeue(^q^) dorayakidaze
[interrupt!] Dequeue(^q^) kamomedaze
[interrupt!] Dequeue(^q^) sirasudaze
次はログアウトしてみる。
root@tk2-217-18401:/home/★user/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112 0
1111 0
1113 0
myqueue 0
root@tk2-217-18401:/home/★user/shogi/csharp_service# ./tamesi33a1_cs.exe
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
yakitori
Enqueue(^q^) yakitori
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
brush
Enqueue(^q^) brush
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
1
メッセージを書いて[Enter]キーを押せだぜ☆(^~^)
kamome
Enqueue(^q^) kamome
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
quit
root@tk2-217-18401:/home/★user/shogi/csharp_service# rabbitmqctl list_queues
Listing queues ...
1112 0
1111 0
1113 3
myqueue 0
root@tk2-217-18401:/home/★user/shogi/csharp_service# ./tamesi33a1_cs.exe
選べだぜ☆(^~^)
1 : メッセージ エンキュー
2 : メッセージ デキュー
quit: 終了
2
Please, wait dequeue... Or Press any key to menu.
[interrupt!] Dequeue(^q^) yakitoridaze
[interrupt!] Dequeue(^q^) brushdaze
[interrupt!] Dequeue(^q^) kamomedaze
いけた。
バックグラウンドに回したプロセスとは キーボード、画面に 標準入出力を接続するのは おさらば ということらしい。
OSは シングル・タスクなんだろうか?
C++のプロセス通信の publish の方をどうしようか
C++は プロセス通信のpublish が動いていないのだった。何か探さないとな。
「Clients & Developer Tools」(RabbitMQ)
https://www.rabbitmq.com/devtools.html
「RabbitMQ C client」(alanxz/rabbitmq-c)
https://github.com/alanxz/rabbitmq-c
C言語ならこれらしい。コンパイラーとか g++ でもいいのか?
「SimpleAmqpClient」(alanxz/SimpleAmqpClient)
https://github.com/alanxz/SimpleAmqpClient
これを使うには、上記のC言語のやつがいるらしい。2度手間だなあ。
「abbitcpp」(akalend/amqpcpp)
https://github.com/akalend/amqpcpp
これには publish の example もある。これがいいんじゃないか。
AMQP-CPP と amqpcpp
名前がまぎらわしすぎだろ。ファイルとか 衝突しなければいいが。
amqpcpp を Git Hub からダウンロードしてきて、FileZilla で Windows 10 から Ubuntu へファイル移動。
# make install
make: *** No rule to make target 'install'. Stop.
なんのこっちゃ。
# make
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQP.o src/AMQP.cpp
In file included from src/AMQP.cpp:9:0:
include/AMQPcpp.h:44:18: fatal error: amqp.h: No such file or directory
compilation terminated.
<builtin>: recipe for target 'src/AMQP.o' failed
make: *** [src/AMQP.o] Error 1
うーむ。
なんだろうこれ。
zip のリンクをクリック。
解凍して rabbitmq-codegen-458c4557f51c を FileZilla で移動。
# cd rabbitmq-codegen-458c4557f51c
# make install
make: *** No rule to make target 'install'. Stop.
これも違う。
なんか Makefile の中身が足りないんじゃないか?
AMQP-CPP の publish の使い方をググろう
「利用AMQP-CPP访问RabbitMq,消息不能超过10K否则consume不到」(cnblogs)
https://q.cnblogs.com/q/82696/
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
でコールバックを読んで、そのコールバックの中で
channel.publish("","rpc_queue",env);
と書くのだろうか?
「AMQP-CPP/tests/myconnection.cpp」(CopernicaMarketingSoftware/AMQP-CPP)
https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/master/tests/myconnection.cpp
// OS : Ubuntu 16.04
// Server : Sakura VPS (Rental)
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Directory : Command : cd /usr/include/amqpcpp/
// : Compile : Command : g++ -std=c++11 publish.cpp -o publish.exe -lev -lamqpcpp -pthread
// : Execute : Command : ./publish.exe
//
// publish.cpp
// Reffered source code : how to publish messages #109 https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/109
// Reffered source code : https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/master/tests/myconnection.cpp
#include <iostream>
#include <string>
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
auto* loop = EV_DEFAULT;
AMQP::LibEvHandler handler{loop};
AMQP::Address address{"amqp://localhost:5672"};
AMQP::TcpConnection connection{&handler, address};
AMQP::TcpChannel channel{&connection};
std::string exchange_name = "myexchange";
std::string queue_name = "myqueue";
std::string routing_key = "";
int main(int argc, char* argv[]) {
std::cout << "channel.connected: " << channel.connected() << "\n";
channel.declareExchange(exchange_name, AMQP::direct, AMQP::durable);
channel.declareQueue(queue_name, AMQP::durable)
.onSuccess([](const std::string& name, uint32_t messages, uint32_t consumers) {
std::cout << "declared queue " << name << " (messages: " << messages << ", consumers: " << consumers << ")\n";
})
.onError([](const char* msg) {
std::cout << "error declaring queue: " << msg << "\n";
});
channel.onReady([]() {
std::cout << "onReady\n";
});
channel.bindQueue(exchange_name, queue_name, routing_key)
.onSuccess([](){
std::cout << "bindQueue.onSuccess\n";
std::string msg = "hello, japan";
if (channel.publish(exchange_name, routing_key, msg.c_str(), msg.size())) {
std::cout << "publish ok\n";
} else {
std::cout << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
// We will monitor until the connection is lost. Execute channel.declareQueue( ... ).
ev_run(loop);
return 0;
}
/*
# ./publish.exe
channel.connected: 1
onReady
declared queue myqueue (messages: 1, consumers: 0)
bindQueue.onSuccess
publish ok
*/
こう書くと myqueue では動くが、 "1111" では動かない。キューに状態があるのだろうか?
publish でけた
キューの状態が durable, autodelete, passive, exclusive の4つあって、合わないのを設定してはいけないのか。
// OS : Ubuntu 16.04
// Server : Sakura VPS (Rental)
// Library : libev
// : Install : Command : sudo apt-get update
// : sudo apt-get install libev-dev
// Service : RabbitMQ
// : Reference : Web site : Top page http://www.rabbitmq.com/
// : Install : Web site : Installing on Debian / Ubuntu http://www.rabbitmq.com/install-debian.html
// : Manual : Command : man rabbitmqctl
// : Start : Command : rabbitmq-server
// : Stop : Command : rabbitmqctl stop
// : Check : Command : rabbitmqctl status
// : : Command : rabbitmqctl list_queues
// Library : AMQP-CPP
// : Reference : Web site : AMQP-CPP README.md https://github.com/CopernicaMarketingSoftware/AMQP-CPP
// : Directory : Command : cd /usr/include/amqpcpp/
// : Compile : Command : g++ -std=c++11 publish.cpp -o publish.exe -lev -lamqpcpp -pthread
// : Execute : Command : ./publish.exe
//
// publish.cpp
// Reffered source code : how to publish messages #109 https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/109
// Reffered source code : https://github.com/CopernicaMarketingSoftware/AMQP-CPP/blob/master/tests/myconnection.cpp
#include <iostream>
#include <string>
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
int main(int argc, char* argv[]) {
auto* loop = EV_DEFAULT;
AMQP::LibEvHandler handler{loop};
AMQP::Address address{"amqp://localhost:5672"};
AMQP::TcpConnection connection{&handler, address};
AMQP::TcpChannel channel{&connection};
std::string exchange_name = "myexchange";
std::string queue_name = "1111";
std::string routing_key = "";
std::cout << "channel.connected: " << channel.connected() << "\n";
channel.declareQueue(queue_name)
.onSuccess([](const std::string& name, uint32_t messages, uint32_t consumers) {
std::cout << "declared queue " << name << " (messages: " << messages << ", consumers: " << consumers << ")\n";
})
.onError([](const char* errMsg) {
std::cout << "error declaring queue: " << errMsg << "\n";
});
channel.onReady([]() {
std::cout << "onReady\n";
});
channel.bindQueue(exchange_name, queue_name, routing_key)
.onSuccess([&connection,&channel,&exchange_name,&routing_key](){
std::cout << "bindQueue.onSuccess\n";
std::string msg = "hello, japan";
if (channel.publish(exchange_name, routing_key, msg.c_str(), msg.size())) {
std::cout << "publish ok\n";
} else {
std::cout << "failed to publish?\n";
}
// break in ev loop.
connection.close();
});
// We will monitor until the connection is lost. Execute channel.declareQueue( ... ), etc.
ev_run(loop);
return 0;
}
/*
channel.connected: 1
onReady
declared queue 1111 (messages: 0, consumers: 0)
bindQueue.onSuccess
publish ok
*/
じゃあ、浮かむ瀬の標準入出力を プロセス間通信に置き換えよう
ストリーム出力なんか すごい数あるが、どう置き換えるのがいいのか。