LoginSignup
11
14

More than 5 years have passed since last update.

GoでRabbitMQに接続してみる

Posted at

環境・バージョン
Windows 7
go 1.4.2
RabbitMQ 3.4.1
streadway/amqp (AMQP 0.9.1 クライアント)

AMQPクライアントの導入

go get github.com/streadway/amqp

Subscriber

sub.go
package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

// amqpURI is the broker address passed to amqp.Dial.
var amqpURI = "amqp://localhost:5672"

// failOnError aborts the program when err is non-nil.
//
// The text "<msg>: <err>" is written through the standard logger and the
// process then exits via log.Fatal. A nil err is a no-op.
func failOnError(err error, msg string) {
    if err == nil {
        return
    }
    // log.Fatal terminates the process itself, so no panic is needed (or
    // reachable) after it.
    message := fmt.Sprintf("%s: %s", msg, err)
    log.Fatal(message)
}

// main connects to the broker at amqpURI, declares "my-queue", registers an
// auto-ack consumer on it and logs the body of every delivery received.
//
// Deliveries are consumed on the main goroutine, so main returns (and the
// deferred Close calls run) as soon as the delivery channel is closed, instead
// of blocking forever on a channel that nothing ever sends to.
func main() {
    conn, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to MQ")
    defer conn.Close()

    channel, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    q, err := channel.QueueDeclare(
        "my-queue", // name
        false,      // durable
        false,      // delete when unused
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to declare a queue")

    messages, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // arguments
    )
    failOnError(err, "Failed to register a consumer")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")

    // The loop ends when messages is closed; presumably the amqp client does
    // that when the channel or connection goes away (see Channel.Consume docs).
    for data := range messages {
        log.Printf("%s\n", data.Body)
    }
}

Publisher

pub.go
package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

// amqpURI is the broker address passed to amqp.Dial.
var amqpURI = "amqp://localhost:5672"

// failOnError aborts the program when err is non-nil.
//
// The text "<msg>: <err>" is written through the standard logger and the
// process then exits via log.Fatal. A nil err is a no-op.
func failOnError(err error, msg string) {
    if err == nil {
        return
    }
    // log.Fatal terminates the process itself, so no panic is needed (or
    // reachable) after it.
    message := fmt.Sprintf("%s: %s", msg, err)
    log.Fatal(message)
}

// main connects to the broker at amqpURI, makes sure "my-queue" exists and
// publishes three text messages to it.
func main() {
    conn, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to MQ")
    defer conn.Close()

    channel, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    // Declare the queue before publishing. Publish is called with
    // mandatory=false, so a message routed to a queue that does not exist yet
    // would be dropped without any error. The arguments are all false/nil so
    // they agree with a non-durable, non-exclusive declaration of the same
    // queue made by a consumer.
    q, err := channel.QueueDeclare(
        "my-queue", // name
        false,      // durable
        false,      // delete when unused
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to declare a queue")

    sendMessage(channel, q.Name, "Hello, AMQP1!!")
    sendMessage(channel, q.Name, "Hello, AMQP2!!")
    sendMessage(channel, q.Name, "Hello, AMQP3!!")
}

// sendMessage publishes payload as a "text/plain" message on channel, using
// an empty exchange name and queueName as the routing key. Any publish error
// is handed to failOnError.
func sendMessage(channel *amqp.Channel, queueName string, payload string) {
    message := amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(payload),
    }

    // Arguments in order: exchange, routing key, mandatory, immediate, message.
    publishErr := channel.Publish("", queueName, false, false, message)
    failOnError(publishErr, "Failed to publish a message")
}

実行結果

2015/06/17 11:27:40  [*] Waiting for messages. To exit press CTRL+C
2015/06/17 11:27:45 Hello, AMQP1!!
2015/06/17 11:27:45 Hello, AMQP2!!
2015/06/17 11:27:45 Hello, AMQP3!!
11
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
14