Edited at

RabbitMQチュートリアル2のサンプルプログラムをCで書く

More than 3 years have passed since last update.


RabbitMQ チュートリアル2 Cバージョン

RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のパート2 Work queuesのPython版のプログラムをCライブラリ https://github.com/alanxz/rabbitmq-c を使って書いたものです。RabbitMQの説明等は、元のチュートリアルを参照してください。


送信

メッセージを永続化させるため、キューとメッセージにdurableオプションを付ける。amqp_queue_declare()で、キューを定義する時、durableパラメータをtrueにセットする。amqp_basic_publish()で、メッセージを発行する時、propertyパラメータで、delivery_modeを指定する。


amqp_framing.h

#define AMQP_BASIC_CONTENT_TYPE_FLAG (1 << 15) /**< basic.content-type property flag */

#define AMQP_BASIC_CONTENT_ENCODING_FLAG (1 << 14)
/**< basic.content-encoding property flag */
#define AMQP_BASIC_HEADERS_FLAG (1 << 13)
/**< basic.headers property flag */
#define AMQP_BASIC_DELIVERY_MODE_FLAG (1 << 12)
/**< basic.delivery-mode property flag */
#define AMQP_BASIC_PRIORITY_FLAG (1 << 11)
/**< basic.priority property flag */
#define AMQP_BASIC_CORRELATION_ID_FLAG (1 << 10)
/**< basic.correlation-id property flag */
#define AMQP_BASIC_REPLY_TO_FLAG (1 << 9)
/**< basic.reply-to property flag */
#define AMQP_BASIC_EXPIRATION_FLAG (1 << 8)
/**< basic.expiration property flag */
#define AMQP_BASIC_MESSAGE_ID_FLAG (1 << 7)
/**< basic.message-id property flag */
#define AMQP_BASIC_TIMESTAMP_FLAG (1 << 6)
/**< basic.timestamp property flag */
#define AMQP_BASIC_TYPE_FLAG (1 << 5)
/**< basic.type property flag */
#define AMQP_BASIC_USER_ID_FLAG (1 << 4)
/**< basic.user-id property flag */
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
/**< basic.app-id property flag */
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
/**< basic.cluster-id property flag */

typedef struct amqp_basic_properties_t_ {
amqp_flags_t _flags; /**< bit-mask of set fields */
amqp_bytes_t content_type; /**< content-type */
amqp_bytes_t content_encoding; /**< content-encoding */
amqp_table_t headers; /**< headers */
uint8_t delivery_mode; /**< delivery-mode */
uint8_t priority; /**< priority */
amqp_bytes_t correlation_id; /**< correlation-id */
amqp_bytes_t reply_to; /**< reply-to */
amqp_bytes_t expiration; /**< expiration */
amqp_bytes_t message_id; /**< message-id */
uint64_t timestamp; /**< timestamp */
amqp_bytes_t type; /**< type */
amqp_bytes_t user_id; /**< user-id */
amqp_bytes_t app_id; /**< app-id */
amqp_bytes_t cluster_id; /**< cluster-id */
} amqp_basic_properties_t;

delivery_modeの値は、列挙型で定義されている。delivery_modeを指定した時は、_flagsに


amqp.h

typedef enum {

AMQP_DELIVERY_NONPERSISTENT = 1, /**< Non-persistent message */
AMQP_DELIVERY_PERSISTENT = 2 /**< Persistent message */
} amqp_delivery_mode_enum;

最終的なプログラムは以下のとおり。


task.cc

#include <stdlib.h>

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <string>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>

int main (int argc, char** argv) {
std::string message;
if (argc > 1) {
for (int i = 1; i < argc; i ++) {
if (i > 1)
message.append (1, ' ');
message.append (argv[i]);
}
} else {
message = "Hello World!";
}

amqp_connection_state_t conn = amqp_new_connection ();

amqp_socket_t* socket = amqp_tcp_socket_new (conn);

int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}

amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}

amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);

amqp_bytes_t queue = amqp_cstring_bytes ("hello_task");
amqp_queue_declare_ok_t* rc_decl = amqp_queue_declare (conn, channel, queue, false, true/*durable*/, false, false, amqp_empty_table);

amqp_bytes_t bmsg = {message.length (), (void*)message.data ()};
amqp_basic_properties_t prop;
memset (&prop, 0, sizeof (prop));
prop.delivery_mode = AMQP_DELIVERY_PERSISTENT;
prop._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
int rc_pub = amqp_basic_publish (conn, channel, amqp_empty_bytes, queue, false, false, &prop, bmsg);

amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);

return 0;
}



受信

キューからメッセージを受信した時、確認応答を返すまでキューからメッセージを取り除かないよう設定するには、amqp_basic_consume()のno_ackパラメータをfalseにセットして呼び出す。

確認応答を返すには、amqp_basic_ack()を実行する。channel, delivery_tagパラメータは、メッセージエンベロープから取り出す。


amqp.h

int  amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,

uint64_t delivery_tag, amqp_boolean_t multiple);

qosメソッドの設定を行うには、amqp_basic_qos()を実行する。


amqp_framing.h

amqp_basic_qos_ok_t*  amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel,

uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);

最終的なプログラムは、以下のとおり。


worker.cc

#include <stdlib.h>

#include <stdio.h>
#include <unistd.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>

void callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
printf (" [x] Received ");
fwrite (body.bytes, 1, body.len, stdout);
printf ("\n");

int count = 0;
for (int i = 0; i < body.len; i ++) {
if (((char*)body.bytes)[i] == '.')
count ++;
}
sleep (count);
printf (" [x] Done\n");

int rc_ack = amqp_basic_ack (*conn, envelope->channel, envelope->delivery_tag, false);
}

int main (int argc, char** argv) {
amqp_connection_state_t conn = amqp_new_connection ();

amqp_socket_t* socket = amqp_tcp_socket_new (conn);

int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}

amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}

amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);

amqp_bytes_t queue = amqp_cstring_bytes ("hello_task");
amqp_queue_declare_ok_t* rc_dcl = amqp_queue_declare (conn, channel, queue, false, true/*durable*/, false, false, amqp_empty_table);

amqp_basic_qos_ok_t* rc_qos = amqp_basic_qos (conn, channel, 0, 1/*prefetch count*/, false);

amqp_basic_consume_ok_t* rc_cons = amqp_basic_consume (conn, channel, queue, amqp_empty_bytes, false, false, false, amqp_empty_table);

printf (" [*] Waiting for messages. To exit press CTRL+C\n");

while (1) {
amqp_maybe_release_buffers (conn);
amqp_envelope_t envelope;
amqp_rpc_reply_t rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
switch (rc_msg.reply_type) {
case AMQP_RESPONSE_NORMAL:
callback (&conn, &envelope, envelope.message.body);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
amqp_frame_t frame;
if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
goto Ex1;
}
if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
break;
case AMQP_BASIC_RETURN_METHOD: {
amqp_message_t message;
amqp_rpc_reply_t rc_read = amqp_read_message (conn, frame.channel, &message, 0);
if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
goto Ex1;
}
amqp_destroy_message (&message);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD:
goto Ex1;
case AMQP_CONNECTION_CLOSE_METHOD:
goto Ex1;
default:
fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
goto Ex1;
}
}
}
break;
default:;
}
amqp_destroy_envelope (&envelope);
}

Ex1:;
amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);

return 0;
}