Edited at

RabbitMQチュートリアル1のサンプルプログラムをCで書く

More than 3 years have passed since last update.


RabbitMQ チュートリアル1 Cバージョン

RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のPython版のプログラムを下敷きにして、Cライブラリ https://github.com/alanxz/rabbitmq-c を使ったプログラムを書きます。RabbitMQの説明等は、元のチュートリアルを参照してください。

プログラム自体は C++を使っていますが、class定義などを使っていないので、Cでも大きな違いはありません。エラーチェックのコードは、適宜追加してください。


送信

RabbitMQサーバとのコネクションを確立するために、まずコネクションオブジェクトを作成する。コネクションオブジェクトの使用終了後は、amqp_destroy_connection()で破棄する。


amqp.h

amqp_connection_state_t  amqp_new_connection(void);

int amqp_destroy_connection(amqp_connection_state_t state);

RabbitMQサーバに接続するには、amqp_tcp_socket_new()でソケットを作成し、amqp_socket_open()でホスト、ポート番号を指定してコネクションを張る。


amqp_tcp_socket.h

amqp_socket_t*  amqp_tcp_socket_new(amqp_connection_state_t state);

int amqp_socket_open(amqp_socket_t* socket, const char *host, int port);

サーバに接続後、バーチャルホスト、認証方法、ID、PWを送信してログインする。


amqp.h

amqp_rpc_reply_t  amqp_login(amqp_connection_state_t state,

char const *vhost,
int channel_max, int frame_max, int heartbeat,
amqp_sasl_method_enum sasl_method, ...);

正常終了の場合、戻り値の amqp_rpc_reply_t のメンバ reply_type に AMQP_RESPONSE_NORMAL が格納される。

キューを作成するには、amqp_queue_declare()を使う。


amqp_framing.h

amqp_queue_declare_ok_t*  amqp_queue_declare(amqp_connection_state_t state,

amqp_channel_t channel,
amqp_bytes_t queue,
amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete,
amqp_table_t arguments);

キュー名などのテキストデータは amqp_bytes_t構造体で指定する。この構造体は、バイトデータの長さとデータへのポインタからなる。関数への受け渡しでは、この構造体がコピーされる。


amqp.h

typedef struct amqp_bytes_t_ {

size_t len; /* バッファのバイト数 */
void* bytes; /* バッファへのポインタ */
} amqp_bytes_t;

C文字列からamqp_bytes_t構造体を生成するには、amqp_cstring_bytes()関数が利用できる。

exchangeとrouting_keyを指定して、キューのexchangeにデータを送信する。


amqp.h

int  amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,

amqp_bytes_t exchange, amqp_bytes_t routing_key,
amqp_boolean_t mandatory, amqp_boolean_t immediate,
amqp_basic_properties_t const* properties,
amqp_bytes_t body);

最終的なプログラムは以下のとおり。


send.cc

#include <stdio.h>

#include <stdlib.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>

int main (int argc, char** argv) {
amqp_connection_state_t conn = amqp_new_connection ();

amqp_socket_t* socket = amqp_tcp_socket_new (conn);

int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}

amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}

amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);

amqp_bytes_t queue = amqp_cstring_bytes ("hello");
amqp_queue_declare_ok_t* rc_decl = amqp_queue_declare (conn, channel, queue, false, false, false, false, amqp_empty_table);

amqp_bytes_t msg = amqp_cstring_bytes ("Hello World!");
int rc_pub = amqp_basic_publish (conn, channel, amqp_empty_bytes, queue, false, false, NULL, msg);

amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);

return 0;
}



受信

ライブラリ自体には、イベントループやコールバックの仕組みは用意されていないが、Pythonプログラムの構造にならって、コールバック関数を用意する。ここでは、キューから受け取ったメッセージデータ以外に、コネクションオブジェクトとメッセージエンベロープを引数とする。

void  callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body);

amqp_basic_consume()で受信方法を指定する。no_ackオプションを指定するには、6番目のパラメータのno_ackにtrueをセットする。


amqp_framing.h

amqp_basic_consume_ok_t*  amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,

amqp_bytes_t queue, amqp_bytes_t consumer_tag,
amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive,
amqp_table_t arguments);

amqp_consume_message()でメッセージを待ち受ける。メッセージ以外のデータを受信した場合、適宜処理する。


amqp.h

amqp_rpc_reply_t  amqp_consume_message(amqp_connection_state_t state,

amqp_envelope_t *envelope,
struct timeval *timeout, int flags);


最終的なプログラムは以下のとおり。


recv.cc

#include <stdio.h>

#include <stdlib.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>

void callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
printf (" [x] Received ");
fwrite (body.bytes, 1, body.len, stdout);
printf ("\n");
}

int main (int argc, char** argv) {
amqp_connection_state_t conn = amqp_new_connection ();

amqp_socket_t* socket = amqp_tcp_socket_new (conn);

int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}

amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}

amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);

amqp_bytes_t queue = amqp_cstring_bytes ("hello");
amqp_queue_declare_ok_t* rc_decl = amqp_queue_declare (conn, channel, queue, false, false, false, false, amqp_empty_table);

amqp_basic_consume_ok_t* rc_cons = amqp_basic_consume (conn, channel, queue, amqp_empty_bytes, false, true/*no_ack*/, false, amqp_empty_table);

printf (" [*] Waiting for messages. To exit press CTRL+C\n");

while (1) {
amqp_maybe_release_buffers (conn);
amqp_envelope_t envelope;
amqp_rpc_reply_t rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
switch (rc_msg.reply_type) {
case AMQP_RESPONSE_NORMAL:
callback (&conn, &envelope, envelope.message.body);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
amqp_frame_t frame;
if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
goto Ex1;
}
if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
break;
case AMQP_BASIC_RETURN_METHOD: {
amqp_message_t message;
amqp_rpc_reply_t rc_read = amqp_read_message (conn, frame.channel, &message, 0);
if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
goto Ex1;
}
amqp_destroy_message (&message);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD:
goto Ex1;
case AMQP_CONNECTION_CLOSE_METHOD:
goto Ex1;
default:
fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
goto Ex1;
}
}
}
break;
default:;
}
amqp_destroy_envelope (&envelope);
}

Ex1:;
amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);

return 0;
}