0
1

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 5 years have passed since the last update.

RabbitMQチュートリアル4のサンプルプログラムをCで書く

Posted at

RabbitMQ チュートリアル4 Cバージョン

RabbitMQ チュートリアル https://www.rabbitmq.com/getstarted.html のパート4 Routing のPython版のプログラムをCライブラリ https://github.com/alanxz/rabbitmq-c を使って書いたものです。RabbitMQの説明等は、元のチュートリアルを参照してください。

送信

amqp_basic_publish()で、メッセージをパブリッシュする際に、routing_keyパラメータを設定する。

最終的なプログラムは、以下のとおり。

emit.cc
# include <stdlib.h>
# include <stdio.h>
# include <string>
# include <amqp.h>
# include <amqp_tcp_socket.h>
# include <amqp_framing.h>

int  main (int argc, char** argv) {
    std::string  message;
    std::string  severity;
    if (argc > 1) {
        severity.assign (argv[1]);
    } else {
        severity.assign ("info");
    }
    for (int i = 2; i < argc; i ++) {
        if (i > 2)
            message.append (1, ' ');
        message.append (argv[i]);
    }

    amqp_connection_state_t  conn = amqp_new_connection ();

    amqp_socket_t*  socket = amqp_tcp_socket_new (conn);

    int  rc_sock = amqp_socket_open (socket, "localhost", 5672);
    if (rc_sock != AMQP_STATUS_OK) {
        fprintf (stderr, "connection failure.\n");
        exit (1);
    }

    amqp_rpc_reply_t  rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
        if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t*  m = (amqp_connection_close_t *) rc_login.reply.decoded;
                fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
                fprintf (stderr, "\n");
            }
        }
        exit (1);
    }

    amqp_channel_t  channel = 1;
    amqp_channel_open_ok_t*  rc_channel = amqp_channel_open (conn, channel);

    amqp_bytes_t  exchange = amqp_cstring_bytes ("direct_logs");
    amqp_bytes_t  exchtype = amqp_cstring_bytes ("direct");
    amqp_exchange_declare_ok_t*  dcok = amqp_exchange_declare (conn, channel, exchange, exchtype, false, false, false, false, amqp_empty_table);

    amqp_bytes_t  bsvr = {severity.length (), (void*)severity.data ()};
    amqp_bytes_t  bmsg = {message.length (), (void*)message.data ()};
    int  rc_pub = amqp_basic_publish (conn, channel, exchange, bsvr, false, false, NULL, bmsg);

    amqp_rpc_reply_t  rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
    amqp_rpc_reply_t  rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection (conn);

    return 0;
}

受信

amqp_exchange_declare()で、ダイレクトエクスチェンジを作成し、amqp_queue_bind()で、バインディングを設定する際にrouting_keyパラメータを指定する。

最終的なプログラムは、以下のとおり。

log.cc
# include <stdlib.h>
# include <stdio.h>
# include <amqp_tcp_socket.h>
# include <amqp.h>
# include <amqp_framing.h>

/**
 * Prints one received message to stdout as " [x] <routing key>:<body>" followed by a newline.
 *
 * @param conn      connection the message arrived on (not used here)
 * @param envelope  delivery envelope; only its routing_key is read
 * @param body      message payload, written verbatim (it is not assumed to be NUL-terminated)
 */
void  callback (amqp_connection_state_t* conn, amqp_envelope_t* envelope, amqp_bytes_t body) {
    (void) conn;
    printf (" [x] ");
    fwrite (envelope->routing_key.bytes, 1, envelope->routing_key.len, stdout);
    printf (":");
    fwrite (body.bytes, 1, body.len, stdout);
    printf ("\n");
    // The receiver is normally stopped with CTRL+C, which does not flush stdio buffers;
    // flush per message so lines are not lost when stdout is redirected to a file or pipe.
    fflush (stdout);
}

int  main (int argc, char** argv) {
    if (argc < 2) {
        fprintf (stderr, "usage: %s [info] [warning] [error]\n", argv[0]);
        exit (1);
    }

    amqp_connection_state_t  conn = amqp_new_connection ();

    amqp_socket_t*  socket = amqp_tcp_socket_new (conn);

    int  rc_sock = amqp_socket_open (socket, "localhost", 5672);
    if (rc_sock != AMQP_STATUS_OK) {
        fprintf (stderr, "connection failure.\n");
        exit (1);
    }

    amqp_rpc_reply_t  rc_login = amqp_login (conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (rc_login.reply_type != AMQP_RESPONSE_NORMAL) {
        if (rc_login.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
            if (rc_login.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                amqp_connection_close_t*  m = (amqp_connection_close_t *) rc_login.reply.decoded;
                fwrite (m->reply_text.bytes, 1, m->reply_text.len, stderr);
                fprintf (stderr, "\n");
            }
        }
        exit (1);
    }

    amqp_channel_t  channel = 1;
    amqp_channel_open_ok_t*  rc_channel = amqp_channel_open (conn, channel);

    amqp_bytes_t  exchange = amqp_cstring_bytes ("direct_logs");
    amqp_bytes_t  exchtype = amqp_cstring_bytes ("direct");
    amqp_exchange_declare_ok_t*  rc_exch = amqp_exchange_declare (conn, channel, exchange, exchtype, false, false, false, false, amqp_empty_table);

    amqp_queue_declare_ok_t*  rc_dcl = amqp_queue_declare (conn, channel, amqp_empty_bytes, false, false, true/*exclusive*/, false, amqp_empty_table);

    for (int i = 1; i < argc; i ++) {
        amqp_bytes_t  routing_key = amqp_cstring_bytes (argv[i]);
        amqp_queue_bind_ok_t*  rc_bind = amqp_queue_bind (conn, channel, rc_dcl->queue, exchange, routing_key, amqp_empty_table);
    }

    amqp_basic_consume_ok_t*  rc_con = amqp_basic_consume (conn, channel, rc_dcl->queue, amqp_empty_bytes, false, true/*no ack*/, false, amqp_empty_table);

    printf (" [*] Waiting for messages. To exit press CTRL+C\n");

    while (1) {
        amqp_maybe_release_buffers (conn);
        amqp_envelope_t  envelope;
        amqp_rpc_reply_t  rc_msg = amqp_consume_message (conn, &envelope, NULL, 0);
        switch (rc_msg.reply_type) {
        case AMQP_RESPONSE_NORMAL:
            callback (&conn, &envelope, envelope.message.body);
            break;
        case AMQP_RESPONSE_LIBRARY_EXCEPTION:
            if (rc_msg.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
                amqp_frame_t  frame;
                if (amqp_simple_wait_frame (conn, &frame) != AMQP_STATUS_OK) {
                    goto Ex1;
                }
                if (frame.frame_type == AMQP_FRAME_METHOD) {
                    switch (frame.payload.method.id) {
                    case AMQP_BASIC_ACK_METHOD:
                        break;
                    case AMQP_BASIC_RETURN_METHOD: {
                        amqp_message_t  message;
                        amqp_rpc_reply_t  rc_read = amqp_read_message (conn, frame.channel, &message, 0);
                        if (rc_read.reply_type != AMQP_RESPONSE_NORMAL) {
                            goto Ex1;
                        }
                        amqp_destroy_message (&message);
                        break;
                    }
                    case AMQP_CHANNEL_CLOSE_METHOD:
                        goto Ex1;
                    case AMQP_CONNECTION_CLOSE_METHOD:
                        goto Ex1;
                    default:
                        fprintf (stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
                        goto Ex1;
                    }
                }
            }
            break;
        default:;
        }
        amqp_destroy_envelope (&envelope);
    }

 Ex1:;
    amqp_rpc_reply_t  rc_chclose = amqp_channel_close (conn, channel, AMQP_REPLY_SUCCESS);
    amqp_rpc_reply_t  rc_conclose = amqp_connection_close (conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection (conn);

    return 0;
}
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?