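So far, I have been trying out Apache Storm on its own.
This time, I want to go a step further and check how stream processing works when Storm is combined with Apache Kafka. The overall picture is this: a Python producer publishes messages to a Kafka topic, and a Storm topology consumes them through KafkaSpout and passes them through two bolts.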
⬛︎ Setting up the Kafka environment
The steps are almost the same as in my earlier article "Trying out the basic operation of Apache Kafka on Mac OS X". I used a standalone setup.
- Install Kafka itself.
$ brew install Caskroom/cask/java
$ brew install kafka
- Edit the Kafka configuration file.
$ vi /usr/local/etc/kafka/server.properties
...
broker.id=1
...
- Start ZooKeeper.
$ zkServer start
JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
- Start Kafka.
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
- Create the topic "kafkaStorm", then confirm that it was created correctly.
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaStorm
Created topic "kafkaStorm".
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaStorm
Topic:kafkaStorm PartitionCount:1 ReplicationFactor:1 Configs:
Topic: kafkaStorm Partition: 0 Leader: 1 Replicas: 1 Isr: 1
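As a reading aid for the describe output above, here is a minimal sketch that splits the partition line into fields. The helper name parse_partition_line is hypothetical, the sample line is copied from the output above, and other Kafka versions may format this output differently.

```python
import re

def parse_partition_line(line):
    """Parse one partition line of `kafka-topics.sh --describe` into a dict.

    Hypothetical helper; assumes the "Key: value" layout shown above.
    """
    # Each field looks like "Key: value"; the values contain no spaces here.
    pairs = re.findall(r"(\w+):\s*(\S+)", line)
    return dict(pairs)

sample = "Topic: kafkaStorm Partition: 0 Leader: 1 Replicas: 1 Isr: 1"
info = parse_partition_line(sample)

# Leader "1" is the broker.id=1 that was set in server.properties.
print(info["Topic"], info["Leader"], info["Isr"])
```

With a single broker, Leader, Replicas and Isr all point at the same broker id, which is what a healthy standalone setup should look like.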
- Install the kafka-python library.
$ pip install kafka-python==0.9.4
- Deploy the Kafka producer sample app "kafkaStorm_producer.py".
# Legacy kafka-python 0.9.x API (KafkaClient / SimpleProducer)
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from time import sleep
from datetime import datetime

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

while True:
    # Build a message containing the current time
    now = "It is " + str(datetime.now().time())
    print(now)
    # "kafkaStorm" is the name of our topic; the payload is sent as bytes
    producer.send_messages("kafkaStorm", now.encode("utf-8"))
    sleep(1)
⬛︎ Setting up the Storm environment
The steps are almost the same as in my earlier article "Trying out the basic operation of Apache Storm on Mac OS X".
- Install Storm itself under "/usr/local/storm".
$ brew install wget
$ cd /usr/local/
$ wget http://ftp.jaist.ac.jp/pub/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
$ tar zxvf apache-storm-0.10.0.tar.gz
$ ln -s apache-storm-0.10.0 storm
$ rm apache-storm-0.10.0.tar.gz
- Configure Storm. (Change "storm.local.dir" to match your install location.)
$ cd storm/conf
$ vi storm.yaml
...(snip)
storm.zookeeper.servers:
- "127.0.0.1"
storm.local.dir: "/usr/local/storm"
nimbus.host: "127.0.0.1"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
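Each port listed under supervisor.slots.ports is one worker slot on this supervisor. Later in this article the topology is submitted with 3 workers, so it is worth checking that the slots are sufficient. The following is only a sketch of that arithmetic; the helper name can_host is hypothetical and is not part of Storm.

```python
# Ports copied from supervisor.slots.ports in storm.yaml above.
slot_ports = [6700, 6701, 6702, 6703]

def can_host(requested_workers, ports):
    """Return True if a single supervisor has enough worker slots.

    Hypothetical helper for illustration only.
    """
    return requested_workers <= len(ports)

# 3 is the value this article's topology passes to setNumWorkers.
print(len(slot_ports), can_host(3, slot_ports))
```

With four slots and three requested workers, one slot stays free on this single-supervisor setup.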
- Add Storm to the PATH so that the storm command can be run easily.
$ vi $HOME/.profile
...(snip)
export PATH=$PATH:/usr/local/storm/bin
- Start "Nimbus".
$ storm nimbus
Running: java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.nimbus
- Start "Supervisor".
$ storm supervisor
Running: java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.supervisor
- Start "UI".
$ storm ui
Running: java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/apache-storm-0.10.0:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.ui.core
- Build the "ExclamationTopology" environment that will run on the Storm Supervisor. Confirm that the sbt build has completed.
$ git clone https://github.com/ttsubo/scala-storm-starter.git
$ cd scala-storm-starter/
$ git checkout KafkaSpout
$ sbt compile package assembly
$ ls -l target/scala-2.11/
total 34928
drwxr-xr-x 3 ttsubo staff 102 2 19 18:26 cache
drwxr-xr-x 3 ttsubo staff 102 2 20 13:29 classes
-rw-r--r-- 1 ttsubo staff 17874229 2 20 17:28 scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar
-rw-r--r-- 1 ttsubo staff 7597 2 20 17:28 scala-storm-starter_2.11-0.0.2-SNAPSHOT.jar
drwxr-xr-x 2 ttsubo staff 68 2 20 08:58 test-classes
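⬛︎ Trying "ExclamationTopology" in local mode
Now let's check how stream processing works with Apache Kafka and Storm together. The user app only reports the current time once per second, so it is debatable whether this really counts as a stream...
- Start the Kafka producer sample app "kafkaStorm_producer.py" deployed earlier.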
$ python kafkaStorm_producer.py
It is 09:36:12.434021
It is 09:36:13.457584
It is 09:36:14.460644
It is 09:36:15.464444
It is 09:36:16.469271
It is 09:36:17.472175
It is 09:36:18.478534
It is 09:36:19.485352
It is 09:36:20.489528
It is 09:36:21.491859
It is 09:36:22.498474
It is 09:36:23.505260
It is 09:36:24.507176
It is 09:36:25.513852
... (snip)
- Next, move to the directory where the "ExclamationTopology" environment was built, and start "ExclamationTopology" in local mode.
The Storm-side Spout "word-spout" fetches the stream data stored in the Kafka topic "kafkaStorm". Bolt "exclaim1" and Bolt "exclaim2" then each append the string "!!!" every time they receive a Tuple.
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology
...(snip)
7202 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]
7203 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7203 [Thread-24-exclaim1] INFO b.s.d.executor - Processing received message FOR 4 TUPLE: source: word-spout:7, stream: default, id: {-2736707202019925397=8535453313544767994}, [It is 09:36:12.434021]
7204 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]
7204 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7204 [Thread-24-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]
7204 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7]
7205 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-2736707202019925397 8535453313544767994 7] TASK: 1 DELTA:
7205 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7205 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7205 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:13.457584!!!]
7205 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:14.460644]
7205 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7205 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7205 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [-7111756359690180959 -1602198103229490008]
7206 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7]
7206 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [-7111756359690180959 8707010670879263031 7] TASK: 1 DELTA:
7206 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7206 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7206 [Thread-10-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584]
7206 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:14.460644!!!]
7206 [Thread-10-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {-7111756359690180959=8707010670879263031}, [It is 09:36:13.457584] TASK: 2 DELTA:
7206 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [6911314124032831513 2770314375009569340 7]
7206 [Thread-18-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7206 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7206 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7207 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:13.457584!!!!!!]
7207 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 __ack_ack [-7111756359690180959 -7992097948249303649]
7207 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [6911314124032831513 8211035036717084048]
7207 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7207 [Thread-18-exclaim2] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7207 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644]
7207 [Thread-18-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!]
7207 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008]
7207 [Thread-14-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {6911314124032831513=2770314375009569340}, [It is 09:36:14.460644] TASK: 3 DELTA:
7207 [Thread-18-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {-7111756359690180959=-7992097948249303649}, [It is 09:36:13.457584!!!] TASK: 6 DELTA:
7207 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: __ack_ack, id: {}, [-7111756359690180959 -1602198103229490008] TASK: 1 DELTA:
7207 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word-spout:7, stream: default, id: {4141217291662411057=-1021888700141355268}, [It is 09:36:15.464444]
7207 [Thread-18-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7208 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [4141217291662411057 -1021888700141355268 7]
7207 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:14.460644!!!!!!]
7208 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7208 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7]
7208 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 __ack_ack [6911314124032831513 6305416489060296620]
7208 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [6911314124032831513 2770314375009569340 7] TASK: 1 DELTA:
7208 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:16.469271]
7208 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7208 [Thread-18-exclaim2] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7208 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7208 [Thread-18-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!]
7209 [Thread-16-__acker] INFO b.s.d.task - Emitting direct: 7; __acker __ack_ack [-7111756359690180959]
7209 [Thread-18-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {6911314124032831513=6305416489060296620}, [It is 09:36:14.460644!!!] TASK: 6 DELTA:
7209 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [3052868074563147181 3745382070922932756 7]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-16-__acker] INFO b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [3052868074563147181 3745382070922932756 7]
7209 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649]
7209 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:16.469271!!!]
7209 [Thread-12-word-spout] INFO b.s.d.executor - Processing received message FOR 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [-7111756359690180959]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7209 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: exclaim2:6, stream: __ack_ack, id: {}, [-7111756359690180959 -7992097948249303649] TASK: 1 DELTA:
7209 [Thread-10-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim1:2, stream: __ack_ack, id: {}, [3052868074563147181 -8063043445652341263]
7209 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048]
7209 [Thread-10-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271]
7209 [Thread-12-word-spout] INFO b.s.d.executor - SPOUT Acking message -7111756359690180959 storm.kafka.PartitionManager$KafkaMessageId@2e10053a
7209 [Thread-10-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: default, id: {3052868074563147181=3745382070922932756}, [It is 09:36:16.469271] TASK: 2 DELTA:
7209 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: __ack_ack, id: {}, [6911314124032831513 8211035036717084048] TASK: 1 DELTA:
7210 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:17.472175]
7210 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7210 [Thread-12-word-spout] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7210 [Thread-16-__acker] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7]
7210 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:16.469271!!!!!!]
7210 [Thread-16-__acker] INFO b.s.d.executor - Execute done TUPLE source: word-spout:7, stream: __ack_init, id: {}, [4141217291662411057 -1021888700141355268 7] TASK: 1 DELTA:
7210 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-7091949305103491056 3997162068754701889 7]
7210 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 __ack_ack [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO b.s.d.executor - Processing received message FOR 1 TUPLE: source: exclaim2:6, stream: __ack_ack, id: {}, [6911314124032831513 6305416489060296620]
7211 [Thread-24-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: exclaim2:5, stream: __ack_ack, id: {}, [3052868074563147181 -6638282129905823771]
7211 [Thread-16-__acker] INFO b.s.d.task - Emitting direct: 7; __acker __ack_ack [6911314124032831513]
7211 [Thread-24-exclaim1] INFO b.s.d.task - Emitting: exclaim1 __ack_ack [-2736707202019925397 533028969185990008]
7211 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!]
7211 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word-spout:7, stream: default, id: {-7091949305103491056=3997162068754701889}, [It is 09:36:17.472175]
7211 [Thread-16-__acker] INFO b.s.d.executor - TRANSFERING tuple TASK: 7 TUPLE: source: __acker:1, stream: __ack_ack, id: {}, [6911314124032831513]
7211 [Thread-18-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:4, stream: default, id: {-2736707202019925397=8147493952586947714}, [It is 09:36:12.434021!!!]
7211 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {3052868074563147181=-6638282129905823771}, [It is 09:36:16.469271!!!] TASK: 5 DELTA:
7211 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]
...(snip)
It is hard to see in the raw log, but word-spout emits [It is 09:36:12.434021], exclaim1 turns it into [It is 09:36:12.434021!!!], and exclaim2 turns that into [It is 09:36:12.434021!!!!!!].
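Since the debug log is so noisy, one way to follow a message is to keep only the lines where a component emits on the "default" stream. The sketch below does that with a regular expression. The function name emitted_tuples is hypothetical, the sample lines are copied from the log above, and the pattern assumes the log format of this Storm version.

```python
import re

# Matches only emits on the "default" stream, which skips acker traffic
# such as "__ack_init" and "__ack_ack".
EMIT = re.compile(r"Emitting: (\S+) default \[(.*)\]\s*$")

def emitted_tuples(lines):
    """Yield (component, value) for each default-stream emit line."""
    for line in lines:
        match = EMIT.search(line)
        if match:
            yield match.group(1), match.group(2)

sample_log = [
    "7202 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout default [It is 09:36:12.434021]",
    "7204 [Thread-12-word-spout] INFO b.s.d.task - Emitting: word-spout __ack_init [-2736707202019925397 8535453313544767994 7]",
    "7204 [Thread-24-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [It is 09:36:12.434021!!!]",
    "7211 [Thread-18-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [It is 09:36:12.434021!!!!!!]",
]

result = list(emitted_tuples(sample_log))
for component, value in result:
    print(component, value)
```

To use it on a real run, the output of the storm command could be redirected to a file and the open file object passed to emitted_tuples in place of sample_log.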
⬛︎ Trying "ExclamationTopology" in remote mode
- Next, run "ExclamationTopology" in remote mode.
This time, I submitted the topology under the topology name "ExclamationTopology".
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin -Dstorm.jar=target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology ExclamationTopology
438 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
495 [main] INFO b.s.u.Utils - Using storm.yaml from resources
521 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
529 [main] INFO b.s.u.Utils - Using storm.yaml from resources
531 [main] INFO b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -6085723166361804975:-8023199696530217358
532 [main] INFO b.s.s.a.AuthUtils - Got AutoCreds []
548 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
561 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
580 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
586 [main] INFO b.s.StormSubmitter - Uploading topology jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733 [main] INFO b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-e9d58ebd-0ec2-49df-8745-fcf9f931dd47.jar
733 [main] INFO b.s.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-6085723166361804975:-8023199696530217358","topology.workers":3,"topology.debug":true}
889 [main] INFO b.s.StormSubmitter - Finished submitting topology: ExclamationTopology
In the Storm UI, the summary for "ExclamationTopology" now shows figures such as the amount of data transferred by the Spout and the Bolts.
The flow of stream processing, word-spout (KafkaSpout) → exclaim1 (ExclamationBolt) → exclaim2 (ExclamationBolt), can also be checked visually.
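The same flow can be modeled outside Storm to make the data path concrete. The sketch below is a plain Python stand-in, not Storm code: exclaim mimics what this article describes ExclamationBolt doing (appending "!!!"), and the task ids are the ones that appear in the local-mode log. Picking a task with a random choice is only a rough approximation of shuffle grouping, and all names here are hypothetical.

```python
import random

def exclaim(value):
    """Stand-in for ExclamationBolt: append "!!!" to the incoming value."""
    return value + "!!!"

# Task ids as they appear in the local-mode log above.
EXCLAIM1_TASKS = [2, 3, 4]
EXCLAIM2_TASKS = [5, 6]

def run_pipeline(messages, rng):
    """Send each message through exclaim1 and then exclaim2."""
    results = []
    for message in messages:                 # plays the role of word-spout
        task1 = rng.choice(EXCLAIM1_TASKS)   # rough model of shuffleGrouping
        once = exclaim(message)
        task2 = rng.choice(EXCLAIM2_TASKS)
        twice = exclaim(once)
        results.append((task1, task2, twice))
    return results

messages = ["It is 09:36:12.434021", "It is 09:36:13.457584"]
results = run_pipeline(messages, random.Random(0))
for task1, task2, value in results:
    print(task1, task2, value)
```

Whichever tasks are picked, every message ends up with "!!!!!!" appended, which matches what the exclaim2 lines in the log show.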
- Finally, stop the "ExclamationTopology" topology running in remote mode.
$ storm kill ExclamationTopology
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf:/usr/local/apache-storm-0.10.0/bin backtype.storm.command.kill_topology ExclamationTopology
1157 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
1215 [main] INFO b.s.u.Utils - Using storm.yaml from resources
1651 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
1661 [main] INFO b.s.u.Utils - Using storm.yaml from resources
1672 [main] INFO b.s.thrift - Connecting to Nimbus at 127.0.0.1:6627 as user:
1672 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
1678 [main] INFO b.s.u.Utils - Using storm.yaml from resources
1691 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
1736 [main] INFO b.s.c.kill-topology - Killed topology: ExclamationTopology
⬛︎ A short explanation of the Apache Kafka/Storm integration
For Storm to consume stream data from the Kafka topic "kafkaStorm", ExclamationTopology has to configure a KafkaSpout. The KafkaSpout class lives in the apache/storm GitHub repository.
So that KafkaSpout can store its state in ZooKeeper, the ZooKeeper-related settings (the ZooKeeper hosts, zkRoot, and the spout id) also have to be supplied.
package storm.starter.topology

import backtype.storm.{ Config, LocalCluster, StormSubmitter }
import backtype.storm.testing.TestWordSpout
import backtype.storm.topology.TopologyBuilder
import backtype.storm.utils.Utils
import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts, StringScheme}
import backtype.storm.spout.SchemeAsMultiScheme

object ExclamationTopology {
  def main(args: Array[String]) {
    import storm.starter.bolt.ExclamationBolt
    val builder: TopologyBuilder = new TopologyBuilder()

    // Kafka topic to consume, and the ZooKeeper where Kafka registers its brokers
    val topic = "kafkaStorm"
    val kafkaZkConnect = "127.0.0.1:2181"
    val zkHosts = new ZkHosts(kafkaZkConnect, "/brokers")

    // ZooKeeper root path and id under which the spout keeps its state
    val zkRoot = "/kafkastorm"
    val zkSpoutId = "kafka-spout"
    val kafkaConfig = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId)
    // Deserialize each Kafka message as a string
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
    val kafkaSpout = new KafkaSpout(kafkaConfig)
    val kafkaSpoutId = "word-spout"

    // word-spout -> exclaim1 (parallelism 3) -> exclaim2 (parallelism 2)
    builder.setSpout(kafkaSpoutId, kafkaSpout)
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping(kafkaSpoutId)
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1")

    val config = new Config()
    config.setDebug(true)

    if (args != null && args.length > 0) {
      // Remote mode: the first argument is used as the topology name
      config.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), config, builder.createTopology())
    } else {
      // Local mode: run in-process for 5 seconds, then shut down
      val cluster: LocalCluster = new LocalCluster()
      cluster.submitTopology("ExclamationTopology", config, builder.createTopology())
      Utils.sleep(5000)
      cluster.killTopology("ExclamationTopology")
      cluster.shutdown()
    }
  }
}
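To make the role of zkRoot and zkSpoutId a little more concrete: as far as I understand, storm-kafka records the consumed offset of each partition under a ZooKeeper path built from those two values. The sketch below only assembles such a path. The layout zkRoot/id/partition_N is my assumption about storm-kafka's behavior and should be checked against the version in use, and the helper name committed_offset_path is hypothetical.

```python
def committed_offset_path(zk_root, spout_id, partition):
    """Build the ZooKeeper path assumed to hold a partition's offset.

    Assumption: storm-kafka uses "<zkRoot>/<id>/partition_<N>".
    """
    return "{}/{}/partition_{}".format(zk_root, spout_id, partition)

# Values taken from zkRoot and zkSpoutId in the topology code above;
# partition 0 is the only partition of the "kafkaStorm" topic.
path = committed_offset_path("/kafkastorm", "kafka-spout", 0)
print(path)
```

If the assumption holds, changing zkSpoutId would make the spout start without any previously recorded offset, because it would look under a different path.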
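⬛︎ Closing
I managed to confirm how stream processing works with Apache Kafka and Storm together.
However, the integration involves a wide range of technical elements, so mastering it does not look easy. Although I did not cover it here, Kafka/Storm integration also requires working knowledge of Apache ZooKeeper.
Moreover, if you aim to operate Kafka and Storm together as a cluster, the amount of knowledge required looks set to grow enormously. I hope a book covering this whole area gets published.
That's all for this time.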