LoginSignup
5
8

More than 5 years have passed since last update.

Mac OS X環境で、Apache Kafkaの基本動作を試してみる

Last updated at Posted at 2016-01-09

BigDataのリアルタイムデータ処理基盤として活用されている、Apache Kafkaの基本動作を試してみました。

◼︎ Kafka動作環境をつくる

  • kafka本体をインストール
$ brew install Caskroom/cask/java
$ brew install kafka
  • kafka設定ファイル編集
$ vi /usr/local/etc/kafka/server.properties
  ...
broker.id=1
  ...
  • zookeeperを起動
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
  • kafkaを起動
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
  • topic: "kafkaesque"を作成 & 確認
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaesque
Created topic "kafkaesque".

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaesque
Topic:kafkaesque    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafkaesque   Partition: 0    Leader: 1   Replicas: 1 Isr: 1
  • kafka-pythonライブラリをインストール
$ pip install kafka-python==0.9.4
  • Producer側サンプルアプリを配備
kafka_producer.py
# Sample producer: sends "Metamorphosis! <current time of day>" to the
# "kafkaesque" topic, then sleeps one second, forever (stop it with Ctrl-C).
from datetime import datetime
from time import sleep

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

TOPIC = "kafkaesque"  # must match the topic the consumers read from

client = KafkaClient("localhost:9092")
producer = SimpleProducer(client)

while True:
    stamp = str(datetime.now().time())
    producer.send_messages(TOPIC, "Metamorphosis! " + stamp)
    sleep(1)

◼︎ さっそく、動かしてみる

  • Producer側サンプルアプリ起動
$ python kafka_producer.py 

  • Consumer用コマンドを起動
$ kafka-console-consumer.sh --zookeeper localhost --topic kafkaesque
Metamorphosis! 14:46:27.113143
Metamorphosis! 14:46:28.119675
Metamorphosis! 14:46:29.125022
Metamorphosis! 14:46:30.126464
Metamorphosis! 14:46:31.131289
Metamorphosis! 14:46:32.137694
Metamorphosis! 14:46:33.144169
Metamorphosis! 14:46:34.150528
Metamorphosis! 14:46:35.156905
Metamorphosis! 14:46:36.161729
^CConsumed 10 messages

Producer側から刻々と送信されてくる時刻等のデータが、Consumer側で随時表示されることを確認できました。

◼︎ Consumer側のgolangサンプルアプリ化

  • Consumer環境作成
$ go get github.com/elodina/go_kafka_client
  • Consumer側サンプルアプリ
kafka_consumer.go
package main

import (
    "fmt"
    "os"
    "os/signal"
    kafkaClient "github.com/elodina/go_kafka_client"
    "strconv"
    "strings"
    "time"
)


// main starts the configured number of consumers, staggering their start-up
// by 10 seconds, then waits for Ctrl-C (os.Interrupt) and closes every
// consumer that was started, waiting on each Close to finish.
//
// Ctrl-C is honoured during the staggered start-up as well: no further
// consumers are started and shutdown begins immediately instead of after
// the remaining sleeps.
func main() {
	config, topic, numConsumers := resolveConfig()

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, 0, numConsumers)
	interrupted := false
	for i := 0; i < numConsumers; i++ {
		consumers = append(consumers, startNewConsumer(*config, topic))
		if i == numConsumers-1 {
			break // nothing left to stagger after the last consumer
		}
		// Stagger start-up, but stay responsive to Ctrl-C rather than
		// sleeping through it and starting consumers nobody wants.
		select {
		case <-ctrlc:
			interrupted = true
		case <-time.After(10 * time.Second):
		}
		if interrupted {
			break
		}
	}

	if !interrupted {
		<-ctrlc
	}
	// Stop relaying SIGINT to ctrlc so a second Ctrl-C falls back to the
	// default behaviour and can still kill the process if a Close hangs.
	signal.Stop(ctrlc)

	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

// startNewConsumer builds one consumer from config and starts it consuming
// topic on a background goroutine, returning the consumer immediately.
//
// config arrives by value, so the strategy and failure callbacks installed
// here land on this call's copy only; note the copy is shallow, so any
// pointer- or interface-valued fields are still shared with the caller.
// The topic is mapped to config.NumConsumerFetchers in the map handed to
// StartStatic (presumably the fetcher count for that topic — confirm against
// the library). The goroutine is not tracked here; main shuts the consumer
// down through the returned value's Close.
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string) *kafkaClient.Consumer {
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	config.WorkerFailureCallback = FailedCallback
	config.Strategy = GetStrategy(config.Consumerid)

	c := kafkaClient.NewConsumer(&config)
	go c.StartStatic(map[string]int{topic: config.NumConsumerFetchers})
	return c
}


// GetStrategy returns the per-message worker strategy installed on each
// consumer. The strategy prints the message payload as "msg.Value=[...]"
// on stdout and reports every task as successful. consumerId is currently
// unused; it is kept so the signature stays stable for callers.
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	printPayload := func(_ *kafkaClient.Worker, message *kafkaClient.Message, task kafkaClient.TaskId) kafkaClient.WorkerResult {
		fmt.Printf("msg.Value=[%s]\n", message.Value)
		return kafkaClient.NewSuccessfulResult(task)
	}
	return printPayload
}

// FailedCallback is installed as each consumer's WorkerFailureCallback.
// It logs "Failed callback" under the "main" tag and answers with
// kafkaClient.DoNotCommitOffsetAndStop (presumably: stop without committing
// the offset — confirm against the library docs). The WorkerManager argument
// is not inspected.
func FailedCallback(_ *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	decision := kafkaClient.DoNotCommitOffsetAndStop
	kafkaClient.Info("main", "Failed callback")
	return decision
}

// FailedAttemptCallback is installed as each consumer's
// WorkerFailedAttemptCallback. It logs "Failed attempt" under the "main" tag
// and answers with kafkaClient.CommitOffsetAndContinue (presumably: commit
// the offset and move past the failed message — confirm against the library
// docs). Neither the task nor the result is inspected.
func FailedAttemptCallback(_ *kafkaClient.Task, _ kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	decision := kafkaClient.CommitOffsetAndContinue
	kafkaClient.Info("main", "Failed attempt")
	return decision
}

// setLogLevel replaces kafkaClient.Logger with a default logger at the level
// named by logLevel. Matching is case-insensitive ("trace", "debug", "info",
// "warn", "error", "critical"); any other value, including "", selects
// kafkaClient.InfoLevel.
func setLogLevel(logLevel string) {
	levelByName := map[string]kafkaClient.LogLevel{
		"trace":    kafkaClient.TraceLevel,
		"debug":    kafkaClient.DebugLevel,
		"info":     kafkaClient.InfoLevel,
		"warn":     kafkaClient.WarnLevel,
		"error":    kafkaClient.ErrorLevel,
		"critical": kafkaClient.CriticalLevel,
	}

	chosen, known := levelByName[strings.ToLower(logLevel)]
	if !known {
		chosen = kafkaClient.InfoLevel
	}
	kafkaClient.Logger = kafkaClient.NewDefaultLogger(chosen)
}

// resolveConfig reads consumers.properties from the current working directory
// and turns it into the settings main needs.
//
// It returns the consumer configuration, the topic to consume (the "topic"
// key) and the number of consumers to start ("num_consumers", default 1,
// must be >= 1). As a side effect it calls setLogLevel with "log_level".
//
// Every key is optional: a key that is missing or empty leaves the value
// produced by kafkaClient.DefaultConsumerConfig / kafkaClient.NewZookeeperConfig
// in place instead of overwriting it with a zero value. A key that is present
// but cannot be parsed causes a panic naming the key, so a typo can no longer
// silently become a zero timeout or retry count. "zookeeper_connect" accepts a
// comma-separated list of host:port entries. It also panics, including the
// underlying error, if the file cannot be loaded.
//
// OffsetCommitInterval is not read from the file; it is fixed at 10s.
func resolveConfig() (*kafkaClient.ConsumerConfig, string, int) {
	rawConfig, err := kafkaClient.LoadConfiguration("consumers.properties")
	if err != nil {
		panic(fmt.Sprintf("Failed to load configuration file: %v", err))
	}
	setLogLevel(rawConfig["log_level"])

	// lookup returns the configured value for key; a missing entry and an
	// empty one (e.g. "graphite_connect=") both mean "not configured".
	lookup := func(key string) (string, bool) {
		v, ok := rawConfig[key]
		return v, ok && v != ""
	}
	// invalid aborts start-up with a message naming the offending key.
	invalid := func(key, value string, cause error) {
		panic(fmt.Sprintf("invalid value %q for %s in consumers.properties: %v", value, key, cause))
	}
	setString := func(dst *string, key string) {
		if v, ok := lookup(key); ok {
			*dst = v
		}
	}
	setInt := func(dst *int, key string) {
		if v, ok := lookup(key); ok {
			n, perr := strconv.Atoi(v)
			if perr != nil {
				invalid(key, v, perr)
			}
			*dst = n
		}
	}
	// setInt32 parses with a 32-bit limit so out-of-range values are rejected
	// instead of being silently truncated by an int32(...) conversion.
	setInt32 := func(dst *int32, key string) {
		if v, ok := lookup(key); ok {
			n, perr := strconv.ParseInt(v, 10, 32)
			if perr != nil {
				invalid(key, v, perr)
			}
			*dst = int32(n)
		}
	}
	setDuration := func(dst *time.Duration, key string) {
		if v, ok := lookup(key); ok {
			d, perr := time.ParseDuration(v)
			if perr != nil {
				invalid(key, v, perr)
			}
			*dst = d
		}
	}
	setBool := func(dst *bool, key string) {
		if v, ok := lookup(key); ok {
			b, perr := strconv.ParseBool(v)
			if perr != nil {
				invalid(key, v, perr)
			}
			*dst = b
		}
	}

	numConsumers := 1
	setInt(&numConsumers, "num_consumers")
	if numConsumers < 1 {
		panic(fmt.Sprintf("num_consumers must be at least 1, got %d", numConsumers))
	}

	zkConfig := kafkaClient.NewZookeeperConfig()
	if connect, ok := lookup("zookeeper_connect"); ok {
		var hosts []string
		for _, h := range strings.Split(connect, ",") {
			if h = strings.TrimSpace(h); h != "" {
				hosts = append(hosts, h)
			}
		}
		if len(hosts) == 0 {
			invalid("zookeeper_connect", connect, fmt.Errorf("no host:port entries"))
		}
		zkConfig.ZookeeperConnect = hosts
	}
	setDuration(&zkConfig.ZookeeperTimeout, "zookeeper_timeout")

	config := kafkaClient.DefaultConsumerConfig()
	setString(&config.Groupid, "group_id")

	// Worker settings.
	setInt(&config.NumWorkers, "num_workers")
	setInt(&config.MaxWorkerRetries, "max_worker_retries")
	setDuration(&config.WorkerBackoff, "worker_backoff")
	setInt32(&config.WorkerRetryThreshold, "worker_retry_threshold")
	setDuration(&config.WorkerThresholdTimeWindow, "worker_considered_failed_time_window")
	setDuration(&config.WorkerTaskTimeout, "worker_task_timeout")
	setDuration(&config.WorkerManagersStopTimeout, "worker_managers_stop_timeout")

	// Rebalance settings.
	setDuration(&config.BarrierTimeout, "rebalance_barrier_timeout")
	setInt32(&config.RebalanceMaxRetries, "rebalance_max_retries")
	setDuration(&config.RebalanceBackoff, "rebalance_backoff")
	setString(&config.PartitionAssignmentStrategy, "partition_assignment_strategy")
	setBool(&config.ExcludeInternalTopics, "exclude_internal_topics")

	// Fetcher settings.
	setInt(&config.NumConsumerFetchers, "num_consumer_fetchers")
	setInt(&config.FetchBatchSize, "fetch_batch_size")
	setInt32(&config.FetchMessageMaxBytes, "fetch_message_max_bytes")
	setInt32(&config.FetchMinBytes, "fetch_min_bytes")
	setDuration(&config.FetchBatchTimeout, "fetch_batch_timeout")
	setDuration(&config.RequeueAskNextBackoff, "requeue_ask_next_backoff")
	setInt32(&config.FetchWaitMaxMs, "fetch_wait_max_ms")
	setDuration(&config.SocketTimeout, "socket_timeout")
	setInt32(&config.QueuedMaxMessages, "queued_max_messages")
	setDuration(&config.RefreshLeaderBackoff, "refresh_leader_backoff")
	setInt(&config.FetchTopicMetadataRetries, "fetch_metadata_retries")
	setDuration(&config.FetchTopicMetadataBackoff, "fetch_metadata_backoff")

	// Offset and deployment settings.
	setString(&config.AutoOffsetReset, "auto_offset_reset")
	setInt(&config.OffsetsCommitMaxRetries, "offsets_commit_max_retries")
	setDuration(&config.DeploymentTimeout, "deployment_timeout")

	// The coordinator is built only after every ZooKeeper setting is applied.
	config.Coordinator = kafkaClient.NewZookeeperCoordinator(zkConfig)
	config.OffsetCommitInterval = 10 * time.Second

	return config, rawConfig["topic"], numConsumers
}
  • 設定ファイル
consumers.properties
#Consumer identity
client_id=go-consumer
group_id=cs-rebalance-group
num_consumers=1
topic=kafkaesque
log_level=warn

#Zookeeper connection settings
zookeeper_connect=127.0.0.1:2181
zookeeper_timeout=1s

#workers settings
num_workers=1
max_worker_retries=3
worker_backoff=500ms
worker_retry_threshold=100
worker_considered_failed_time_window=500ms
worker_batch_timeout=5m
worker_task_timeout=1m
worker_managers_stop_timeout=1m

#Rebalance settings
rebalance_barrier_timeout=30s
rebalance_max_retries=4
rebalance_backoff=5s
partition_assignment_strategy=range
exclude_internal_topics=true

#Fetcher settings
num_consumer_fetchers=1
fetch_batch_size=1000
fetch_message_max_bytes=5242880
fetch_min_bytes=1
fetch_batch_timeout=5s
requeue_ask_next_backoff=1s
fetch_wait_max_ms=100
socket_timeout=30s
queued_max_messages=3
refresh_leader_backoff=200ms
fetch_metadata_retries=3
fetch_metadata_backoff=500ms

#Offsets settings
offsets_storage=zookeeper
auto_offset_reset=smallest
offsets_commit_max_retries=5

#Deployment
deployment_timeout=0s

#Metrics
graphite_connect=
flush_interval=10s

◼︎ Consumer側サンプルアプリを動かしてみる

$ go run kafka_consumer.go 
2016/01/09 15:04:53 Connected to 127.0.0.1:2181
2016/01/09 15:04:53 Authenticated: id=95179009841692686, timeout=4000
msg.Value=[Metamorphosis! 15:04:46.812117]
msg.Value=[Metamorphosis! 15:04:47.816758]
msg.Value=[Metamorphosis! 15:04:50.002973]
msg.Value=[Metamorphosis! 15:04:51.010001]
msg.Value=[Metamorphosis! 15:04:52.015099]
msg.Value=[Metamorphosis! 15:04:53.017251]
msg.Value=[Metamorphosis! 15:04:54.020390]
msg.Value=[Metamorphosis! 15:04:55.026017]
msg.Value=[Metamorphosis! 15:04:56.032326]
msg.Value=[Metamorphosis! 15:04:57.038825]
msg.Value=[Metamorphosis! 15:04:58.044228]
^C
Shutdown triggered, closing all alive consumers

golangサンプルアプリでも、Producer側から刻々と送信されてくる時刻等のデータが、Consumer側で随時表示されることを確認できました。

◼︎ 参照元

5
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
8