13
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Golang / go-nsqからnsqを利用してみる

Last updated at Posted at 2016-12-16

nsqとは

分散型メッセージングミドルウェアの詳細比較 http://postd.cc/dissecting-message-queues/

NSQはBitlyが構築したメッセージングプラットフォームです。

メッセージを受信し、格納し、クライアントに送り届けるデーモンをnsqdといいます。デーモンはスタンドアロンでも動作しますが、NSQは分散型トポロジとして動作するようデザインされていて、nsqlookupdという別のデーモンを活用します。nsqlookupdはnsqdインスタンスのためのサービスを検出する仕組みです。NSQはまた、nsqadminというリアルタイムのクラスタ統計を表示し、キューの削除やトピックの処理など様々な管理業務を実行する役割を果たすWeb UIも提供します。

NSQ-Centric Architecture http://www.slideshare.net/guregu/nsqcentric-architecture-gocon-autumn-2014

実行環境

CentOS 7.2
nsqlookupd v0.3.8 (built w/go1.6.2)
nsqd v0.3.8 (built w/go1.6.2)
nsqadmin v0.3.8 (built w/go1.6.2)
go version go1.6.3 linux/amd64
nsqio/go-nsq 1.0.6

まずnsqのインストール

Linux系はバイナリが提供されているのでこちらからダウンロードします。
http://nsq.io/deployment/installing.html

$ cd /usr/local/src/
$ wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
$ tar xzvf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
$ cd nsq-0.3.8.linux-amd64.go1.6.2/
$ tar cf - . | (cd /usr/local; tar xvf -)
$ /usr/local/bin/
coraenv  dbhome  nsq_pubsub  nsq_stat  nsq_tail  nsq_to_file  nsq_to_http  nsq_to_nsq  nsqadmin  nsqd  nsqlookupd  oraenv  to_nsq  

ダウンロードしてPATHが通っている適当なディレクトリに置きます。

とりあえず起動してnsqだけで動作確認

クイックスタート参照
http://nsq.io/overview/quick_start.html

ターミナルを三つ立ち上げて必要なdeamonをそれぞれ起動します。

nsqlookupd

$ nsqlookupd

デフォルトでバインドするアドレス、ポートは 0.0.0.0:4160(TCP接続用) と 0.0.0.0:4161(HTTP接続用)

nsqd

$ nsqd --lookupd-tcp-address=127.0.0.1:4160

デフォルトでバインドするアドレス、ポートは 0.0.0.0:4150(TCP接続用) と 0.0.0.0:4151(HTTP接続用)

nsqadmin

$ nsqadmin --lookupd-http-address=127.0.0.1:4161

ブラウザで利用する管理画面。デフォルトでバインドするアドレス、ポートは 0.0.0.0.:4171

動作確認

起動後、curlでnsqdにtopic "test" にHTTP接続で「hello world 1」の文字列をメッセージとして送ります。

$ curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'

送られたメッセージをnsq_to_fileコマンドを使い、/tmp/以下のディレクトリにファイルとして吐き出します。

$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

ちゃんと出力されたか確認します。

$ cat /tmp/test.v10-0-0-1.2016-12-13_14.log
hello world 1

めでたくファイルにメッセージが出力されました。

go-nsqの利用

golangからnsq関連を取り扱うオフィシャルのライブラリがgo-nsqです。
ライブラリをgo getします。

$ go get github.com/nsqio/go-nsq

GoDoc https://godoc.org/github.com/nsqio/go-nsq

Producer(送る側)のサンプルコードを書く

"testtopic"というトピックに"test message"というメッセージを送るだけのProducerのコードを書きます。

producer_test.go
package main

import (
	nsq "github.com/nsqio/go-nsq"
	"reflect"
	"unsafe"
)

func s2b(s string) []byte {
	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
	bh := reflect.SliceHeader{sh.Data, sh.Len, sh.Len}
	return *(*[]byte)(unsafe.Pointer(&bh))
}

func main() {
	cfg := nsq.NewConfig()
	cfg.ClientID += "_P"
	producer, err := nsq.NewProducer("127.0.0.1:4150", cfg)
	defer producer.Stop()
	if err != nil {
		panic(err)
	}

	msgb := s2b("test message")
	err = producer.Publish("testtopic", msgb)
	if err != nil {
		panic(err)
	}
}

nsqでのメッセージは []byte で送る必要があるので、stringsから[]byteへの変換をする必要があります。このためメモリコピーなしの型変換をする自作関数s2bを利用しています。

Consumer(受け取る側)のサンプルコードを書く

"testtopic"というトピックで"testch"チャンネル経由でメッセージ受信待ちをして受信完了したら受信した文字列を出力し終了するコードを書きます。

consumer.go
package main

import (
	"fmt"
	nsq "github.com/nsqio/go-nsq"
	"reflect"
	"unsafe"
)

func b2s(b []byte) string {
	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	sh := reflect.StringHeader{bh.Data, bh.Len}
	return *(*string)(unsafe.Pointer(&sh))
}
func hello(s string) {
	fmt.Println(s)
}

func main() {

	q1 := make(chan int)
	q2 := make(chan int)

	cfg := nsq.NewConfig()
	cfg.ClientID += "_C"

	//受信する処理はgo routineにしたほうが都合がよいので関数にする
	f := func() {
		consumer, err := nsq.NewConsumer("testtopic", "testch", cfg)
		defer consumer.Stop()
		if err != nil {
			panic(err)
		}
		//メッセージを受信したタイミングでコールされる
		consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
			//メッセージを[]byteからstringに変換
			s := b2s(m.Body)
			//メッセージを処理する関数に引き渡し
			hello(s)

			//メッセージ受信完了通知
			q1 <- 1
			return nil
		}))

		err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
		if err != nil {
			panic(err)
		}
		//ハンドラからのメッセージ受信完了待ち
		<-q1
		// メッセージ受信完了をmain()に通知
		q2 <- 1
	}
	// メッセージ受信処理をgo routineで実行
	go f()
       
	// go routineからのメッセージ受信完了連絡待ち
	<-q2
}

受け取ったメッセージは []byte 型なので strings への変換用に自作関数b2sを利用しています。

動作確認

consumerを起動

$ go run consumer.go 
2016/12/16 10:18:21 INF    1 [testtopic/testch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testtopic
2016/12/16 10:18:21 INF    1 [testtopic/testch] (v10-0-0-1.localhost:4150) connecting to nsqd

別ターミナルでproducerを起動

$ go run producer.go 
2016/12/16 10:27:04 INF    1 (127.0.0.1:4150) connecting to nsqd
2016/12/16 10:27:04 INF    1 stopping
2016/12/16 10:27:04 INF    1 exiting router
$

consumerがメッセージを出力して自動終了

2016/12/16 10:27:01 INF    1 [testtopic/testch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testtopic
2016/12/16 10:27:01 INF    1 [testtopic/testch] (v10-0-0-1.localhost:4150) connecting to nsqd
test message
2016/12/16 10:27:04 INF    1 [testtopic/testch] stopping...
$

まとめ

nsq & go-nsqをgo routine、channelと組み合わせることで、いい感じの非同期処理のメッセージングを利用したものが作れそうであります。

バックエンド処理の観点からは、単純なP2Pでのサブシステム間の通信の場合、単純にnet/httpを使った方がラクでしょうけど、並列のサーバインスタンス群に対して一斉通知を出す等の処理はこちらの利用がよさげです。

13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?