nsqとは
分散型メッセージングミドルウェアの詳細比較 http://postd.cc/dissecting-message-queues/
NSQはBitlyが構築したメッセージングプラットフォームです。
メッセージを受信し、格納し、クライアントに送り届けるデーモンをnsqdといいます。デーモンはスタンドアロンでも動作しますが、NSQは分散型トポロジとして動作するようデザインされていて、nsqlookupdという別のデーモンを活用します。nsqlookupdはnsqdインスタンスのためのサービスを検出する仕組みです。NSQはまた、nsqadminというリアルタイムのクラスタ統計を表示し、キューの削除やトピックの処理など様々な管理業務を実行する役割を果たすWeb UIも提供します。
NSQ-Centric Architecture http://www.slideshare.net/guregu/nsqcentric-architecture-gocon-autumn-2014
実行環境
CentOS 7.2
nsqlookupd v0.3.8 (built w/go1.6.2)
nsqd v0.3.8 (built w/go1.6.2)
nsqadmin v0.3.8 (built w/go1.6.2)
go version go1.6.3 linux/amd64
nsqio/go-nsq 1.0.6
まずnsqのインストール
Linux系はバイナリが提供されているのでこちらからダウンロードします。
http://nsq.io/deployment/installing.html
$ cd /usr/local/src/
$ wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
$ tar xzvf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
$ cd nsq-0.3.8.linux-amd64.go1.6.2/
$ tar cf - . | (cd /usr/local; tar xvf -)
$ /usr/local/bin/
coraenv dbhome nsq_pubsub nsq_stat nsq_tail nsq_to_file nsq_to_http nsq_to_nsq nsqadmin nsqd nsqlookupd oraenv to_nsq
ダウンロードしてPATHが通っている適当なディレクトリに置きます。
とりあえず起動してnsqだけで動作確認
クイックスタート参照
http://nsq.io/overview/quick_start.html
ターミナルを三つ立ち上げて必要なdeamonをそれぞれ起動します。
nsqlookupd
$ nsqlookupd
デフォルトでバインドするアドレス、ポートは 0.0.0.0:4160(TCP接続用) と 0.0.0.0:4161(HTTP接続用)
nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
デフォルトでバインドするアドレス、ポートは 0.0.0.0:4150(TCP接続用) と 0.0.0.0:4151(HTTP接続用)
nsqadmin
$ nsqadmin --lookupd-http-address=127.0.0.1:4161
ブラウザで利用する管理画面。デフォルトでバインドするアドレス、ポートは 0.0.0.0.:4171
動作確認
起動後、curlでnsqdにtopic "test" にHTTP接続で「hello world 1」の文字列をメッセージとして送ります。
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'
送られたメッセージをnsq_to_fileコマンドを使い、/tmp/以下のディレクトリにファイルとして吐き出します。
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
ちゃんと出力されたか確認します。
$ cat /tmp/test.v10-0-0-1.2016-12-13_14.log
hello world 1
めでたくファイルにメッセージが出力されました。
go-nsqの利用
golangからnsq関連を取り扱うオフィシャルのライブラリがgo-nsqです。
ライブラリをgo getします。
$ go get github.com/nsqio/go-nsq
GoDoc https://godoc.org/github.com/nsqio/go-nsq
Producer(送る側)のサンプルコードを書く
"testtopic"というトピックに"test message"というメッセージを送るだけのProducerのコードを書きます。
package main
import (
nsq "github.com/nsqio/go-nsq"
"reflect"
"unsafe"
)
func s2b(s string) []byte {
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
bh := reflect.SliceHeader{sh.Data, sh.Len, sh.Len}
return *(*[]byte)(unsafe.Pointer(&bh))
}
func main() {
cfg := nsq.NewConfig()
cfg.ClientID += "_P"
producer, err := nsq.NewProducer("127.0.0.1:4150", cfg)
defer producer.Stop()
if err != nil {
panic(err)
}
msgb := s2b("test message")
err = producer.Publish("testtopic", msgb)
if err != nil {
panic(err)
}
}
nsqでのメッセージは []byte で送る必要があるので、stringsから[]byteへの変換をする必要があります。このためメモリコピーなしの型変換をする自作関数s2bを利用しています。
Consumer(受け取る側)のサンプルコードを書く
"testtopic"というトピックで"testch"チャンネル経由でメッセージ受信待ちをして受信完了したら受信した文字列を出力し終了するコードを書きます。
package main
import (
"fmt"
nsq "github.com/nsqio/go-nsq"
"reflect"
"unsafe"
)
func b2s(b []byte) string {
bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
sh := reflect.StringHeader{bh.Data, bh.Len}
return *(*string)(unsafe.Pointer(&sh))
}
func hello(s string) {
fmt.Println(s)
}
func main() {
q1 := make(chan int)
q2 := make(chan int)
cfg := nsq.NewConfig()
cfg.ClientID += "_C"
//受信する処理はgo routineにしたほうが都合がよいので関数にする
f := func() {
consumer, err := nsq.NewConsumer("testtopic", "testch", cfg)
defer consumer.Stop()
if err != nil {
panic(err)
}
//メッセージを受信したタイミングでコールされる
consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
//メッセージを[]byteからstringに変換
s := b2s(m.Body)
//メッセージを処理する関数に引き渡し
hello(s)
//メッセージ受信完了通知
q1 <- 1
return nil
}))
err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
panic(err)
}
//ハンドラからのメッセージ受信完了待ち
<-q1
// メッセージ受信完了をmain()に通知
q2 <- 1
}
// メッセージ受信処理をgo routineで実行
go f()
// go routineからのメッセージ受信完了連絡待ち
<-q2
}
受け取ったメッセージは []byte 型なので strings への変換用に自作関数b2sを利用しています。
動作確認
consumerを起動
$ go run consumer.go
2016/12/16 10:18:21 INF 1 [testtopic/testch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testtopic
2016/12/16 10:18:21 INF 1 [testtopic/testch] (v10-0-0-1.localhost:4150) connecting to nsqd
別ターミナルでproducerを起動
$ go run producer.go
2016/12/16 10:27:04 INF 1 (127.0.0.1:4150) connecting to nsqd
2016/12/16 10:27:04 INF 1 stopping
2016/12/16 10:27:04 INF 1 exiting router
$
consumerがメッセージを出力して自動終了
2016/12/16 10:27:01 INF 1 [testtopic/testch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testtopic
2016/12/16 10:27:01 INF 1 [testtopic/testch] (v10-0-0-1.localhost:4150) connecting to nsqd
test message
2016/12/16 10:27:04 INF 1 [testtopic/testch] stopping...
$
まとめ
nsq & go-nsqをgo routine、channelと組み合わせることで、いい感じの非同期処理のメッセージングを利用したものが作れそうであります。
バックエンド処理の観点からは、単純なP2Pでのサブシステム間の通信の場合、単純にnet/http
を使った方がラクでしょうけど、並列のサーバインスタンス群に対して一斉通知を出す等の処理はこちらの利用がよさげです。