docker-compose で Kafka の動作確認ができて、docker-fluentd-image のカスタマイズを試すのもやったので、組み合わせて docker-compose で fluent-plugin-kafka を軽く試してみた。
準備
ディレクトリを作成して移動。
$ mkdir custom-fluentd
$ cd custom-fluentd
docker-compose.yml を作成する。
$ vi docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
fluentd:
build: .
volumes:
- ./log:/fluentd/log
depends_on:
- kafka
fluentd のイメージはカスタマイズして fluent-plugin-kafka をインストールするため、Dockerfile を作成する。
(fluent-plugin-kafka だけでなく zookeeper の gem もインストールする必要があった。)
$ vi Dockerfile
FROM fluent/fluentd:latest-onbuild
MAINTAINER YOUR_NAME <...@...>
WORKDIR /home/fluent
ENV PATH /home/fluent/.gem/ruby/2.3.0/bin:$PATH
USER root
RUN apk --no-cache add sudo build-base ruby-dev && \
sudo -u fluent gem install zookeeper fluent-plugin-kafka && \
rm -rf /home/fluent/.gem/ruby/2.3.0/cache/*.gem && sudo -u fluent gem sources -c && \
apk del sudo build-base ruby-dev
EXPOSE 24284
USER fluent
CMD exec fluentd -c /fluentd/etc/$FLUENTD_CONF -p /fluentd/plugins $FLUENTD_OPT
fluentd の設定ファイルを作成する。とりあえず受信したログを全部 docker という名前のトピックに送信する設定。
$ vi fluent.conf
<source>
@type forward
@id input1
@label @mainstream
port 24224
</source>
<filter **>
@type stdout
</filter>
<label @mainstream>
<match **>
@type kafka_buffered
brokers kafka:9092
zookeeper zookeeper:2181
default_topic docker
</match>
</label>
あと必要なディレクトリを作っておく。(plugins のほうはいらないかも。)
$ mkdir log
$ mkdir plugins
コンテナ起動
ドッカコンポーザップディーする。
$ docker-compose up -d
今回 docker-compose.yml が手抜きなので、zookeeper が立ち上がる前に kafka が立ち上がろうとして落ちてしまったり、kafka が立ち上がる前に fluentd が立ち上がろうとして落ちてしまったりする。
ので、docker-compose ps
して落ちていたら、手動で docker-compose start kafka
とか docker-compose start fluentd
した。
うまく立ち上がると docker-compose ps
がこんな感じになる。
$ docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------
customfluentd_fluentd_1 /bin/sh -c exec fluentd -c ... Up 24224/tcp, 24284/tcp, 5140/tcp
customfluentd_kafka_1 start-kafka.sh Up 0.0.0.0:32797->9092/tcp
customfluentd_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
動作確認
fluent.conf に指定した docker トピックを kafka に作成しておく。
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic docker
Created topic "test".
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
docker
fluentd の IP を調べて、別コンテナから docker ログを送信する。
$ FLUENTD_IP=$(docker inspect -f "{{.NetworkSettings.Networks.customfluentd_default.IPAddress}}" customfluentd_fluentd_1)
$ echo $FLUENTD_IP
172.19.0.4
$ docker run --log-driver=fluentd --log-opt fluentd-address=$FLUENTD_IP:24224 python:alpine echo Hello
Hello
kafka-console-consumer.sh を立ち上げてトピックにたまったログを最初から読んでみると、ちゃんとさきほどの Hello というメッセージを受信することができた。
$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic docker --from-beginning
{"source":"stdout","log":"Hello","container_id":"c207a0ab45f5f67cb8bf1fc04480681a1a8a8b3947aad9025bf725125583c605","container_name":"/tiny_visvesvaraya"}
上記の kafka-console-consumer.sh を起動したまま、もう一度上に書いた方法で echo Hello
してみると、今度はしばらく時間がたってからメッセージを受信することができた。
バッファされているから受信に時間がかかるということなんだと思う。
まとめ
docker-compose で fluent-plugin-kafka の match + kafka_buffered を手軽に試すことができるようになった。次は source (consume する側) のほうを試してみようと思う。