RabbitMQ チュートリアル1 Cバージョン
RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のPython版のプログラムを下敷きにして、Cライブラリ https://github.com/alanxz/rabbitmq-c を使ったプログラムを書きます。RabbitMQの説明等は、元のチュートリアルを参照してください。
プログラム自体は C++を使っていますが、class定義などを使っていないので、Cでも大きな違いはありません。エラーチェックのコードは、適宜追加してください。
送信
RabbitMQサーバとのコネクションを確立するために、まずコネクションオブジェクトを作成する。コネクションオブジェクトの使用終了後は、amqp_destroy_connection()で破棄する。
amqp_connection_state_t amqp_new_connection(void);
int amqp_destroy_connection(amqp_connection_state_t state);
RabbitMQサーバに接続するには、amqp_tcp_socket_new()でソケットを作成し、amqp_socket_open()でホスト、ポート番号を指定してコネクションを張る。
amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state);
int amqp_socket_open(amqp_socket_t* socket, const char *host, int port);
サーバに接続後、バーチャルホスト、認証方法、ID、PWを送信してログインする。
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
int channel_max, int frame_max, int heartbeat,
amqp_sasl_method_enum sasl_method, ...);
正常終了の場合、戻り値の amqp_rpc_reply_t のメンバ reply_type に AMQP_RESPONSE_NORMAL が格納される。
キューを作成するには、amqp_queue_declare()を使う。
amqp_queue_declare_ok_t* amqp_queue_declare(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_bytes_t queue,
amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete,
amqp_table_t arguments);
キュー名などのテキストデータは amqp_bytes_t構造体で指定する。この構造体は、バイトデータの長さとデータへのポインタからなる。関数への受け渡しでは、この構造体がコピーされる。
typedef struct amqp_bytes_t_ {
size_t len; /* バッファのバイト数 */
void* bytes; /* バッファへのポインタ */
} amqp_bytes_t;
C文字列からamqp_bytes_t構造体を生成するには、amqp_cstring_bytes()関数が利用できる。
exchangeとrouting_keyを指定して、キューのexchangeにデータを送信する。
int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
amqp_bytes_t exchange, amqp_bytes_t routing_key,
amqp_boolean_t mandatory, amqp_boolean_t immediate,
amqp_basic_properties_t const* properties,
amqp_bytes_t body);
最終的なプログラムは以下のとおり。
#include <stdio.h>
#include <stdlib.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
int main (int argc, char** argv) {
amqp_connection_state_t conn = amqp_new_connection ();
amqp_socket_t* socket = amqp_tcp_socket_new (conn);
int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}
amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}
amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);
amqp_bytes_t queue = amqp_cstring_bytes ("hello");
amqp_queue_declare_ok_t* rc_decl = amqp_queue_declare (conn, channel, queue, false, false, false, false, amqp_empty_table);
amqp_bytes_t msg = amqp_cstring_bytes ("Hello World!");
int rc_pub = amqp_basic_publish (conn, channel, amqp_empty_bytes, queue, false, false, NULL, msg);
amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);
return 0;
}
受信
ライブラリ自体には、イベントループやコールバックの仕組みは用意されていないが、Pythonプログラムの構造にならって、コールバック関数を用意する。ここでは、キューから受け取ったメッセージデータ以外に、コネクションオブジェクトとメッセージエンベロープを引数とする。
void callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body);
amqp_basic_consume()で受信方法を指定する。no_ackオプションを指定するには、6番目のパラメータのno_ackにtrueをセットする。
amqp_basic_consume_ok_t* amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,
amqp_bytes_t queue, amqp_bytes_t consumer_tag,
amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive,
amqp_table_t arguments);
amqp_consume_message()でメッセージを待ち受ける。メッセージ以外のデータを受信した場合、適宜処理する。
amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
amqp_envelope_t *envelope,
struct timeval *timeout, int flags);
最終的なプログラムは以下のとおり。
#include <stdio.h>
#include <stdlib.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
void callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
printf (" [x] Received ");
fwrite (body.bytes, 1, body.len, stdout);
printf ("\n");
}
int main (int argc, char** argv) {
amqp_connection_state_t conn = amqp_new_connection ();
amqp_socket_t* socket = amqp_tcp_socket_new (conn);
int rc_sock = amqp_socket_open (socket, "localhost", 5672);
if (rc_sock != AMQP_STATUS_OK) {
fprintf (stderr, "connection failure.\n");
exit (1);
}
amqp_rpc_reply_t rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
fprintf (stderr, "\n");
}
}
exit (1);
}
amqp_channel_t channel = 1;
amqp_channel_open_ok_t* rc_channel = amqp_channel_open (conn, channel);
amqp_bytes_t queue = amqp_cstring_bytes ("hello");
amqp_queue_declare_ok_t* rc_decl = amqp_queue_declare (conn, channel, queue, false, false, false, false, amqp_empty_table);
amqp_basic_consume_ok_t* rc_cons = amqp_basic_consume (conn, channel, queue, amqp_empty_bytes, false, true/*no_ack*/, false, amqp_empty_table);
printf (" [*] Waiting for messages. To exit press CTRL+C\n");
while (1) {
amqp_maybe_release_buffers (conn);
amqp_envelope_t envelope;
amqp_rpc_reply_t rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
switch (rc_msg.reply_type) {
case AMQP_RESPONSE_NORMAL:
callback (&conn, &envelope, envelope.message.body);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
amqp_frame_t frame;
if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
goto Ex1;
}
if (frame.frame_type == AMQP_FRAME_METHOD) {
switch (frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
break;
case AMQP_BASIC_RETURN_METHOD: {
amqp_message_t message;
amqp_rpc_reply_t rc_read = amqp_read_message (conn, frame.channel, &message, 0);
if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
goto Ex1;
}
amqp_destroy_message (&message);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD:
goto Ex1;
case AMQP_CONNECTION_CLOSE_METHOD:
goto Ex1;
default:
fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
goto Ex1;
}
}
}
break;
default:;
}
amqp_destroy_envelope (&envelope);
}
Ex1:;
amqp_rpc_reply_t rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
amqp_rpc_reply_t rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection (conn);
return 0;
}