Trying Out Apache Kafka/Storm Integration

Posted at 2016-02-21

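So far, I have been experimenting with how Apache Storm works.

This time I want to go one step further and see what stream processing looks like when Storm is combined with Apache Kafka. The actual Kafka/Storm integration looks like this:
[Figure: Kafka/Storm integration diagram (Qiita-kafkaStorm連携図.001.jpeg)]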

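⬛︎ Setting up the Kafka environment

The procedure is almost the same as in my earlier article "Mac OS X環境で、Apache Kafkaの基本動作を試してみる" (trying the basic operation of Apache Kafka on Mac OS X). I used a standalone setup.

  • Install Kafka itself.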
$ brew install Caskroom/cask/java
$ brew install kafka
  • Edit the Kafka configuration file.
$ vi /usr/local/etc/kafka/server.properties
  ...
broker.id=1
  ...
  • Start ZooKeeper.
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
  • Start Kafka.
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
  • Create the topic "kafkaStorm", then confirm that it was created correctly.
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaStorm
Created topic "kafkaStorm".

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaStorm
Topic:kafkaStorm    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafkaStorm   Partition: 0    Leader: 1   Replicas: 1 Isr: 1
  • Install the kafka-python library.
$ pip install kafka-python==0.9.4
  • Deploy the Kafka producer sample application "kafkaStorm_producer.py".
kafkaStorm_producer.py
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from time import sleep
from datetime import datetime

# Connect to the standalone broker started above
kafka = KafkaClient("localhost:9092")

producer = SimpleProducer(kafka)

while True:
  # "kafkaStorm" is the name of our topic
  now = "It is " + str(datetime.now().time())
  print(now)
  # Encode explicitly so the payload is bytes on both Python 2 and 3
  producer.send_messages("kafkaStorm", now.encode("utf-8"))
  sleep(1)
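
The message format used by the producer can be checked on its own, without a running broker. The following is only a minimal sketch: `build_message` is a hypothetical helper that is not part of the original script, and it assumes the payload should be UTF-8 encoded bytes.

```python
from datetime import time


def build_message(t):
    """Return the payload the producer would send for wall-clock time t, as bytes.

    build_message is a hypothetical helper introduced for illustration only.
    """
    text = "It is " + str(t)
    return text.encode("utf-8")


if __name__ == "__main__":
    # Same timestamp as the first line of the sample producer output
    print(build_message(time(9, 36, 12, 434021)))
```

Keeping the formatting in one small function makes it easy to confirm that what the spout receives matches what the producer prints.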

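⬛︎ Setting up the Storm environment

The procedure is almost the same as in my earlier article "Mac OS X環境で、Apache Stormの基本動作を試してみる" (trying the basic operation of Apache Storm on Mac OS X).

  • Install Storm itself under "/usr/local/storm".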
$ brew install wget
$ cd /usr/local/
$ wget http://ftp.jaist.ac.jp/pub/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz

$ tar zxvf apache-storm-0.10.0.tar.gz
$ ln -s apache-storm-0.10.0 storm
$ rm apache-storm-0.10.0.tar.gz
  • Configure the Storm environment. (Change "storm.local.dir" to match your install location.)
$ cd storm/conf
$ vi storm.yaml

...(snip)

storm.zookeeper.servers:
  - "127.0.0.1"

storm.local.dir: "/usr/local/storm"

nimbus.host: "127.0.0.1"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
  • Add Storm to the PATH so that the storm command is easy to launch.
$ vi $HOME/.profile

...(snip)

export PATH=$PATH:/usr/local/storm/bin
  • Start "Nimbus".
$ storm nimbus
Running: java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.nimbus
  • Start "Supervisor".
$ storm supervisor
Running: java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.supervisor
  • Start the "UI".
$ storm ui
Running: java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/apache-storm-0.10.0:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.ui.core
  • Build the "ExclamationTopology" that will run on the Storm Supervisor. Confirm that the sbt build has completed.
$ git clone https://github.com/ttsubo/scala-storm-starter.git
$ cd scala-storm-starter/
$ git checkout KafkaSpout
$ sbt compile package assembly
$ ls -l target/scala-2.11/
total 34928
drwxr-xr-x  3 ttsubo  staff       102  2 19 18:26 cache
drwxr-xr-x  3 ttsubo  staff       102  2 20 13:29 classes
-rw-r--r--  1 ttsubo  staff  17874229  2 20 17:28 scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar
-rw-r--r--  1 ttsubo  staff      7597  2 20 17:28 scala-storm-starter_2.11-0.0.2-SNAPSHOT.jar
drwxr-xr-x  2 ttsubo  staff        68  2 20 08:58 test-classes
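
Before running the topology, it can help to confirm that the daemons started above are actually reachable. The sketch below is an assumption-laden convenience, not part of the original procedure: `port_open` and `SERVICES` are hypothetical names, ports 2181, 9092 and 6627 are the ones that appear in this article's commands and logs, and 8080 is assumed to be the Storm UI port because no other port was configured.

```python
import socket

# Ports used in this walkthrough; 8080 for the Storm UI is an assumption (default setting)
SERVICES = {
    "zookeeper": 2181,
    "kafka": 9092,
    "nimbus": 6627,
    "storm-ui": 8080,
}


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        return False
    conn.close()
    return True


if __name__ == "__main__":
    for name, port in sorted(SERVICES.items()):
        state = "up" if port_open("127.0.0.1", port) else "DOWN"
        print("%-10s %5d %s" % (name, port, state))
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the service behind it is healthy.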

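⬛︎ Trying "ExclamationTopology" in local mode

Now let's see how stream processing with Apache Kafka/Storm integration actually behaves. The user application merely reports the current time once per second, so whether that deserves to be called a stream is debatable...

  • Start the Kafka producer sample application "kafkaStorm_producer.py" deployed earlier.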
$ python kafkaStorm_producer.py 
It is 09:36:12.434021
It is 09:36:13.457584
It is 09:36:14.460644
It is 09:36:15.464444
It is 09:36:16.469271
It is 09:36:17.472175
It is 09:36:18.478534
It is 09:36:19.485352
It is 09:36:20.489528
It is 09:36:21.491859
It is 09:36:22.498474
It is 09:36:23.505260
It is 09:36:24.507176
It is 09:36:25.513852

... (snip)
  • Next, move to the directory where "ExclamationTopology" was built and start it in local mode. The Storm spout "word-spout" fetches the stream data stored in the Kafka topic "kafkaStorm"; the bolts "exclaim1" and "exclaim2" then each append the string "!!!" every time they receive a tuple.
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology

...(snip)
7202 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]
7203 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7203 [Thread-24-exclaim1] INFO  b.s.d.executor - Processing received message FOR 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7204 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]
7204 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7204 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]
7204 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7] TASK: 1 DELTA: 
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:13.457584!!!]
7205 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:14.460644]
7205 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7205 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7205 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [-7111756359690180959 -1602198103229490008]
7206 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7] TASK: 1 DELTA: 
7206 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7206 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7206 [Thread-10-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7206 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:14.460644!!!]
7206 [Thread-10-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584] TASK: 2 DELTA: 
7206 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [6911314124032831513 2770314375009569340 7]
7206 [Thread-18-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7206 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7206 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7207 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:13.457584!!!!!!]
7207 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 __ack_ack [-7111756359690180959 -7992097948249303649]
7207 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [6911314124032831513 8211035036717084048]
7207 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7207 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7207 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644] TASK: 3 DELTA: 
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!] TASK: 6 DELTA: 
7207 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008] TASK: 1 DELTA: 
7207 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {4141217291662411057=-1021888700141355268}, [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7208 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [4141217291662411057 -1021888700141355268 7]
7207 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:14.460644!!!!!!]
7208 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7208 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 __ack_ack [6911314124032831513 6305416489060296620]
7208 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7] TASK: 1 DELTA: 
7208 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:16.469271]
7208 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7208 [Thread-18-exclaim2] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7208 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7208 [Thread-18-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7209 [Thread-16-__acker] INFO  b.s.d.task - Emitting direct: 7; __acker __ack_ack [-7111756359690180959]
7209 [Thread-18-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!] TASK: 6 DELTA: 
7209 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [3052868074563147181 3745382070922932756 7]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-16-__acker] INFO  b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [3052868074563147181 3745382070922932756 7]
7209 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7209 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:16.469271!!!]
7209 [Thread-12-word-spout] INFO  b.s.d.executor - Processing received message FOR 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7209 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649] TASK: 1 DELTA: 
7209 [Thread-10-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-12-word-spout] INFO  b.s.d.executor - SPOUT Acking message -7111756359690180959 storm.kafka.PartitionManager$KafkaMessageId@2e10053a
7209 [Thread-10-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271] TASK: 2 DELTA: 
7209 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048] TASK: 1 DELTA: 
7210 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:17.472175]
7210 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7210 [Thread-12-word-spout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7210 [Thread-16-__acker] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:16.469271!!!!!!]
7210 [Thread-16-__acker] INFO  b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7] TASK: 1 DELTA: 
7210 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-7091949305103491056 3997162068754701889 7]
7210 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 __ack_ack [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO  b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7211 [Thread-24-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:5, stream: __ack_ack, id: {}, [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO  b.s.d.task - Emitting direct: 7; __acker __ack_ack [6911314124032831513]
7211 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 __ack_ack [-2736707202019925397 533028969185990008]
7211 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7211 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7211 [Thread-16-__acker] INFO  b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [6911314124032831513]
7211 [Thread-18-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!] TASK: 5 DELTA: 
7211 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]

...(snip)

The log is very hard to follow, but it shows word-spout emitting [It is 09:36:12.434021], exclaim1 turning it into [It is 09:36:12.434021!!!], and exclaim2 turning that into [It is 09:36:12.434021!!!!!!].
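
Because the debug log interleaves several threads, filtering it makes the flow of a single message easier to see. The following is a minimal sketch, assuming the log format shown above; `EMIT_RE`, `emitted_tuples` and `trace` are hypothetical names, and `SAMPLE` holds a few lines copied from that log.

```python
import re

# Matches debug lines such as:
#   7204 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is ...!!!]
# Only the "default" stream is kept; __ack_init / __ack_ack bookkeeping is skipped.
EMIT_RE = re.compile(r"Emitting: (\S+) default \[(.*)\]$")


def emitted_tuples(log_lines):
    """Yield (component, value) for every tuple emitted on the default stream."""
    for line in log_lines:
        match = EMIT_RE.search(line.rstrip())
        if match:
            yield match.group(1), match.group(2)


def trace(log_lines, prefix):
    """Return, in log order, the emissions whose value starts with prefix."""
    return [(c, v) for c, v in emitted_tuples(log_lines) if v.startswith(prefix)]


SAMPLE = [
    "7202 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]",
    "7204 [Thread-12-word-spout] INFO  b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]",
    "7204 [Thread-24-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]",
    "7211 [Thread-18-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]",
]

if __name__ == "__main__":
    for component, value in trace(SAMPLE, "It is 09:36:12.434021"):
        print(component, value)
```

Run against the full log (for example by reading it from a file instead of `SAMPLE`), the same filter would list each message three times: once from the spout and once from each bolt.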

⬛︎ Trying "ExclamationTopology" in remote mode

  • Next, run "ExclamationTopology" in remote mode. This time the topology was submitted under the topology name "ExclamationTopology".
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin -Dstorm.jar=target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
438  [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
495  [main] INFO  b.s.u.Utils - Using storm.yaml from resources
521  [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
529  [main] INFO  b.s.u.Utils - Using storm.yaml from resources
531  [main] INFO  b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -6085723166361804975:-8023199696530217358
532  [main] INFO  b.s.s.a.AuthUtils - Got AutoCreds []
548  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
561  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
580  [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
586  [main] INFO  b.s.StormSubmitter - Uploading topology jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733  [main] INFO  b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733  [main] INFO  b.s.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-6085723166361804975:-8023199696530217358","topology.workers":3,"topology.debug":true}
889  [main] INFO  b.s.StormSubmitter - Finished submitting topology: ExclamationTopology
  • Check the Storm environment in the Storm UI. The "Topology Summary" section now shows the "ExclamationTopology" entry.
[Figure: Topology_summary.png]

The summary for "ExclamationTopology" also shows figures such as the amount of data transferred by the spout and the bolts.
[Figure: Topology_detail.png]

The stream processing flow word-spout (KafkaSpout) → exclaim1 (ExclamationBolt) → exclaim2 (ExclamationBolt) can also be checked visually.
[Figure: TopologyVisualization.png]

  • Finally, stop the "ExclamationTopology" topology that is running in remote mode.
$ storm kill ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin backtype.storm.command.kill_topology ExclamationTopology
1157 [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
1215 [main] INFO  b.s.u.Utils - Using storm.yaml from resources
1651 [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
1661 [main] INFO  b.s.u.Utils - Using storm.yaml from resources
1672 [main] INFO  b.s.thrift - Connecting to Nimbus at 127.0.0.1:6627 as user: 
1672 [main] INFO  b.s.u.Utils - Using defaults.yaml from resources
1678 [main] INFO  b.s.u.Utils - Using storm.yaml from resources
1691 [main] INFO  b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
1736 [main] INFO  b.s.c.kill-topology - Killed topology: ExclamationTopology

⬛︎ A short explanation of the Apache Kafka/Storm integration

For Storm to consume stream data from the Kafka topic "kafkaStorm", ExclamationTopology has to define the operating parameters of KafkaSpout. The KafkaSpout class itself lives in the apache/storm repository on GitHub.
ZooKeeper-related settings are also required, so that KafkaSpout's operating state can be stored in ZooKeeper's distributed key-value store.

ExclamationTopology.scala
package storm.starter.topology

import backtype.storm.{ Config, LocalCluster, StormSubmitter }
import backtype.storm.topology.TopologyBuilder
import backtype.storm.utils.Utils
import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts, StringScheme}
import backtype.storm.spout.SchemeAsMultiScheme

object ExclamationTopology {
  def main(args: Array[String]): Unit = {
    import storm.starter.bolt.ExclamationBolt

    val builder: TopologyBuilder = new TopologyBuilder()

    // KafkaSpout settings: the topic to read and the ZooKeeper used by Kafka
    val topic = "kafkaStorm"
    val kafkaZkConnect = "127.0.0.1:2181"
    val zkHosts = new ZkHosts(kafkaZkConnect, "/brokers")
    // ZooKeeper root path and id under which the spout keeps its state
    val zkRoot = "/kafkastorm"
    val zkSpoutId = "kafka-spout"
    val kafkaConfig = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId)
    // Decode each Kafka message as a string
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
    val kafkaSpout = new KafkaSpout(kafkaConfig)
    val kafkaSpoutId = "word-spout"
    builder.setSpout(kafkaSpoutId, kafkaSpout)

    // word-spout -> exclaim1 (parallelism hint 3) -> exclaim2 (parallelism hint 2)
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping(kafkaSpoutId)
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1")

    val config = new Config()
    config.setDebug(true)

    if (args != null && args.length > 0) {
      // Remote mode: submit to the cluster under the given topology name
      config.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), config, builder.createTopology())
    } else {
      // Local mode: run in-process for 5 seconds, then shut down
      val cluster: LocalCluster = new LocalCluster()
      cluster.submitTopology("ExclamationTopology", config, builder.createTopology())
      Utils.sleep(5000)
      cluster.killTopology("ExclamationTopology")
      cluster.shutdown()
    }
  }
}
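
The data flow that this topology wires together can be mimicked in a few lines of plain Python. This is only a rough sketch for intuition, not how Storm executes a topology: `Bolt`, `exclaim` and `run` are hypothetical names, everything runs in one thread, and round-robin task selection stands in for Storm's shuffle grouping (which distributes tuples randomly but evenly across tasks).

```python
import itertools


def exclaim(word):
    """What each ExclamationBolt does to a tuple here: append "!!!"."""
    return word + "!!!"


class Bolt(object):
    """A bolt with `parallelism` tasks that counts the tuples each task handles."""

    def __init__(self, name, parallelism):
        self.name = name
        self.counts = [0] * parallelism
        # Round-robin is an assumed stand-in for shuffle grouping
        self._next_task = itertools.cycle(range(parallelism))

    def process(self, word):
        self.counts[next(self._next_task)] += 1
        return exclaim(word)


def run(messages):
    """Push messages through exclaim1 (3 tasks) and then exclaim2 (2 tasks)."""
    exclaim1 = Bolt("exclaim1", 3)
    exclaim2 = Bolt("exclaim2", 2)
    out = [exclaim2.process(exclaim1.process(m)) for m in messages]
    return out, exclaim1.counts, exclaim2.counts


if __name__ == "__main__":
    out, counts1, counts2 = run(["It is 09:36:12.434021", "It is 09:36:13.457584"])
    for line in out:
        print(line)
    print("exclaim1 tasks:", counts1, "exclaim2 tasks:", counts2)
```

The parallelism values 3 and 2 mirror the hints passed to setBolt above, which is why the local-mode log shows several different exclaim1 and exclaim2 threads handling consecutive messages.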

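⬛︎ Conclusion

I managed to confirm how stream processing with Apache Kafka/Storm integration behaves.
However, the integration involves a wide range of technologies, so mastering it does not look easy. Although it is not covered in detail here, Kafka/Storm integration also requires knowledge of Apache ZooKeeper.
Furthermore, if you aim to operate a Kafka/Storm cluster, the amount of technical knowledge to learn appears to grow enormously. I hope a book that covers this ground comprehensively gets published.
That's all for this time.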
