LoginSignup
3
5

More than 5 years have passed since last update.

RabbitMQでPublisherとConsumer(Golang編)

Posted at

始めに

DockerでKafkaが上手いこと動かん!!・・・というしょうもない理由でRabbitMQにも手を出してみる。
今回はGoで書きます。クライアントはgithub.com/streadway/amqpを使います。

※RabbitMQ自体の解説はしません。

構成

Golang: 1.9
RabbitMQ: 3.7(Docker repository office
streadway/amqp: master branch

成果物

モデル

とりあえず適当にメッセージのモデルを定義します。

protocol/protocol.go
package protocol

// Protocol メッセージモデル
type Protocol struct {
    Message   string
    Timestamp int64
}

Publisher

続いてPublisher。KafkaではProducerと言ってた。

publisher/main.go
package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/lightstaff/go-rabbitmq-example/protocol"
    "github.com/streadway/amqp"
)

var (
    // RabbitMQのURLはパラメータで指定
    rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)

func main() {
    flag.Parse()

    if *rabbitmqURL == "" {
        log.Fatalln("[ERROR] require rabbitmqUrl")
    }

    log.Println("publisher start")

    // amqpだから・・・
    url := fmt.Sprintf("amqp://%s", *rabbitmqURL)

    // ダイアルして・・・
    conn, err := amqp.Dial(url)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer conn.Close()

    // チャンネル開いて・・・
    ch, err := conn.Channel()
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer ch.Close()

    // Exchangeを作って・・・
    if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // とりあえず3回・・・
    for i := 0; i < 3; i++ {
        // メッセージ作って・・・
        p := &protocol.Protocol{
            Message:   fmt.Sprintf("Hello. No%d", i),
            Timestamp: time.Now().UnixNano(),
        }

        // バイナリ化して・・・
        bytes, err := json.Marshal(p)
        if err != nil {
            log.Printf("[ERROR] %s", err.Error())
            continue
        }

        // Publish!!
        if err := ch.Publish("test", "", false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        bytes,
        }); err != nil {
            log.Printf("[ERROR] %s", err.Error())
            continue
        }

        log.Printf("[INFO] send message. msg: %v", p)
    }

    log.Println("publisher stop")
}

内容はコードのコメントの通りです。https://www.rabbitmq.com/confirms.html なんてことも出来るらしいけど今回は省略。

Consumer

続いてConsumer。

consumer/main.go
package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"

    "github.com/lightstaff/go-rabbitmq-example/protocol"
    "github.com/streadway/amqp"
)

var (
    // RabbitMQのURLはパラメータで指定
    rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)

func main() {
    flag.Parse()

    if *rabbitmqURL == "" {
        log.Fatalln("[ERROR] require rabbitmqUrl")
    }

    log.Println("consumer start")

    // amqpだから・・・
    url := fmt.Sprintf("amqp://%s", *rabbitmqURL)

    // goroutineかけるので・・・
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 終了待機するので・・・
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // ダイアルして・・・
    conn, err := amqp.Dial(url)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer conn.Close()

    // チャンネル開いて・・・
    ch, err := conn.Channel()
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer ch.Close()

    // Exchangeを作って・・・
    if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // Queueを作って・・・
    q, err := ch.QueueDeclare("", false, true, true, false, nil)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // QueueにExchangeをBindして・・・
    if err := ch.QueueBind(q.Name, "", "test", false, nil); err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // Consume!!
    msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // メッセージ受付ルーチン
    go func() {
    CONSUMER_FOR:
        for {
            select {
            case <-ctx.Done():
                break CONSUMER_FOR
            case m, ok := <-msgs:
                if ok {
                    // モデル化して・・・
                    var p protocol.Protocol
                    if err := json.Unmarshal(m.Body, &p); err != nil {
                        log.Printf("[ERROR] %s", err.Error())
                        continue CONSUMER_FOR
                    }

                    log.Printf("[INFO] success consumed. tag: %d, body: %v", m.DeliveryTag, &p)
                }
            }
        }
    }()

    <-signals

    log.Println("consumer stop")
}

これまた内容はコードのコメントの通りですが、Publisherに比べ手順が増えてます。Publisher・Consumer共にch.ExchangeDeclare(...)でExchangeの作成を呼んでますが、GoDocにExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.とあるので既に作られていた場合はフラグのチェックが行われる模様です(PublisherとConsumerで違っていたらエラーを吐きます)。

実行

PublisherとConsumerをそれぞれ別のコンソールで実行してみます。ついでなんでConsumerは2個起動してみます。

publisher
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:15 publisher start
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:16 publisher stop
consumer1
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:03 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:31 consumer stop
consumer2
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:07 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:34 consumer stop

といった感じに出力されます。

終わりに

RabbitMQのTutorialは色々な言語の例が豊富で良いと思いました(公式に書いてあると信用できる感があるし・・・)。
次はScalaかC#で書く予定。

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5