12
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KafkaでProducerとConsumer(Golang編)

Last updated at Posted at 2018-04-17

始めに

Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。
今回はGolang編。Kafkaへの接続にはsarama [https://github.com/Shopify/sarama] を利用します。

※Kafka自体の解説はしません。

##構成##
Golang: 1.9
sarama: 1.16.0
VSCode: 1.22

##成果物##
https://github.com/lightstaff/go-kafka-example

Producer

saramaのテストを参考に、プロデューサーから書いてみます。saramaには「SyncProducer」と「AsyncProducer」があるみたいですが、今回は別スレッドで動作させるので「AsynProducer」にしてみました。

main.go
// Producer部分のみ抜粋

var (
	// kafkaのアドレス
	bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)

// SendMessage 送信メッセージ
type SendMessage struct {
	Message   string `json:"message"`
	Timestamp int64  `json:"timestamp"`
}

func main() {
	flag.Parse()

	if *bootstrapServers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}
        
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	brokers := strings.Split(*bootstrapServers, ",")
	config := sarama.NewConfig()

	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 3

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		panic(err)
	}

	defer producer.AsyncClose()

	// プロデューサールーチン
	go func() {
	PRODUCER_FOR:
		for {
			time.Sleep(10000 * time.Millisecond)

			timestamp := time.Now().UnixNano()

			send := &SendMessage{
				Message:   "Hello",
				Timestamp: timestamp,
			}

			jsBytes, err := json.Marshal(send)
			if err != nil {
				panic(err)
			}

			msg := &sarama.ProducerMessage{
				Topic: "test.A",
				Key:   sarama.StringEncoder(strconv.FormatInt(timestamp, 10)),
				Value: sarama.StringEncoder(string(jsBytes)),
			}

			producer.Input() <- msg

			select {
			case <-producer.Successes():
				fmt.Println(fmt.Sprintf("success send. message: %s, timestamp: %d", send.Message, send.Timestamp))
			case err := <-producer.Errors():
				fmt.Println(fmt.Sprintf("fail send. reason: %v", err.Msg))
			case <-ctx.Done():
				break PRODUCER_FOR
			}
		}
	}()

	fmt.Println("go-kafka-example start.")

	<-signals

	fmt.Println("go-kafka-example stop.")
}

10秒に一回、「SendMessage」をjson化したものを送信します。
producer.Input() <- msgが少し気持ち悪い気がする・・・。

Consumer

次にコンシューマーを書いてみます。

main.go
// Consumer部分のみ抜粋

var (
	// kafkaのアドレス
	bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)

// ConsumedMessage 受信メッセージ
type ConsumedMessage struct {
	Message   string `json:"message"`
	Timestamp int64  `json:"timestamp"`
}

func main() {
	flag.Parse()

	if *bootstrapServers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	brokers := strings.Split(*bootstrapServers, ",")
	config := sarama.NewConfig()

	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	partition, err := consumer.ConsumePartition("test.A", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}

	// コンシューマールーチン
	go func() {
	CONSUMER_FOR:
		for {
			select {
			case msg := <-partition.Messages():
				var consumed ConsumedMessage
				if err := json.Unmarshal(msg.Value, &consumed); err != nil {
					fmt.Println(err)
				}
				fmt.Println(fmt.Sprintf("consumed message. message: %s, timestamp: %d", consumed.Message, consumed.Timestamp))
			case <-ctx.Done():
				break CONSUMER_FOR
			}
		}
	}()

	fmt.Println("go-kafka-example start.")

	<-signals

	fmt.Println("go-kafka-example stop.")
}

コンシューマーは「Consumer」構造体から更にパーテーションを生成する形になっているようです。メッセージに受信を待って、jsonから「ConsumedMessage」に変換しています。

実行

プロデューサーとコンシューマーをごっちゃにしたもの(main.go)をビルドして実行するとコンソールに下記のように出力されるはずです(30秒動作させて停止しています)。

go-kafka-example start.
success send. message: Hello, timestamp: 1523932794754645700
consumed message. message: Hello, timestamp: 1523932794754645700
success send. message: Hello, timestamp: 1523932804761916500
consumed message. message: Hello, timestamp: 1523932804761916500
success send. message: Hello, timestamp: 1523932814765186700
consumed message. message: Hello, timestamp: 1523932814765186700
go-kafka-example stop.

終わりに

Producer・Consumer共にgoroutineの閉じた世界で書いてますが、外に出したい場合はchannelで上手いことする必要があります。
あとoffset周りをどうこうしたい場合はもう少し複雑化すると思われます(参考)。

追記

別のクライアントライブラリとしてconfluent製のconfluent-kafka-goというのもありますが、こちらはlibrdkafkaというCライブラリのインストールが必須のため今回は試していません

試してみました。内容はsarama版と大差ないので投稿はしませんが、ソースのリンクを置いておきます。
https://github.com/lightstaff/confluent-kafka-go-example

12
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?