
Trying out the basics of Apache Kafka on Mac OS X

Posted at 2016-01-09

I tried out the basic operation of Apache Kafka, which is widely used as a real-time data processing platform for big data.

◼︎ Building a Kafka environment

  • Install Kafka itself
$ brew install Caskroom/cask/java
$ brew install kafka
  • Edit the Kafka configuration file
$ vi /usr/local/etc/kafka/server.properties
  ...
broker.id=1
  ...
  • Start ZooKeeper
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
  • Start Kafka
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
  • Create & verify the topic "kafkaesque"
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaesque
Created topic "kafkaesque".

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaesque
Topic:kafkaesque	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: kafkaesque	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
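
In the describe output, Leader, Replicas, and Isr all show 1, which is the broker.id we set in server.properties earlier. As a quick cross-check from Python, something along these lines should work (a hedged sketch against the kafka-python 0.9.x client API; ensure_topic_exists and topic_partitions are from that era and may differ in other versions):

from kafka.client import KafkaClient

kafka = KafkaClient("localhost:9092")
kafka.ensure_topic_exists("kafkaesque")  # block until the broker reports metadata for the topic
print(kafka.topic_partitions)            # partition metadata keyed by topic (exact shape varies by version)
kafka.close()
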
  • Install the kafka-python library
$ pip install kafka-python==0.9.4
  • Deploy the producer-side sample app
kafka_producer.py
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from time import sleep
from datetime import datetime

kafka = KafkaClient("localhost:9092")

producer = SimpleProducer(kafka)

# Send a timestamped message to the "kafkaesque" topic once per second
while True:
    producer.send_messages("kafkaesque", "Metamorphosis! " + str(datetime.now().time()))
    sleep(1)
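
A note on the producer design: with the kafka-python 0.9.x API used above, SimpleProducer sends synchronously by default, and send_messages accepts several messages in one call. A minimal variant sketch, assuming the 0.9.x-era async flag (not required for this walkthrough; async later became a reserved word in Python 3.7+, matching this article's 2016-era environment):

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka, async=True)        # buffer and send from a background thread
producer.send_messages("kafkaesque", "one", "two")  # several messages in a single call
producer.stop()                                     # flush the async buffer before exiting
kafka.close()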

◼︎ Now, let's run it

  • Start the producer-side sample app
$ python kafka_producer.py 

  • Run the console consumer command
$ kafka-console-consumer.sh --zookeeper localhost --topic kafkaesque
Metamorphosis! 14:46:27.113143
Metamorphosis! 14:46:28.119675
Metamorphosis! 14:46:29.125022
Metamorphosis! 14:46:30.126464
Metamorphosis! 14:46:31.131289
Metamorphosis! 14:46:32.137694
Metamorphosis! 14:46:33.144169
Metamorphosis! 14:46:34.150528
Metamorphosis! 14:46:35.156905
Metamorphosis! 14:46:36.161729
^CConsumed 10 messages

We confirmed that the timestamp data sent moment by moment from the producer side is displayed on the consumer side as it arrives.
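
Before porting the consumer to golang, the same check can also be done from Python. A minimal sketch, assuming kafka-python 0.9.x's SimpleConsumer (the group name "my-group" is an arbitrary choice for this example):

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost:9092")
consumer = SimpleConsumer(kafka, "my-group", "kafkaesque")
for msg in consumer:            # blocks, yielding messages as they arrive
    print(msg.message.value)    # e.g. "Metamorphosis! 14:46:27.113143"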

◼︎ Turning the consumer side into a golang sample app

  • Set up the consumer environment
$ go get github.com/elodina/go_kafka_client
  • Consumer-side sample app
kafka_consumer.go
package main

import (
	"fmt"
	"os"
	"os/signal"
	kafkaClient "github.com/elodina/go_kafka_client"
	"strconv"
	"strings"
	"time"
)

func main() {
	config, topic, numConsumers := resolveConfig()

	// Trap Ctrl-C so the consumers can be shut down cleanly.
	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	// Start the configured number of consumers, pausing between starts.
	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic)
		time.Sleep(10 * time.Second)
	}

	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

// startNewConsumer wires the message-handling strategy and failure callbacks
// into a copy of the config, then starts consuming the topic in a goroutine.
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string) *kafkaClient.Consumer {
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

// GetStrategy returns the per-message handler; here it simply prints each message value.
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		fmt.Printf("msg.Value=[%s]\n", msg.Value)

		return kafkaClient.NewSuccessfulResult(id)
	}
}

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}

// setLogLevel maps the log_level string from the config file onto the library's log levels.
func setLogLevel(logLevel string) {
	var level kafkaClient.LogLevel
	switch strings.ToLower(logLevel) {
	case "trace":
		level = kafkaClient.TraceLevel
	case "debug":
		level = kafkaClient.DebugLevel
	case "info":
		level = kafkaClient.InfoLevel
	case "warn":
		level = kafkaClient.WarnLevel
	case "error":
		level = kafkaClient.ErrorLevel
	case "critical":
		level = kafkaClient.CriticalLevel
	default:
		level = kafkaClient.InfoLevel
	}
	kafkaClient.Logger = kafkaClient.NewDefaultLogger(level)
}

func resolveConfig() (*kafkaClient.ConsumerConfig, string, int) {
	rawConfig, err := kafkaClient.LoadConfiguration("consumers.properties")
	if err != nil {
		panic("Failed to load configuration file")
	}
	logLevel := rawConfig["log_level"]
	setLogLevel(logLevel) 
	numConsumers, _ := strconv.Atoi(rawConfig["num_consumers"])
	zkTimeout, _ := time.ParseDuration(rawConfig["zookeeper_timeout"])

	numWorkers, _ := strconv.Atoi(rawConfig["num_workers"])
	maxWorkerRetries, _ := strconv.Atoi(rawConfig["max_worker_retries"])
	workerBackoff, _ := time.ParseDuration(rawConfig["worker_backoff"])
	workerRetryThreshold, _ := strconv.Atoi(rawConfig["worker_retry_threshold"])
	workerConsideredFailedTimeWindow, _ := time.ParseDuration(rawConfig["worker_considered_failed_time_window"])
	workerTaskTimeout, _ := time.ParseDuration(rawConfig["worker_task_timeout"])
	workerManagersStopTimeout, _ := time.ParseDuration(rawConfig["worker_managers_stop_timeout"])

	rebalanceBarrierTimeout, _ := time.ParseDuration(rawConfig["rebalance_barrier_timeout"])
	rebalanceMaxRetries, _ := strconv.Atoi(rawConfig["rebalance_max_retries"])
	rebalanceBackoff, _ := time.ParseDuration(rawConfig["rebalance_backoff"])
	partitionAssignmentStrategy := rawConfig["partition_assignment_strategy"]
	excludeInternalTopics, _ := strconv.ParseBool(rawConfig["exclude_internal_topics"])

	numConsumerFetchers, _ := strconv.Atoi(rawConfig["num_consumer_fetchers"])
	fetchBatchSize, _ := strconv.Atoi(rawConfig["fetch_batch_size"])
	fetchMessageMaxBytes, _ := strconv.Atoi(rawConfig["fetch_message_max_bytes"])
	fetchMinBytes, _ := strconv.Atoi(rawConfig["fetch_min_bytes"])
	fetchBatchTimeout, _ := time.ParseDuration(rawConfig["fetch_batch_timeout"])
	requeueAskNextBackoff, _ := time.ParseDuration(rawConfig["requeue_ask_next_backoff"])
	fetchWaitMaxMs, _ := strconv.Atoi(rawConfig["fetch_wait_max_ms"])
	socketTimeout, _ := time.ParseDuration(rawConfig["socket_timeout"])
	queuedMaxMessages, _ := strconv.Atoi(rawConfig["queued_max_messages"])
	refreshLeaderBackoff, _ := time.ParseDuration(rawConfig["refresh_leader_backoff"])
	fetchMetadataRetries, _ := strconv.Atoi(rawConfig["fetch_metadata_retries"])
	fetchMetadataBackoff, _ := time.ParseDuration(rawConfig["fetch_metadata_backoff"])

	offsetsCommitMaxRetries, _ := strconv.Atoi(rawConfig["offsets_commit_max_retries"])

	deploymentTimeout, _ := time.ParseDuration(rawConfig["deployment_timeout"])

	zkConfig := kafkaClient.NewZookeeperConfig()
	zkConfig.ZookeeperConnect = []string{rawConfig["zookeeper_connect"]}
	zkConfig.ZookeeperTimeout = zkTimeout

	config := kafkaClient.DefaultConsumerConfig()
	config.Groupid = rawConfig["group_id"]
	config.NumWorkers = numWorkers
	config.MaxWorkerRetries = maxWorkerRetries
	config.WorkerBackoff = workerBackoff
	config.WorkerRetryThreshold = int32(workerRetryThreshold)
	config.WorkerThresholdTimeWindow = workerConsideredFailedTimeWindow
	config.WorkerTaskTimeout = workerTaskTimeout
	config.WorkerManagersStopTimeout = workerManagersStopTimeout
	config.BarrierTimeout = rebalanceBarrierTimeout
	config.RebalanceMaxRetries = int32(rebalanceMaxRetries)
	config.RebalanceBackoff = rebalanceBackoff
	config.PartitionAssignmentStrategy = partitionAssignmentStrategy
	config.ExcludeInternalTopics = excludeInternalTopics
	config.NumConsumerFetchers = numConsumerFetchers
	config.FetchBatchSize = fetchBatchSize
	config.FetchMessageMaxBytes = int32(fetchMessageMaxBytes)
	config.FetchMinBytes = int32(fetchMinBytes)
	config.FetchBatchTimeout = fetchBatchTimeout
	config.FetchTopicMetadataRetries = fetchMetadataRetries
	config.FetchTopicMetadataBackoff = fetchMetadataBackoff
	config.RequeueAskNextBackoff = requeueAskNextBackoff
	config.FetchWaitMaxMs = int32(fetchWaitMaxMs)
	config.SocketTimeout = socketTimeout
	config.QueuedMaxMessages = int32(queuedMaxMessages)
	config.RefreshLeaderBackoff = refreshLeaderBackoff
	config.Coordinator = kafkaClient.NewZookeeperCoordinator(zkConfig)
	config.AutoOffsetReset = rawConfig["auto_offset_reset"]
	config.OffsetsCommitMaxRetries = offsetsCommitMaxRetries
	config.DeploymentTimeout = deploymentTimeout
	config.OffsetCommitInterval = 10 * time.Second

	return config, rawConfig["topic"], numConsumers
}
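
One caveat about the code above: resolveConfig discards every parse error with the blank identifier, so a malformed duration or integer in consumers.properties silently becomes a zero value instead of failing fast. For anything beyond a demo, those errors are worth checking.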
  • Configuration file
consumers.properties
#Consumer identity
client_id=go-consumer
group_id=cs-rebalance-group
num_consumers=1
topic=kafkaesque
log_level=warn

#Zookeeper connection settings
zookeeper_connect=127.0.0.1:2181
zookeeper_timeout=1s

#workers settings
num_workers=1
max_worker_retries=3
worker_backoff=500ms
worker_retry_threshold=100
worker_considered_failed_time_window=500ms
worker_batch_timeout=5m
worker_task_timeout=1m
worker_managers_stop_timeout=1m

#Rebalance settings
rebalance_barrier_timeout=30s
rebalance_max_retries=4
rebalance_backoff=5s
partition_assignment_strategy=range
exclude_internal_topics=true

#Fetcher settings
num_consumer_fetchers=1
fetch_batch_size=1000
fetch_message_max_bytes=5242880
fetch_min_bytes=1
fetch_batch_timeout=5s
requeue_ask_next_backoff=1s
fetch_wait_max_ms=100
socket_timeout=30s
queued_max_messages=3
refresh_leader_backoff=200ms
fetch_metadata_retries=3
fetch_metadata_backoff=500ms

#Offsets settings
offsets_storage=zookeeper
auto_offset_reset=smallest
offsets_commit_max_retries=5

#Deployment
deployment_timeout=0s

#Metrics
graphite_connect=
flush_interval=10s
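
The keys in this file line up with the rawConfig lookups in resolveConfig above. Entries that resolveConfig never reads (client_id, worker_batch_timeout, offsets_storage, and the #Metrics settings) are presumably consumed elsewhere in the library, or simply ignored by this sample.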

◼︎ Running the consumer-side sample app

$ go run kafka_consumer.go 
2016/01/09 15:04:53 Connected to 127.0.0.1:2181
2016/01/09 15:04:53 Authenticated: id=95179009841692686, timeout=4000
msg.Value=[Metamorphosis! 15:04:46.812117]
msg.Value=[Metamorphosis! 15:04:47.816758]
msg.Value=[Metamorphosis! 15:04:50.002973]
msg.Value=[Metamorphosis! 15:04:51.010001]
msg.Value=[Metamorphosis! 15:04:52.015099]
msg.Value=[Metamorphosis! 15:04:53.017251]
msg.Value=[Metamorphosis! 15:04:54.020390]
msg.Value=[Metamorphosis! 15:04:55.026017]
msg.Value=[Metamorphosis! 15:04:56.032326]
msg.Value=[Metamorphosis! 15:04:57.038825]
msg.Value=[Metamorphosis! 15:04:58.044228]
^C
Shutdown triggered, closing all alive consumers

With the golang sample app as well, we confirmed that the timestamp data streaming in from the producer side is displayed on the consumer side as it arrives.

