I tried out the basics of Apache Kafka, which is widely used as a real-time data processing backbone for Big Data.
◼︎ Setting up a Kafka environment
- Install Kafka itself
$ brew install Caskroom/cask/java
$ brew install kafka
- Edit the Kafka configuration file
$ vi /usr/local/etc/kafka/server.properties
...
broker.id=1
...
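For a single local broker the defaults are mostly fine; broker.id is changed here mainly so it is easy to spot in the topic metadata later. Beyond that, the entries you are most likely to touch look like the following (values are illustrative; the Homebrew defaults may differ):
port=9092
zookeeper.connect=localhost:2181
log.dirs=/usr/local/var/lib/kafka-logs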
- Start ZooKeeper
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
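Homebrew's zookeeper formula provides the zkServer wrapper; you can confirm the server is up with its status subcommand:
$ zkServer status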
- Start Kafka
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
- Create & verify the topic "kafkaesque"
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaesque
Created topic "kafkaesque".
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaesque
Topic:kafkaesque PartitionCount:1 ReplicationFactor:1 Configs:
Topic: kafkaesque Partition: 0 Leader: 1 Replicas: 1 Isr: 1
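Note that Leader: 1 in the describe output matches the broker.id=1 set above. Before writing any code, the topic can also be sanity-checked with the console producer that ships with Kafka (each line you type becomes one message):
$ kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaesque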
- Install the kafka-python library
$ pip install kafka-python==0.9.4
- Deploy the producer-side sample app
kafka_producer.py
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from time import sleep
from datetime import datetime

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

while 1:
    # "kafkaesque" is the name of our topic
    producer.send_messages("kafkaesque", "Metamorphosis! " + str(datetime.now().time()))
    sleep(1)
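Note that this targets the old 0.9.x API pinned above; in kafka-python 1.x and later, SimpleProducer was removed in favor of KafkaProducer. A minimal equivalent sketch for the newer API (the file name is just for illustration):
kafka_producer_new.py
from datetime import datetime
from time import sleep
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

while True:
    # Values must be bytes unless a value_serializer is configured
    msg = "Metamorphosis! " + str(datetime.now().time())
    producer.send("kafkaesque", msg.encode("utf-8"))
    sleep(1)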
◼︎ Now, let's run it
- Start the producer-side sample app
$ python kafka_producer.py
- Start the console consumer command
$ kafka-console-consumer.sh --zookeeper localhost --topic kafkaesque
Metamorphosis! 14:46:27.113143
Metamorphosis! 14:46:28.119675
Metamorphosis! 14:46:29.125022
Metamorphosis! 14:46:30.126464
Metamorphosis! 14:46:31.131289
Metamorphosis! 14:46:32.137694
Metamorphosis! 14:46:33.144169
Metamorphosis! 14:46:34.150528
Metamorphosis! 14:46:35.156905
Metamorphosis! 14:46:36.161729
^CConsumed 10 messages
We were able to confirm that the timestamp data sent moment by moment from the producer side is displayed on the consumer side as it arrives.
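By default the console consumer only shows messages that arrive after it starts; to also replay what is already stored in the topic, add the --from-beginning flag:
$ kafka-console-consumer.sh --zookeeper localhost --topic kafkaesque --from-beginning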
◼︎ Turning the consumer side into a Go sample app
- Set up the consumer environment
$ go get github.com/elodina/go_kafka_client
- Consumer-side sample app
kafka_consumer.go
package main

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "strings"
    "time"

    kafkaClient "github.com/elodina/go_kafka_client"
)

func main() {
    config, topic, numConsumers := resolveConfig()

    ctrlc := make(chan os.Signal, 1)
    signal.Notify(ctrlc, os.Interrupt)

    consumers := make([]*kafkaClient.Consumer, numConsumers)
    for i := 0; i < numConsumers; i++ {
        consumers[i] = startNewConsumer(*config, topic)
        time.Sleep(10 * time.Second)
    }

    <-ctrlc
    fmt.Println("Shutdown triggered, closing all alive consumers")
    for _, consumer := range consumers {
        <-consumer.Close()
    }
    fmt.Println("Successfully shut down all consumers")
}

func startNewConsumer(config kafkaClient.ConsumerConfig, topic string) *kafkaClient.Consumer {
    config.Strategy = GetStrategy(config.Consumerid)
    config.WorkerFailureCallback = FailedCallback
    config.WorkerFailedAttemptCallback = FailedAttemptCallback
    consumer := kafkaClient.NewConsumer(&config)
    topics := map[string]int{topic: config.NumConsumerFetchers}
    go func() {
        consumer.StartStatic(topics)
    }()
    return consumer
}

// GetStrategy returns the per-message handler: print the value and report success.
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
    return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
        fmt.Printf("msg.Value=[%s]\n", msg.Value)
        return kafkaClient.NewSuccessfulResult(id)
    }
}

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
    kafkaClient.Info("main", "Failed callback")
    return kafkaClient.DoNotCommitOffsetAndStop
}

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
    kafkaClient.Info("main", "Failed attempt")
    return kafkaClient.CommitOffsetAndContinue
}

func setLogLevel(logLevel string) {
    var level kafkaClient.LogLevel
    switch strings.ToLower(logLevel) {
    case "trace":
        level = kafkaClient.TraceLevel
    case "debug":
        level = kafkaClient.DebugLevel
    case "info":
        level = kafkaClient.InfoLevel
    case "warn":
        level = kafkaClient.WarnLevel
    case "error":
        level = kafkaClient.ErrorLevel
    case "critical":
        level = kafkaClient.CriticalLevel
    default:
        level = kafkaClient.InfoLevel
    }
    kafkaClient.Logger = kafkaClient.NewDefaultLogger(level)
}

// resolveConfig loads consumers.properties and maps each entry onto a ConsumerConfig.
// (Parse errors are deliberately ignored here to keep the sample short.)
func resolveConfig() (*kafkaClient.ConsumerConfig, string, int) {
    rawConfig, err := kafkaClient.LoadConfiguration("consumers.properties")
    if err != nil {
        panic("Failed to load configuration file")
    }
    logLevel := rawConfig["log_level"]
    setLogLevel(logLevel)

    numConsumers, _ := strconv.Atoi(rawConfig["num_consumers"])
    zkTimeout, _ := time.ParseDuration(rawConfig["zookeeper_timeout"])
    numWorkers, _ := strconv.Atoi(rawConfig["num_workers"])
    maxWorkerRetries, _ := strconv.Atoi(rawConfig["max_worker_retries"])
    workerBackoff, _ := time.ParseDuration(rawConfig["worker_backoff"])
    workerRetryThreshold, _ := strconv.Atoi(rawConfig["worker_retry_threshold"])
    workerConsideredFailedTimeWindow, _ := time.ParseDuration(rawConfig["worker_considered_failed_time_window"])
    workerTaskTimeout, _ := time.ParseDuration(rawConfig["worker_task_timeout"])
    workerManagersStopTimeout, _ := time.ParseDuration(rawConfig["worker_managers_stop_timeout"])
    rebalanceBarrierTimeout, _ := time.ParseDuration(rawConfig["rebalance_barrier_timeout"])
    rebalanceMaxRetries, _ := strconv.Atoi(rawConfig["rebalance_max_retries"])
    rebalanceBackoff, _ := time.ParseDuration(rawConfig["rebalance_backoff"])
    partitionAssignmentStrategy := rawConfig["partition_assignment_strategy"]
    excludeInternalTopics, _ := strconv.ParseBool(rawConfig["exclude_internal_topics"])
    numConsumerFetchers, _ := strconv.Atoi(rawConfig["num_consumer_fetchers"])
    fetchBatchSize, _ := strconv.Atoi(rawConfig["fetch_batch_size"])
    fetchMessageMaxBytes, _ := strconv.Atoi(rawConfig["fetch_message_max_bytes"])
    fetchMinBytes, _ := strconv.Atoi(rawConfig["fetch_min_bytes"])
    fetchBatchTimeout, _ := time.ParseDuration(rawConfig["fetch_batch_timeout"])
    requeueAskNextBackoff, _ := time.ParseDuration(rawConfig["requeue_ask_next_backoff"])
    fetchWaitMaxMs, _ := strconv.Atoi(rawConfig["fetch_wait_max_ms"])
    socketTimeout, _ := time.ParseDuration(rawConfig["socket_timeout"])
    queuedMaxMessages, _ := strconv.Atoi(rawConfig["queued_max_messages"])
    refreshLeaderBackoff, _ := time.ParseDuration(rawConfig["refresh_leader_backoff"])
    fetchMetadataRetries, _ := strconv.Atoi(rawConfig["fetch_metadata_retries"])
    fetchMetadataBackoff, _ := time.ParseDuration(rawConfig["fetch_metadata_backoff"])
    offsetsCommitMaxRetries, _ := strconv.Atoi(rawConfig["offsets_commit_max_retries"])
    deploymentTimeout, _ := time.ParseDuration(rawConfig["deployment_timeout"])

    zkConfig := kafkaClient.NewZookeeperConfig()
    zkConfig.ZookeeperConnect = []string{rawConfig["zookeeper_connect"]}
    zkConfig.ZookeeperTimeout = zkTimeout

    config := kafkaClient.DefaultConsumerConfig()
    config.Groupid = rawConfig["group_id"]
    config.NumWorkers = numWorkers
    config.MaxWorkerRetries = maxWorkerRetries
    config.WorkerBackoff = workerBackoff
    config.WorkerRetryThreshold = int32(workerRetryThreshold)
    config.WorkerThresholdTimeWindow = workerConsideredFailedTimeWindow
    config.WorkerTaskTimeout = workerTaskTimeout
    config.WorkerManagersStopTimeout = workerManagersStopTimeout
    config.BarrierTimeout = rebalanceBarrierTimeout
    config.RebalanceMaxRetries = int32(rebalanceMaxRetries)
    config.RebalanceBackoff = rebalanceBackoff
    config.PartitionAssignmentStrategy = partitionAssignmentStrategy
    config.ExcludeInternalTopics = excludeInternalTopics
    config.NumConsumerFetchers = numConsumerFetchers
    config.FetchBatchSize = fetchBatchSize
    config.FetchMessageMaxBytes = int32(fetchMessageMaxBytes)
    config.FetchMinBytes = int32(fetchMinBytes)
    config.FetchBatchTimeout = fetchBatchTimeout
    config.FetchTopicMetadataRetries = fetchMetadataRetries
    config.FetchTopicMetadataBackoff = fetchMetadataBackoff
    config.RequeueAskNextBackoff = requeueAskNextBackoff
    config.FetchWaitMaxMs = int32(fetchWaitMaxMs)
    config.SocketTimeout = socketTimeout
    config.QueuedMaxMessages = int32(queuedMaxMessages)
    config.RefreshLeaderBackoff = refreshLeaderBackoff
    config.Coordinator = kafkaClient.NewZookeeperCoordinator(zkConfig)
    config.AutoOffsetReset = rawConfig["auto_offset_reset"]
    config.OffsetsCommitMaxRetries = offsetsCommitMaxRetries
    config.DeploymentTimeout = deploymentTimeout
    config.OffsetCommitInterval = 10 * time.Second

    return config, rawConfig["topic"], numConsumers
}
- Configuration file
consumers.properties
#Consumer identity
client_id=go-consumer
group_id=cs-rebalance-group
num_consumers=1
topic=kafkaesque
log_level=warn
#Zookeeper connection settings
zookeeper_connect=127.0.0.1:2181
zookeeper_timeout=1s
#workers settings
num_workers=1
max_worker_retries=3
worker_backoff=500ms
worker_retry_threshold=100
worker_considered_failed_time_window=500ms
worker_batch_timeout=5m
worker_task_timeout=1m
worker_managers_stop_timeout=1m
#Rebalance settings
rebalance_barrier_timeout=30s
rebalance_max_retries=4
rebalance_backoff=5s
partition_assignment_strategy=range
exclude_internal_topics=true
#Fetcher settings
num_consumer_fetchers=1
fetch_batch_size=1000
fetch_message_max_bytes=5242880
fetch_min_bytes=1
fetch_batch_timeout=5s
requeue_ask_next_backoff=1s
fetch_wait_max_ms=100
socket_timeout=30s
queued_max_messages=3
refresh_leader_backoff=200ms
fetch_metadata_retries=3
fetch_metadata_backoff=500ms
#Offsets settings
offsets_storage=zookeeper
auto_offset_reset=smallest
offsets_commit_max_retries=5
#Deployment
deployment_timeout=0s
#Metrics
graphite_connect=
flush_interval=10s
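One setting worth calling out: auto_offset_reset=smallest (the old-consumer spelling of earliest) makes a consumer group with no committed offsets start from the oldest retained message, which is why the run below also picks up messages produced before the consumer connected.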
◼︎ Running the consumer-side sample app
$ go run kafka_consumer.go
2016/01/09 15:04:53 Connected to 127.0.0.1:2181
2016/01/09 15:04:53 Authenticated: id=95179009841692686, timeout=4000
msg.Value=[Metamorphosis! 15:04:46.812117]
msg.Value=[Metamorphosis! 15:04:47.816758]
msg.Value=[Metamorphosis! 15:04:50.002973]
msg.Value=[Metamorphosis! 15:04:51.010001]
msg.Value=[Metamorphosis! 15:04:52.015099]
msg.Value=[Metamorphosis! 15:04:53.017251]
msg.Value=[Metamorphosis! 15:04:54.020390]
msg.Value=[Metamorphosis! 15:04:55.026017]
msg.Value=[Metamorphosis! 15:04:56.032326]
msg.Value=[Metamorphosis! 15:04:57.038825]
msg.Value=[Metamorphosis! 15:04:58.044228]
^C
Shutdown triggered, closing all alive consumers
With the Go sample app as well, we were able to confirm that the timestamp data sent moment by moment from the producer side is displayed on the consumer side as it arrives.
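As an aside: if you want a quick cross-check from Python while developing the Go consumer, kafka-python (1.x) can run the same consumption loop in a few lines. A minimal sketch, assuming the newer library rather than the 0.9.4 pin used above:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "kafkaesque",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",  # same intent as smallest in consumers.properties
)
for msg in consumer:
    print("msg.Value=[%s]" % msg.value.decode("utf-8"))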