0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQチュートリアル3のサンプルプログラムをCで書く

Last updated at Posted at 2016-04-02

RabbitMQ チュートリアル3 Cバージョン

RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のパート3 Publish/Subscribe のPython版のプログラムをCライブラリ https://github.com/alanxz/rabbitmq-c を使って書いたものです。RabbitMQの説明等は、元のチュートリアルを参照してください。

送信

エクスチェンジを作成するには、amqp_exchange_declare()を実行する。

amqp_framing.h
// Prototype excerpt: declares (creates) an exchange named `exchange` of kind
// `type` on the given channel of the connection `state`.
// NOTE(review): the four boolean flags and `arguments` presumably map to the
// fields of the AMQP exchange.declare method -- confirm against the rabbitmq-c
// documentation. The programs in this article pass false for all four flags
// and amqp_empty_table for `arguments`.
amqp_exchange_declare_ok_t*  amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,
                                                   amqp_bytes_t exchange, amqp_bytes_t type,
                                                   amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t auto_delete, amqp_boolean_t internal,
                                                   amqp_table_t arguments);

最終的なプログラムは、以下のとおり。

emit.cc
# include <stdlib.h>
# include <stdio.h>
# include <string>
# include <amqp.h>
# include <amqp_tcp_socket.h>
# include <amqp_framing.h>

int  main (int argc, char** argv) {
    std::string  message;
    for (int i = 1; i < argc; i ++) {
        if (i > 1)
            message.append (1, ' ');
        message.append (argv[i]);
    }

    amqp_connection_state_t  conn = amqp_new_connection ();

    amqp_socket_t*  socket = amqp_tcp_socket_new (conn);

    int  rc_sock = amqp_socket_open (socket, "localhost", 5672);
    if (rc_sock != AMQP_STATUS_OK) {
        fprintf (stderr, "connection failure.\n");
        exit (1);
    }

    amqp_rpc_reply_t  rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
        if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t* m = (amqp_connection_close_t *) rc_login.reply.decoded;
                fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
                fprintf (stderr, "\n");
            }
        }
        exit (1);
    }

    amqp_channel_t  channel = 1;
    amqp_channel_open_ok_t*  rc_channel = amqp_channel_open (conn, channel);

    amqp_bytes_t  exchange = amqp_cstring_bytes ("logs");
    amqp_bytes_t  exchtype = amqp_cstring_bytes ("fanout");
    amqp_exchange_declare_ok_t*  rc_exch = amqp_exchange_declare (conn, channel, exchange, exchtype, false, false, false, false, amqp_empty_table);

    amqp_bytes_t  bmsg = {message.length (), (void*)message.data ()};
    int  rc_pub = amqp_basic_publish (conn, channel, exchange, amqp_empty_bytes, false, false, NULL, bmsg);

    amqp_rpc_reply_t  rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
    amqp_rpc_reply_t  rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection (conn);

    return 0;
}

受信

キューをエクスチェンジに紐付けるには、amqp_queue_bind()を実行する。

amqp_framing.h
// Prototype excerpt: binds the queue `queue` to the exchange `exchange` on the
// given channel of the connection `state`.
// NOTE(review): `routing_key` and `arguments` presumably map to the fields of
// the AMQP queue.bind method -- confirm against the rabbitmq-c documentation.
// The receiver in this article passes amqp_empty_bytes and amqp_empty_table.
amqp_queue_bind_ok_t*  amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel,
                                       amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key,
                                       amqp_table_t arguments);

最終的なプログラムは、以下のとおり。

recv.cc
# include <stdlib.h>
# include <stdio.h>
# include <amqp.h>
# include <amqp_tcp_socket.h>
# include <amqp_framing.h>

/**
 * Prints one received message body to stdout as " [x] Received <body>\n".
 *
 * @param conn      Connection the message arrived on (not used here).
 * @param envelope  Delivery envelope of the message (not used here).
 * @param body      Raw message bytes; written verbatim, `body.len` bytes long.
 */
void  callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
    fputs (" [x] Received ", stdout);
    // The body is a length-delimited byte buffer, so write it by length.
    fwrite (body.bytes, 1, body.len, stdout);
    fputc ('\n', stdout);
}

int  main (int argc, char** argv) {
    amqp_connection_state_t  conn = amqp_new_connection ();

    amqp_socket_t*  socket = amqp_tcp_socket_new (conn);

    int  rc_sock = amqp_socket_open (socket, "localhost", 5672);
    if (rc_sock != AMQP_STATUS_OK) {
        fprintf (stderr, "connection failure.\n");
        exit (1);
    }

    amqp_rpc_reply_t  rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
        if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t*  m = (amqp_connection_close_t *) rc_login.reply.decoded;
                fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
                fprintf (stderr, "\n");
            }
        }
        exit (1);
    }

    amqp_channel_t  channel = 1;
    amqp_channel_open_ok_t*  rc_channel = amqp_channel_open (conn, channel);

    amqp_bytes_t  exchange = amqp_cstring_bytes ("logs");
    amqp_bytes_t  exchtype = amqp_cstring_bytes ("fanout");
    amqp_exchange_declare_ok_t*  rc_exch = amqp_exchange_declare (conn, channel, exchange, exchtype, false, false, false, false, amqp_empty_table);

    amqp_queue_declare_ok_t*  rc_dcl = amqp_queue_declare (conn, channel, amqp_empty_bytes, false, false, true/*exclusive*/, false, amqp_empty_table);

    amqp_queue_bind_ok_t*  rc_bind = amqp_queue_bind (conn, channel, rc_dcl->queue, exchange, amqp_empty_bytes, amqp_empty_table);

    amqp_basic_consume_ok_t*  rc_cons = amqp_basic_consume (conn, channel, rc_dcl->queue, amqp_empty_bytes, false, true/*no ack*/, false, amqp_empty_table);

    printf (" [*] Waiting for messages. To exit press CTRL+C\n");

    while (1) {
        amqp_maybe_release_buffers (conn);
        amqp_envelope_t  envelope;
        amqp_rpc_reply_t  rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
        switch (rc_msg.reply_type) {
        case AMQP_RESPONSE_NORMAL:
            callback (&conn, &envelope, envelope.message.body);
            break;
        case AMQP_RESPONSE_LIBRARY_EXCEPTION:
            if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
                amqp_frame_t  frame;
                if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
                    goto Ex1;
                }
                if (frame.frame_type == AMQP_FRAME_METHOD) {
                    switch (frame.payload.method.id) {
                    case AMQP_BASIC_ACK_METHOD:
                        break;
                    case AMQP_BASIC_RETURN_METHOD: {
                        amqp_message_t  message;
                        amqp_rpc_reply_t  rc_read = amqp_read_message (conn, frame.channel, &message, 0);
                        if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
                            goto Ex1;
                        }
                        amqp_destroy_message (&message);
                        break;
                    }
                    case AMQP_CHANNEL_CLOSE_METHOD:
                        goto Ex1;
                    case AMQP_CONNECTION_CLOSE_METHOD:
                        goto Ex1;
                    default:
                        fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
                        goto Ex1;
                    }
                }
            }
            break;
        default:;
        }
        amqp_destroy_envelope (&envelope);
    }

 Ex1:;
    amqp_rpc_reply_t  rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
    amqp_rpc_reply_t  rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection (conn);

    return 0;
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?