始めに
Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。
今回はGolang編。Kafkaへの接続にはsarama [https://github.com/Shopify/sarama] を利用します。
※Kafka自体の解説はしません。
##構成##
Golang: 1.9
sarama: 1.16.0
VSCode: 1.22
##成果物##
https://github.com/lightstaff/go-kafka-example
Producer
saramaのテストを参考に、プロデューサーから書いてみます。saramaには「SyncProducer」と「AsyncProducer」があるみたいですが、今回は別スレッドで動作させるので「AsynProducer」にしてみました。
// Producer部分のみ抜粋
var (
// kafkaのアドレス
bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)
// SendMessage 送信メッセージ
type SendMessage struct {
Message string `json:"message"`
Timestamp int64 `json:"timestamp"`
}
func main() {
flag.Parse()
if *bootstrapServers == "" {
flag.PrintDefaults()
os.Exit(1)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
brokers := strings.Split(*bootstrapServers, ",")
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 3
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
panic(err)
}
defer producer.AsyncClose()
// プロデューサールーチン
go func() {
PRODUCER_FOR:
for {
time.Sleep(10000 * time.Millisecond)
timestamp := time.Now().UnixNano()
send := &SendMessage{
Message: "Hello",
Timestamp: timestamp,
}
jsBytes, err := json.Marshal(send)
if err != nil {
panic(err)
}
msg := &sarama.ProducerMessage{
Topic: "test.A",
Key: sarama.StringEncoder(strconv.FormatInt(timestamp, 10)),
Value: sarama.StringEncoder(string(jsBytes)),
}
producer.Input() <- msg
select {
case <-producer.Successes():
fmt.Println(fmt.Sprintf("success send. message: %s, timestamp: %d", send.Message, send.Timestamp))
case err := <-producer.Errors():
fmt.Println(fmt.Sprintf("fail send. reason: %v", err.Msg))
case <-ctx.Done():
break PRODUCER_FOR
}
}
}()
fmt.Println("go-kafka-example start.")
<-signals
fmt.Println("go-kafka-example stop.")
}
10秒に一回、「SendMessage」をjson化したものを送信します。
producer.Input() <- msg
が少し気持ち悪い気がする・・・。
Consumer
次にコンシューマーを書いてみます。
// Consumer部分のみ抜粋
var (
// kafkaのアドレス
bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)
// ConsumedMessage 受信メッセージ
type ConsumedMessage struct {
Message string `json:"message"`
Timestamp int64 `json:"timestamp"`
}
func main() {
flag.Parse()
if *bootstrapServers == "" {
flag.PrintDefaults()
os.Exit(1)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
brokers := strings.Split(*bootstrapServers, ",")
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()
partition, err := consumer.ConsumePartition("test.A", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
// コンシューマールーチン
go func() {
CONSUMER_FOR:
for {
select {
case msg := <-partition.Messages():
var consumed ConsumedMessage
if err := json.Unmarshal(msg.Value, &consumed); err != nil {
fmt.Println(err)
}
fmt.Println(fmt.Sprintf("consumed message. message: %s, timestamp: %d", consumed.Message, consumed.Timestamp))
case <-ctx.Done():
break CONSUMER_FOR
}
}
}()
fmt.Println("go-kafka-example start.")
<-signals
fmt.Println("go-kafka-example stop.")
}
コンシューマーは「Consumer」構造体から更にパーテーションを生成する形になっているようです。メッセージに受信を待って、jsonから「ConsumedMessage」に変換しています。
実行
プロデューサーとコンシューマーをごっちゃにしたもの(main.go)をビルドして実行するとコンソールに下記のように出力されるはずです(30秒動作させて停止しています)。
go-kafka-example start.
success send. message: Hello, timestamp: 1523932794754645700
consumed message. message: Hello, timestamp: 1523932794754645700
success send. message: Hello, timestamp: 1523932804761916500
consumed message. message: Hello, timestamp: 1523932804761916500
success send. message: Hello, timestamp: 1523932814765186700
consumed message. message: Hello, timestamp: 1523932814765186700
go-kafka-example stop.
終わりに
Producer・Consumer共にgoroutineの閉じた世界で書いてますが、外に出したい場合はchannelで上手いことする必要があります。
あとoffset周りをどうこうしたい場合はもう少し複雑化すると思われます(参考)。
追記
別のクライアントライブラリとしてconfluent製のconfluent-kafka-goというのもありますが、こちらはlibrdkafkaというCライブラリのインストールが必須のため今回は試していません。
試してみました。内容はsarama版と大差ないので投稿はしませんが、ソースのリンクを置いておきます。
https://github.com/lightstaff/confluent-kafka-go-example