LoginSignup
2
2

More than 5 years have passed since last update.

kafkaと戯れ

Last updated at Posted at 2016-06-04

1machineでkafka環境を作って戯れる

1.環境設定

OS&JDK

CentOS7 をインストールしたのち

  • firewalld をオフ
  • selinux をオフ
  • JDK8 をインストール
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
  • /etc/profile or ~/.bashrc にJAVA_HOME設定
export JAVA_HOME=/usr/lib/jvm/java

ZooKeeper

CDHのRPMを利用する /etc/yum.repos.d/ に以下を配置

https://archive.cloudera.com/cdh5/redhat/7/x86_64/cdh/cloudera-cdh5.repo

でyum install して、サービス起動と、自動起動設定もしておく

# yum -y install zookeeper-server
# service zookeeper-server start
# systemctl enable zookeeper-server.service

ポート2181が空いてれば起動成功かな zookeeper-client を使って確認してもよい

Kafka

Apache サイトのバイナリtarball をダウンロード、展開

開発用なのでヒープサイズはちっちゃくてよい(デフォルトは1GB)

export KAFKA_HEAP_OPTS="-Xmx256M"

それに合わせて config/server.propertiesの各種値も1ケタ小さくしておこう

socket.request.max.bytes=10485760
log.retention.bytes=107374182
log.segment.bytes=107374182

そしてスタート

$ bin/kafka-server-start.sh config/server.properties &

疎通確認もしておく. topic 作って確認

$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
  --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

メッセージを入れて取り出す 二つターミナルを用意して以下のようにする

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

うまくメッセージパッシングできていることを確認

2.Kafka Streamsとの戯れ

デモの実行

v0.10で導入された kafka streams を試してみる。サンプルのデモを実行するため、二つのトピック streams-file-inputstreams-wordcount-output を作成しておく

$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
  --replication-factor 1 --partitions 1 --topic streams-file-input
$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
  --replication-factor 1 --partitions 1 --topic streams-wordcount-output

input となるstreamの方には適当に、Apache Licenceの文章でも入れておく

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
**** Apache Licence の文章******
Ctrl+D

そして、クラスパスを通して、Streamアプリを実行

$  ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

そして結果を確認

$ ./bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic streams-wordcount-output \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Kafka Streaming の概要

というか Kafka Streaming とはなんぞや?ドキュメント http://docs.confluent.io/2.1.0-alpha1/streams/index.html に概要が書かれているが、要は

  • Kafka のクライアントライブラリとして実装された
  • ストリーミングを実現するAPI群で
  • トピックAのメッセージを処理してトピックBに詰め込むことを可能にする
  • High Level なDSLと Low Level なインターフェイスを持ち
  • スケーラブルでエラーセーフにすることもできる

とのこと

3.Kafka Streamsのアプリ開発

環境整備

男は黙ってvi. 以下のようなactivateファイルを用意して開発時には source activate とする

PS1=(kafka)$PS1
KAFKA_HOME=/home/______/kafka/kafka
for i in $KAFKA_HOME/libs/*.jar
do
CLASSPATH=$i:$CLASSPATH
done
export CLASSPATH

何回も実行して出力先のトピックが汚れるのもいやなので、毎回作り直す。以下のようなスクリプトをinit.shというように名前をつけて都度リフレッシュ。

./kafka/bin/kafka-topics.sh --delete --topic b --zookeeper localhost:2181
./kafka/bin/kafka-topics.sh --create --topic b --partitions 1 --replication-factor 1 --zookeeper localhost:2181

なお、トピックの削除には、サーバ側の設定が必要。以下のように設定して再起動。

config/server.properties
delete.topic.enable=true

テストコードのインターフェース

以下の「はまった点」にもあるように、同じ名前のアプリケーション名を使いまわしていると、前回の状態をKafka側が覚えている、すなわち同じコンシューマグループのクライアントとして認識されるため、試験をするときはアプリケーション名を都度変えて実行する

はまった点

さっきはうまくいっていたのに今回はうまくいかない
:同じアプリケーション名だと、前回の状態を引き継ぐので、意図通りにConsumeできていない可能性がある

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2