Help us understand the problem. What is going on with this article?

KafkaからTreasure DataにブリッジするDocker Compose

More than 3 years have passed since last update.

 td-agentコンテナとKafka Consumerコンテナを使いKafkaからTreasure DataへブリッジするDocker Composeサービスを起動します。別のポストではPySpark Streamingのウィンドウ集計した結果をKafkaのトピックに出力するコードを書きました。このストリーム処理はデータパイプラインの前処理やエンリッチメントに相当します。後続にビッグデータのバッチ処理を想定してTreasure Dataに保存します。

Docker Compose

 最初に今回作成するプロジェクトのディレクトリ構成です。

$ tree -a
.
├── docker-compose.yml
├── .env
├── .gitignore
├── kafka-bridge
│   ├── Dockerfile
│   ├── fluentd-consumer.properties
│   └── log4j.properties
└── td-agent2
    ├── Dockerfile
    └── td-agent.conf

2 directories, 9 files

docker-compose.yml

 td-agentとKafka ConsumerサービスはそれぞれDockefileを書いてビルドします。Kafkaはlandoop/fast-data-devを利用します。Confluent Open Sourceを同梱しているためKafkaとZooKeeperも起動します。

docker-compose.yml
version: '2'
services:
  kafka-stack:
    image: landoop/fast-data-dev
    environment:
      - FORWARDLOGS=0
      - RUNTESTS=0
      - ADV_HOST=<仮想マシンのパブリックIPアドレス>
    ports:
      - 3030:3030
      - 9092:9092
      - 2181:2181
      - 8081:8081
  td-agent2:
    build: ./td-agent2
    env_file:
      - ./.env
    ports:
      - 24224:24224
  kafka-bridge:
    build: ./kafka-bridge
    depends_on:
      - td-agent

.env

 Treasure Dataの接続情報は環境変数ファイルの.envに記述しDocker Composeから読み込みます。

td-agent2/.env
TD_API_KEY=<YOUR API KEY>
TD_ENDPOINT=<TD ENDPOINT>

td-agent2

 td-agentのDockerイメージを作成します。

Dockerfile

 Overview of Server-Side Agent (td-agent)のインストール手順に従います。install-ubuntu-xenial-td-agent2.shの中ではsudoも必要です。

td-agent2/Dockerfile
FROM ubuntu:xenial

RUN apt-get update && apt-get install sudo curl -y
RUN curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent2.sh | sh
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

ADD td-agent.conf /etc/td-agent/
EXPOSE 24224
CMD ["/usr/sbin/td-agent"]

td-agent.conf

 td-agent.confは環境変数を参照することができます。Treasure Dataへの接続情報を.envファイルから取得します。

td-agent2/td-agent.conf
<match td.*.*>
  @type tdlog
  endpoint "#{ENV['TD_ENDPOINT']}"
  apikey "#{ENV['TD_API_KEY']}"
  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  use_ssl true
  num_threads 8
</match>

<source>
  @type forward
</source>

kafka-fluentd-consumer

 KafkaからTreasure Dataへのブリッジにはkafka-fluentd-consumerのJarを利用します。

Dockerfile

コンパイル済のkafka-fluentd-consumer-0.3.1-all.jarをダウンロードします。

kafka-bridge/Dockerfile
FROM java:8-jre
ARG KAFKA_FLUENTD_CONSUMER_VERSION=0.3.1

WORKDIR /app

RUN wget -q -O kafka-fluentd-consumer-all.jar https://github.com/treasure-data/kafka-fluentd-consumer/releases/download/v$KAFKA_FLUENTD_CONSUMER_VERSION/kafka-fluentd-consumer-$KAFKA_FLUENTD_CONSUMER_VERSION-all.jar

ADD log4j.properties .
ADD fluentd-consumer.properties .

CMD ["java", "-Dlog4j.configuration=file:///app/log4j.properties", "-jar", "kafka-fluentd-consumer-all.jar", "fluentd-consumer.properties"]

fluentd-consumer.properties

 デフォルトの設定から以下を変更します。fluentd.connectzookeeper.connectはdocker-compose.ymlを使う場合はそれぞれサービス名を指定します。

  • fluentd.connect=:24224
  • fluentd.tag.prefix=td.<データベース名>.
  • fluentd.consumer.topics=<トピック名>
  • zookeeper.connect=:2181
  • group.id=<コンシューマグループ名>
kafka-bridge/fluentd-consumer.properties
# Fluentd instance destinations.
fluentd.connect=td-agent2:24224

# Dynamic event tag with topic name. 
fluentd.tag.prefix=td.sensortag_dev.

# Consumed topics. 
fluentd.consumer.topics=sensortag-sink

# The number of threads per consumer streams
fluentd.consumer.threads=1

# The path for backup un-flushed events during shutdown.
fluentd.consumer.backup.dir=/tmp/fluentd-consumer-backup/

# Kafka Consumer related parameters
zookeeper.connect=kafka-stack:2181
group.id=my-sensortag-sink-group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000

log4j.properties

 log4j.propertiesはデフォルトのまま使います。

kafka-bridge/log4j.properties
# log4j logging configuration.
# This is based on Pinterest's secor

# root logger.
log4j.rootLogger=DEBUG, ROLLINGFILE

log4j.appender.ROLLINGFILE = org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=INFO
log4j.appender.ROLLINGFILE.File=/tmp/fluentd-consumer.log
# keep log files up to 1G
log4j.appender.ROLLINGFILE.MaxFileSize=20MB
log4j.appender.ROLLINGFILE.MaxBackupIndex=50
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n

動作確認

 Docker Composeのサービスを起動します。

$ docker-compose up -d

 td-agentのバージョンを確認します。

$ docker-compose exec td-agent2 td-agent --version
td-agent 0.12.35

 Spark Streamingを使ったウィンドウ集計のサンプルのようにfluentd.consumer.topicsに指定したtopicへJSONフォーマットでデータを送信します。

 テストとしてkafka-console-producerから直接JSONを送信してみます。

$ docker-compose exec kafka-stack kafka-console-producer \
    --broker-list localhost:9092 \
    --topic sensortag-sink

 コマンドを実行後の待機状態でJSON文字列を入力します。

{"bid": "B0:B4:48:BD:DA:03", "time": 1501654353, "humidity": 27.152099609375, "objecttemp": 21.6875, "ambient": 27.09375, "rh": 78.4423828125}

 td-agentはファイルバッファを作成してデフォルトでは5分間隔でTreasure Dataへデータがアップロードします。

$ docker-compose exec td-agent2 ls /var/log/td-agent/buffer
td.sensortag_dev.sensortag_sink.b555bf24951c65554.log

treasure-data.png

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away