Posted at

RabbitMQチュートリアル4のサンプルプログラムをCで書く

More than 3 years have passed since last update.


RabbitMQ チュートリアル4 Cバージョン

RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のパート4「Routing」にある Python 版のプログラムを、C クライアントライブラリ rabbitmq-c https://github.com/alanxz/rabbitmq-c を使って書き直したものです。サンプルコードは std::string を使っているため、C++ としてコンパイルします。RabbitMQ 自体の説明は、元のチュートリアルを参照してください。


送信

amqp_basic_publish() でメッセージをパブリッシュする際に、routing_key パラメータへ severity（第1引数。省略時は "info"）を指定する。

最終的なプログラムは、以下のとおり。


emit.cc

#include <stdlib.h>

#include <stdio.h>
#include <string>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>

int main (int argc, char** argv) {
std::string message;
std::string severity;
if (argc > 1) {
severity.assign (argv[1]);
} else {
severity.assign ("info");
}
for (int i = 2; i < argc; i ++) {
if (i > 2)
message.append (1, ' ');
message.append (argv[i]);
}

amqp_connection_state_t conn = amqp_new_connection ();

amqp_socket_t* socket = amqp_tcp_socket_new (conn);

int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}

amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t* m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}

amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);

amqp_bytes_t exchange = amqp_cstring_bytes ("direct_logs");
amqp_bytes_t exchtype = amqp_cstring_bytes ("direct");
amqp_exchange_declare_ok_t* dcok = amqp_exchange_declare (conn, channel, exchange, exchtype, false, false, false, false, amqp_empty_table);

amqp_bytes_t bsvr = {severity.length (), (void*)severity.data ()};
amqp_bytes_t bmsg = {message.length (), (void*)message.data ()};
int rc_pub = amqp_basic_publish (conn, channel, exchange, bsvr, false, false, NULL, bmsg);

amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);

return 0;
}



受信

amqp_exchange_declare() でダイレクトエクスチェンジを宣言し、amqp_queue_bind() でキューをバインドする際に、受信したい severity を routing_key パラメータに指定する。コマンドライン引数で与えた severity ごとに1回ずつバインドする。

最終的なプログラムは、以下のとおり。


log.cc

#include <stdlib.h>

#include <stdio.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

/*
 * Writes one delivery to stdout as " [x] <routing key>:<body>" plus a newline.
 *
 * conn      unused by this function
 * envelope  supplies the routing key
 * body      message payload; written as raw bytes using its stored length
 */
void callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
    (void) conn;

    const amqp_bytes_t& key = envelope->routing_key;

    fputs (" [x] ", stdout);
    fwrite (key.bytes, 1, key.len, stdout);
    fputc (':', stdout);
    fwrite (body.bytes, 1, body.len, stdout);
    fputc ('\n', stdout);
}

int main (int argc, char** argv) {
if (argc < 2) {
fprintf (stderr, "usage: %s [info] [warning] [error]\n", argv[0]);
exit (1);
}

amqp_connection_state_t conn = amqp_new_connection ();

amqp_socket_t* socket = amqp_tcp_socket_new (conn);

int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}

amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t* m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}

amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);

amqp_bytes_t exchange = amqp_cstring_bytes ("direct_logs");
amqp_bytes_t exchtype = amqp_cstring_bytes ("direct");
amqp_exchange_declare_ok_t* rc_exch = amqp_exchange_declare (conn, channel, exchange, exchtype, false, false, false, false, amqp_empty_table);

amqp_queue_declare_ok_t* rc_dcl = amqp_queue_declare (conn, channel, amqp_empty_bytes, false, false, true/*exclusive*/, false, amqp_empty_table);

for (int i = 1; i < argc; i ++) {
amqp_bytes_t routing_key = amqp_cstring_bytes (argv[i]);
amqp_queue_bind_ok_t* rc_bind = amqp_queue_bind (conn, channel, rc_dcl->queue, exchange, routing_key, amqp_empty_table);
}

amqp_basic_consume_ok_t* rc_con = amqp_basic_consume (conn, channel, rc_dcl->queue, amqp_empty_bytes, false, true/*no ack*/, false, amqp_empty_table);

printf (" [*] Waiting for messages. To exit press CTRL+C\n");

while (1) {
amqp_maybe_release_buffers (conn);
amqp_envelope_t envelope;
amqp_rpc_reply_t rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
switch (rc_msg.reply_type) {
case AMQP_RESPONSE_NORMAL:
callback (&conn, &envelope, envelope.message.body);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
amqp_frame_t frame;
if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
goto Ex1;
}
if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
break;
case AMQP_BASIC_RETURN_METHOD: {
amqp_message_t message;
amqp_rpc_reply_t rc_read = amqp_read_message (conn, frame.channel, &message, 0);
if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
goto Ex1;
}
amqp_destroy_message (&message);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD:
goto Ex1;
case AMQP_CONNECTION_CLOSE_METHOD:
goto Ex1;
default:
fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
goto Ex1;
}
}
}
break;
default:;
}
amqp_destroy_envelope (&envelope);
}

Ex1:;
amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);

return 0;
}