Edited at

kafkaと戯れ

More than 3 years have passed since last update.

1machineでkafka環境を作って戯れる


1.環境設定


OS&JDK

CentOS7 をインストールしたのち


  • firewalld をオフ

  • selinux をオフ

  • JDK8 をインストール

yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel


  • /etc/profile or ~/.bashrc にJAVA_HOME設定

export JAVA_HOME=/usr/lib/jvm/java


ZooKeeper

CDHのRPMを利用する /etc/yum.repos.d/ に以下を配置

https://archive.cloudera.com/cdh5/redhat/7/x86_64/cdh/cloudera-cdh5.repo

でyum install して、サービス起動と、自動起動設定もしておく

# yum -y install zookeeper-server

# service zookeeper-server start
# systemctl enable zookeeper-server.service

ポート2181が空いてれば起動成功かな zookeeper-client を使って確認してもよい


Kafka

Apache サイトのバイナリtarball をダウンロード、展開

開発用なのでヒープサイズはちっちゃくてよい(デフォルトは1GB)

export KAFKA_HEAP_OPTS="-Xmx256M"

それに合わせて config/server.propertiesの各種値も1ケタ小さくしておこう

socket.request.max.bytes=10485760

log.retention.bytes=107374182
log.segment.bytes=107374182

そしてスタート

$ bin/kafka-server-start.sh config/server.properties &

疎通確認もしておく. topic 作って確認

$ bin/kafka-topics.sh --create --zookeeper localhost:2181\

--replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

メッセージを入れて取り出す 二つターミナルを用意して以下のようにする

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

うまくメッセージパッシングできていることを確認


2.Kafka Streamsとの戯れ


デモの実行

v0.10で導入された kafka streams を試してみる。サンプルのデモを実行するため、二つのトピック streams-file-inputstreams-wordcount-output を作成しておく

$ bin/kafka-topics.sh --create --zookeeper localhost:2181\

--replication-factor 1 --partitions 1 --topic streams-file-input
$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
--replication-factor 1 --partitions 1 --topic streams-wordcount-output

input となるstreamの方には適当に、Apache Licenceの文章でも入れておく

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input

**** Apache Licence の文章******
Ctrl+D

そして、クラスパスを通して、Streamアプリを実行

$  ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

そして結果を確認

$ ./bin/kafka-console-consumer.sh \

--zookeeper localhost:2181 --topic streams-wordcount-output \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer


Kafka Streaming の概要

というか Kafka Streaming とはなんぞや?ドキュメント http://docs.confluent.io/2.1.0-alpha1/streams/index.html に概要が書かれているが、要は


  • Kafka のクライアントライブラリとして実装された

  • ストリーミングを実現するAPI群で

  • トピックAのメッセージを処理してトピックBに詰め込むことを可能にする

  • High Level なDSLと Low Level なインターフェイスを持ち

  • スケーラブルでエラーセーフにすることもできる

とのこと


3.Kafka Streamsのアプリ開発


環境整備

男は黙ってvi. 以下のようなactivateファイルを用意して開発時には source activate とする

PS1=(kafka)$PS1

KAFKA_HOME=/home/______/kafka/kafka
for i in $KAFKA_HOME/libs/*.jar
do
CLASSPATH=$i:$CLASSPATH
done
export CLASSPATH

何回も実行して出力先のトピックが汚れるのもいやなので、毎回作り直す。以下のようなスクリプトをinit.shというように名前をつけて都度リフレッシュ。

./kafka/bin/kafka-topics.sh --delete --topic b --zookeeper localhost:2181

./kafka/bin/kafka-topics.sh --create --topic b --partitions 1 --replication-factor 1 --zookeeper localhost:2181

なお、トピックの削除には、サーバ側の設定が必要。以下のように設定して再起動。


config/server.properties

delete.topic.enable=true



テストコードのインターフェース

以下の「はまった点」にもあるように、同じ名前のアプリケーション名を使いまわしていると、前回の状態をKafka側が覚えている、すなわち同じコンシューマグループのクライアントとして認識されるため、試験をするときはアプリケーション名を都度変えて実行する


はまった点

さっきはうまくいっていたのに今回はうまくいかない

:同じアプリケーション名だと、前回の状態を引き継ぐので、意図通りにConsumeできていない可能性がある