2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQチュートリアル1のサンプルプログラムをCで書く

Last updated at Posted at 2016-03-31

RabbitMQ チュートリアル1 Cバージョン

RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のPython版のプログラムを下敷きにして、Cライブラリ https://github.com/alanxz/rabbitmq-c を使ったプログラムを書きます。RabbitMQの説明等は、元のチュートリアルを参照してください。

プログラム自体は C++を使っていますが、class定義などを使っていないので、Cでも大きな違いはありません。エラーチェックのコードは、適宜追加してください。

送信

RabbitMQサーバとのコネクションを確立するために、まずコネクションオブジェクトを作成する。コネクションオブジェクトの使用終了後は、amqp_destroy_connection()で破棄する。

amqp.h
amqp_connection_state_t  amqp_new_connection(void);
int  amqp_destroy_connection(amqp_connection_state_t state);

RabbitMQサーバに接続するには、amqp_tcp_socket_new()でソケットを作成し、amqp_socket_open()でホスト、ポート番号を指定してコネクションを張る。

amqp_tcp_socket.h
amqp_socket_t*  amqp_tcp_socket_new(amqp_connection_state_t state);
int  amqp_socket_open(amqp_socket_t* socket, const char *host, int port);

サーバに接続後、バーチャルホスト、認証方法、ID、PWを送信してログインする。

amqp.h
amqp_rpc_reply_t  amqp_login(amqp_connection_state_t state,
                             char const *vhost,
                             int channel_max, int frame_max, int heartbeat,
                             amqp_sasl_method_enum sasl_method, ...);

正常終了の場合、戻り値の amqp_rpc_reply_t のメンバ reply_type に AMQP_RESPONSE_NORMAL が格納される。

キューを作成するには、amqp_queue_declare()を使う。

amqp_framing.h
amqp_queue_declare_ok_t*  amqp_queue_declare(amqp_connection_state_t state,
                                             amqp_channel_t channel,
                                             amqp_bytes_t queue,
                                             amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete,
                                             amqp_table_t arguments);

キュー名などのテキストデータは amqp_bytes_t構造体で指定する。この構造体は、バイトデータの長さとデータへのポインタからなる。関数への受け渡しでは、この構造体がコピーされる。

amqp.h
typedef struct amqp_bytes_t_ {
  size_t  len;   /* バッファのバイト数 */
  void*  bytes;  /* バッファへのポインタ */
} amqp_bytes_t;

C文字列からamqp_bytes_t構造体を生成するには、amqp_cstring_bytes()関数が利用できる。

exchangeとrouting_keyを指定して、キューのexchangeにデータを送信する。

amqp.h
int  amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
                        amqp_bytes_t exchange, amqp_bytes_t routing_key,
                        amqp_boolean_t mandatory, amqp_boolean_t immediate,
                        amqp_basic_properties_t const* properties,
                        amqp_bytes_t body);

最終的なプログラムは以下のとおり。

send.cc
# include <stdio.h>
# include <stdlib.h>
# include <amqp.h>
# include <amqp_tcp_socket.h>
# include <amqp_framing.h>

int  main (int argc, char** argv) {
    amqp_connection_state_t  conn = amqp_new_connection ();

    amqp_socket_t*  socket = amqp_tcp_socket_new (conn);

    int  rc_sock = amqp_socket_open (socket, "localhost", 5672);
    if (rc_sock != AMQP_STATUS_OK) {
        fprintf (stderr, "connection failure.\n");
        exit (1);
    }

    amqp_rpc_reply_t  rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
        if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
                fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
                fprintf (stderr, "\n");
            }
        }
        exit (1);
    }

    amqp_channel_t  channel = 1;
    amqp_channel_open_ok_t*  rc_channel = amqp_channel_open (conn, channel);

    amqp_bytes_t  queue = amqp_cstring_bytes ("hello");
    amqp_queue_declare_ok_t*  rc_decl = amqp_queue_declare (conn, channel, queue, false, false, false, false, amqp_empty_table);

    amqp_bytes_t  msg = amqp_cstring_bytes ("Hello World!");
    int  rc_pub = amqp_basic_publish (conn, channel, amqp_empty_bytes, queue, false, false, NULL, msg);

    amqp_rpc_reply_t  rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
    amqp_rpc_reply_t  rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection (conn);

    return 0;
}

受信

ライブラリ自体には、イベントループやコールバックの仕組みは用意されていないが、Pythonプログラムの構造にならって、コールバック関数を用意する。ここでは、キューから受け取ったメッセージデータ以外に、コネクションオブジェクトとメッセージエンベロープを引数とする。

void  callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body);

amqp_basic_consume()で受信方法を指定する。no_ackオプションを指定するには、6番目のパラメータのno_ackにtrueをセットする。

amqp_framing.h
amqp_basic_consume_ok_t*  amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,
                                             amqp_bytes_t queue, amqp_bytes_t consumer_tag,
                                             amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive,
                                             amqp_table_t arguments);

amqp_consume_message()でメッセージを待ち受ける。メッセージ以外のデータを受信した場合、適宜処理する。

amqp.h
amqp_rpc_reply_t  amqp_consume_message(amqp_connection_state_t state,
                                       amqp_envelope_t *envelope,
                                       struct timeval *timeout, int flags);

最終的なプログラムは以下のとおり。

recv.cc
# include <stdio.h>
# include <stdlib.h>
# include <amqp.h>
# include <amqp_tcp_socket.h>
# include <amqp_framing.h>

void  callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
    printf (" [x] Received ");
    fwrite (body.bytes, 1, body.len, stdout);
    printf ("\n");
}

int  main (int argc, char** argv) {
    amqp_connection_state_t  conn = amqp_new_connection ();

    amqp_socket_t*  socket = amqp_tcp_socket_new (conn);

    int  rc_sock = amqp_socket_open (socket, "localhost", 5672);
    if (rc_sock != AMQP_STATUS_OK) {
        fprintf (stderr, "connection failure.\n");
        exit (1);
    }

    amqp_rpc_reply_t  rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
        if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t *m = (amqp_connection_close_t *) rc_login.reply.decoded;
                fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
                fprintf (stderr, "\n");
            }
        }
        exit (1);
    }

    amqp_channel_t  channel = 1;
    amqp_channel_open_ok_t*  rc_channel = amqp_channel_open (conn, channel);

    amqp_bytes_t  queue = amqp_cstring_bytes ("hello");
    amqp_queue_declare_ok_t*  rc_decl = amqp_queue_declare (conn, channel, queue, false, false, false, false, amqp_empty_table);

    amqp_basic_consume_ok_t*  rc_cons = amqp_basic_consume (conn, channel, queue, amqp_empty_bytes, false, true/*no_ack*/, false, amqp_empty_table);

    printf (" [*] Waiting for messages. To exit press CTRL+C\n");

    while (1) {
        amqp_maybe_release_buffers (conn);
        amqp_envelope_t  envelope;
        amqp_rpc_reply_t  rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
        switch (rc_msg.reply_type) {
        case AMQP_RESPONSE_NORMAL:
            callback (&conn, &envelope, envelope.message.body);
            break;
        case AMQP_RESPONSE_LIBRARY_EXCEPTION:
            if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
                amqp_frame_t  frame;
                if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
                    goto Ex1;
                }
                if (frame.frame_type == AMQP_FRAME_METHOD) {
                    switch (frame.payload.method.id) {
                    case AMQP_BASIC_ACK_METHOD:
                        break;
                    case AMQP_BASIC_RETURN_METHOD: {
                        amqp_message_t  message;
                        amqp_rpc_reply_t  rc_read = amqp_read_message (conn, frame.channel, &message, 0);
                        if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
                            goto Ex1;
                        }
                        amqp_destroy_message (&message);
                        break;
                    }
                    case AMQP_CHANNEL_CLOSE_METHOD:
                        goto Ex1;
                    case AMQP_CONNECTION_CLOSE_METHOD:
                        goto Ex1;
                    default:
                        fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
                        goto Ex1;
                    }
                }
            }
            break;
        default:;
        }
        amqp_destroy_envelope (&envelope);
    }

 Ex1:;
    amqp_rpc_reply_t  rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
    amqp_rpc_reply_t  rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection (conn);

    return 0;
}
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?