#始めに
DockerでKafkaが上手いこと動かん!!・・・というしょうもない理由でRabbitMQにも手を出してみる。
今回はGoで書きます。クライアントはgithub.com/streadway/amqpを使います。
※RabbitMQ自体の解説はしません。
##構成
Golang: 1.9
RabbitMQ: 3.7(Docker repository office)
streadway/amqp: master branch
##成果物
https://github.com/lightstaff/go-rabbitmq-example
#モデル
とりあえず適当にメッセージのモデルを定義します。
package protocol
// Protocol メッセージモデル
type Protocol struct {
Message string
Timestamp int64
}
#Publisher
続いてPublisher。KafkaではProducerと言ってた。
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"time"
"github.com/lightstaff/go-rabbitmq-example/protocol"
"github.com/streadway/amqp"
)
var (
// RabbitMQのURLはパラメータで指定
rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)
func main() {
flag.Parse()
if *rabbitmqURL == "" {
log.Fatalln("[ERROR] require rabbitmqUrl")
}
log.Println("publisher start")
// amqpだから・・・
url := fmt.Sprintf("amqp://%s", *rabbitmqURL)
// ダイアルして・・・
conn, err := amqp.Dial(url)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer conn.Close()
// チャンネル開いて・・・
ch, err := conn.Channel()
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer ch.Close()
// Exchangeを作って・・・
if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// とりあえず3回・・・
for i := 0; i < 3; i++ {
// メッセージ作って・・・
p := &protocol.Protocol{
Message: fmt.Sprintf("Hello. No%d", i),
Timestamp: time.Now().UnixNano(),
}
// バイナリ化して・・・
bytes, err := json.Marshal(p)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
continue
}
// Publish!!
if err := ch.Publish("test", "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: bytes,
}); err != nil {
log.Printf("[ERROR] %s", err.Error())
continue
}
log.Printf("[INFO] send message. msg: %v", p)
}
log.Println("publisher stop")
}
内容はコードのコメントの通りです。https://www.rabbitmq.com/confirms.html なんてことも出来るらしいけど今回は省略。
#Consumer
続いてConsumer。
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"github.com/lightstaff/go-rabbitmq-example/protocol"
"github.com/streadway/amqp"
)
var (
// RabbitMQのURLはパラメータで指定
rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)
func main() {
flag.Parse()
if *rabbitmqURL == "" {
log.Fatalln("[ERROR] require rabbitmqUrl")
}
log.Println("consumer start")
// amqpだから・・・
url := fmt.Sprintf("amqp://%s", *rabbitmqURL)
// goroutineかけるので・・・
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 終了待機するので・・・
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// ダイアルして・・・
conn, err := amqp.Dial(url)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer conn.Close()
// チャンネル開いて・・・
ch, err := conn.Channel()
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer ch.Close()
// Exchangeを作って・・・
if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// Queueを作って・・・
q, err := ch.QueueDeclare("", false, true, true, false, nil)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// QueueにExchangeをBindして・・・
if err := ch.QueueBind(q.Name, "", "test", false, nil); err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// Consume!!
msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// メッセージ受付ルーチン
go func() {
CONSUMER_FOR:
for {
select {
case <-ctx.Done():
break CONSUMER_FOR
case m, ok := <-msgs:
if ok {
// モデル化して・・・
var p protocol.Protocol
if err := json.Unmarshal(m.Body, &p); err != nil {
log.Printf("[ERROR] %s", err.Error())
continue CONSUMER_FOR
}
log.Printf("[INFO] success consumed. tag: %d, body: %v", m.DeliveryTag, &p)
}
}
}
}()
<-signals
log.Println("consumer stop")
}
これまた内容はコードのコメントの通りですが、Publisherに比べ手順が増えてます。Publisher・Consumer共にch.ExchangeDeclare(...)
でExchangeの作成を呼んでますが、GoDocにExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.とあるので既に作られていた場合はフラグのチェックが行われる模様です(PublisherとConsumerで違っていたらエラーを吐きます)。
#実行
PublisherとConsumerをそれぞれ別のコンソールで実行してみます。ついでなんでConsumerは2個起動してみます。
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:15 publisher start
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:16 publisher stop
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:03 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:31 consumer stop
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:07 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:34 consumer stop
といった感じに出力されます。
#終わりに
RabbitMQのTutorialは色々な言語の例が豊富で良いと思いました(公式に書いてあると信用できる感があるし・・・)。
次はScalaかC#で書く予定。