Trying the Scala Bolt Sample for Apache Storm

This is a follow-up to my previous blog post, "Trying the Basic Operation of Apache Storm on Mac OS X".
Because Apache Storm can be used from multiple languages, this time I try the Scala version of storm-starter.

⬛︎ Building the Scala version of storm-starter

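  • Clone the Scala version of the storm-starter repository from GitHub and build it. It is based on the paulfryzel/scala-storm-starter repository, with a few modifications so that it runs on the latest version of Storm (0.10.0 in the logs below).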
$ git clone https://github.com/ttsubo/scala-storm-starter.git
$ cd scala-storm-starter/
$ git checkout sample
$ brew install sbt
$ sbt compile package assembly
  • Start "Nimbus".
$ storm nimbus
Running: java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.nimbus
  • Start "Supervisor".
$ storm supervisor
Running: java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.supervisor
  • Start the "UI".
$ storm ui
Running: java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/apache-storm-0.10.0:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.ui.core
  • Run "ExclamationTopology" in local mode.
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology

...(snip)

6159 [Thread-36-word] INFO  b.s.d.task - Emitting: word default [bertels]
6162 [Thread-36-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6163 [Thread-38-word] INFO  b.s.d.task - Emitting: word default [jackson]
6168 [Thread-38-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:14, stream: default, id: {}, [jackson]
6171 [Thread-40-exclaim1] INFO  b.s.d.executor - Processing received message FOR 4 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6172 [Thread-18-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:14, stream: default, id: {}, [jackson]
6174 [Thread-24-word] INFO  b.s.d.task - Emitting: word default [jackson]
6174 [Thread-10-word] INFO  b.s.d.task - Emitting: word default [bertels]
6174 [Thread-24-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:9, stream: default, id: {}, [jackson]
6174 [Thread-10-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word:8, stream: default, id: {}, [bertels]
6174 [Thread-42-word] INFO  b.s.d.task - Emitting: word default [jackson]
6174 [Thread-42-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:15, stream: default, id: {}, [jackson]
6174 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:9, stream: default, id: {}, [jackson]
6179 [Thread-26-word] INFO  b.s.d.task - Emitting: word default [golda]
6179 [Thread-26-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:11, stream: default, id: {}, [golda]
6190 [Thread-12-word] INFO  b.s.d.task - Emitting: word default [golda]
6190 [Thread-12-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:12, stream: default, id: {}, [golda]
6200 [Thread-18-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [jackson!!!]
6200 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [jackson!!!]
6200 [Thread-40-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6201 [Thread-40-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6201 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [jackson!!!]
6201 [Thread-18-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6201 [Thread-14-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word:9, stream: default, id: {}, [jackson]
6201 [Thread-40-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 4 TIME:  TUPLE: source: word:10, stream: default, id: {}, [bertels]
6201 [Thread-14-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:9, stream: default, id: {}, [jackson] TASK: 2 DELTA: 
6201 [Thread-40-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:10, stream: default, id: {}, [bertels] TASK: 4 DELTA: 
6201 [Thread-18-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word:14, stream: default, id: {}, [jackson]
6201 [Thread-34-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [jackson!!!]
6201 [Thread-40-exclaim1] INFO  b.s.d.executor - Processing received message FOR 4 TUPLE: source: word:8, stream: default, id: {}, [bertels]
6201 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6201 [Thread-40-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6201 [Thread-34-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [jackson!!!!!!]
6201 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:11, stream: default, id: {}, [golda]
6201 [Thread-18-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:14, stream: default, id: {}, [jackson] TASK: 3 DELTA: 
6202 [Thread-34-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {}, [jackson!!!]
6202 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [golda!!!]
6202 [Thread-18-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:15, stream: default, id: {}, [jackson]
6202 [Thread-34-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [jackson!!!] TASK: 5 DELTA: 
6202 [Thread-18-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [jackson!!!]
6202 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6202 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
6202 [Thread-40-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6202 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6202 [Thread-18-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6202 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:4, stream: default, id: {}, [bertels!!!] TASK: 6 DELTA: 
6202 [Thread-18-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word:15, stream: default, id: {}, [jackson]
6202 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6202 [Thread-18-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:15, stream: default, id: {}, [jackson] TASK: 3 DELTA: 
6202 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [jackson!!!!!!]
6202 [Thread-18-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:12, stream: default, id: {}, [golda]
6202 [Thread-14-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word:11, stream: default, id: {}, [golda]
6203 [Thread-14-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:11, stream: default, id: {}, [golda] TASK: 2 DELTA: 
6203 [Thread-18-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [golda!!!]
6202 [Thread-40-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 4 TIME:  TUPLE: source: word:8, stream: default, id: {}, [bertels]
6203 [Thread-34-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6203 [Thread-18-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [golda!!!]
6203 [Thread-40-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:8, stream: default, id: {}, [bertels] TASK: 4 DELTA: 
6203 [Thread-34-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
6203 [Thread-18-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word:12, stream: default, id: {}, [golda]
6203 [Thread-34-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6204 [Thread-18-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:12, stream: default, id: {}, [golda] TASK: 3 DELTA: 
6204 [Thread-34-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:4, stream: default, id: {}, [bertels!!!] TASK: 5 DELTA: 
6204 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6204 [Thread-34-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6204 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [jackson!!!] TASK: 6 DELTA: 
6204 [Thread-34-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [jackson!!!!!!]
6204 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6204 [Thread-34-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6204 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [golda!!!!!!]
6205 [Thread-34-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [jackson!!!] TASK: 5 DELTA: 
6205 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6205 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [golda!!!] TASK: 6 DELTA: 
6205 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [golda!!!]
6205 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [golda!!!!!!]
6205 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:3, stream: default, id: {}, [golda!!!]
6205 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [golda!!!] TASK: 6 DELTA: 
6209 [Thread-30-word] INFO  b.s.d.task - Emitting: word default [nathan]
6209 [Thread-16-word] INFO  b.s.d.task - Emitting: word default [bertels]
6210 [Thread-16-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:7, stream: default, id: {}, [bertels]
6210 [Thread-30-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:16, stream: default, id: {}, [nathan]
6210 [Thread-18-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:7, stream: default, id: {}, [bertels]
6210 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:16, stream: default, id: {}, [nathan]
6211 [Thread-18-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6211 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [nathan!!!]
6212 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [nathan!!!]
6212 [Thread-14-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word:16, stream: default, id: {}, [nathan]
6213 [Thread-14-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:16, stream: default, id: {}, [nathan] TASK: 2 DELTA: 
6212 [Thread-18-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6214 [Thread-18-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word:7, stream: default, id: {}, [bertels]
6215 [Thread-18-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:7, stream: default, id: {}, [bertels] TASK: 3 DELTA: 
6215 [Thread-34-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [nathan!!!]
6216 [Thread-34-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [nathan!!!!!!]
6218 [Thread-34-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {}, [nathan!!!]
6219 [Thread-34-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [nathan!!!] TASK: 5 DELTA: 
6219 [Thread-34-exclaim2] INFO  b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6219 [Thread-34-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
6219 [Thread-34-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 5 TIME:  TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6219 [Thread-34-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [bertels!!!] TASK: 5 DELTA: 
6221 [Thread-32-word] INFO  b.s.d.task - Emitting: word default [golda]
6222 [Thread-32-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:13, stream: default, id: {}, [golda]
6223 [Thread-14-exclaim1] INFO  b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:13, stream: default, id: {}, [golda]
6223 [Thread-14-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [golda!!!]
6223 [Thread-14-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6223 [Thread-14-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 2 TIME:  TUPLE: source: word:13, stream: default, id: {}, [golda]
6223 [Thread-14-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:13, stream: default, id: {}, [golda] TASK: 2 DELTA: 
6224 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6224 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [golda!!!!!!]
6224 [Thread-22-exclaim2] INFO  b.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6224 [Thread-22-exclaim2] INFO  b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [golda!!!] TASK: 6 DELTA: 
6266 [Thread-36-word] INFO  b.s.d.task - Emitting: word default [bertels]
6266 [Thread-36-word] INFO  b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6266 [Thread-18-exclaim1] INFO  b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6266 [Thread-18-exclaim1] INFO  b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6266 [Thread-18-exclaim1] INFO  b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6266 [Thread-18-exclaim1] INFO  b.s.d.executor - BOLT ack TASK: 3 TIME:  TUPLE: source: word:10, stream: default, id: {}, [bertels]
6267 [Thread-22-exclaim2] INFO  b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6267 [Thread-18-exclaim1] INFO  b.s.d.executor - Execute done TUPLE source: word:10, stream: default, id: {}, [bertels] TASK: 3 DELTA: 1
6267 [Thread-22-exclaim2] INFO  b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]

As the log above shows, just as in the previous post, the stream flows word (TestWordSpout) → exclaim1 (ExclamationBolt) → exclaim2 (ExclamationBolt), and each word is transformed step by step, for example "bertels" → "bertels!!!" → "bertels!!!!!!".
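
The two-stage transformation seen in the log can be reproduced without a Storm cluster. The following is a minimal sketch in plain Scala, assuming each bolt does nothing more than append "!!!" to the word. PipelineSketch and runPipeline are hypothetical names used only for illustration; they are not part of scala-storm-starter.

```scala
// Illustrative sketch only: models word -> exclaim1 -> exclaim2 as plain functions.
// PipelineSketch and runPipeline are hypothetical names, not part of the repository.
object PipelineSketch {
  // What a single ExclamationBolt does to the first field of a tuple.
  def exclaim(s: String): String = s + "!!!"

  // Two bolts chained one after another, like exclaim1 followed by exclaim2.
  def runPipeline(word: String): String = exclaim(exclaim(word))

  def main(args: Array[String]): Unit = {
    Seq("bertels", "jackson", "golda").foreach { w =>
      // First line printed: bertels -> bertels!!! -> bertels!!!!!!
      println(s"$w -> ${exclaim(w)} -> ${runPipeline(w)}")
    }
  }
}
```

Unlike the real topology, this sketch runs in a single thread, so the output order is deterministic. In the Storm log, lines from different executors are interleaved.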

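⬛︎ How does stream processing actually work in Storm?

(1) Exploring the Topology definition

Everything starts with creating an instance of the TopologyBuilder class.
Next, the "Spout" and "Bolt" components that make up the Topology are defined, which in turn defines the stream processing, that is, how each "Tuple" flows and is handled.
Here, the stream is defined as word (TestWordSpout) → exclaim1 (ExclamationBolt) → exclaim2 (ExclamationBolt).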

ExclamationTopology.scala
package storm.starter.topology

import backtype.storm.{ Config, LocalCluster, StormSubmitter }
import backtype.storm.testing.TestWordSpout
import backtype.storm.topology.TopologyBuilder
import backtype.storm.utils.Utils

object ExclamationTopology {
  def main(args: Array[String]): Unit = {
    import storm.starter.bolt.ExclamationBolt

    val builder: TopologyBuilder = new TopologyBuilder()

    // word -> exclaim1 -> exclaim2; the last argument is the parallelism hint
    builder.setSpout("word", new TestWordSpout(), 10)
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word")
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1")

    val config = new Config()
    config.setDebug(true) // log every emitted tuple

    if (args != null && args.length > 0) {
      // Remote mode: args(0) is used as the topology name
      config.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), config, builder.createTopology())
    } else {
      // Local mode: run in-process for 5 seconds, then shut down
      val cluster: LocalCluster = new LocalCluster()
      cluster.submitTopology("ExclamationTopology", config, builder.createTopology())
      Utils.sleep(5000)
      cluster.killTopology("ExclamationTopology")
      cluster.shutdown()
    }
  }
}

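Note that the Topology mode depends on the command-line arguments: with no arguments the topology runs in local mode, and with an argument it is submitted to a cluster (remote mode), with the first argument used as the topology name.
Reference: StormSubmitter (Storm 0.10.0-SNAPSHOT API)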
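
The branch on the arguments can be isolated as a small pure function. This is only a sketch of the decision logic, not code from the repository: RunMode, LocalMode, RemoteMode, and selectMode are hypothetical names, and "exclamation-remote" is a made-up topology name.

```scala
// Illustrative sketch only: mirrors the args check in ExclamationTopology.main.
// RunMode, LocalMode, RemoteMode and selectMode are hypothetical names.
object ModeSketch {
  sealed trait RunMode
  case object LocalMode extends RunMode
  final case class RemoteMode(topologyName: String) extends RunMode

  // No arguments -> local mode; otherwise the first argument is the topology name.
  def selectMode(args: Array[String]): RunMode =
    if (args != null && args.length > 0) RemoteMode(args(0)) else LocalMode

  def main(args: Array[String]): Unit = {
    println(selectMode(Array.empty[String]))         // prints LocalMode
    println(selectMode(Array("exclamation-remote"))) // prints RemoteMode(exclamation-remote)
  }
}
```

On the command line, remote mode corresponds to appending a topology name after the class name in the storm jar invocation shown earlier.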

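(2) Exploring the Spout

The Spout "TestWordSpout" used in the Topology above is implemented in Java in the GitHub repository "apache/storm". Note that the listing below uses the org.apache.storm package names of Storm 1.0 and later, whereas in Storm 0.10.0 the same class is imported as backtype.storm.testing.TestWordSpout, as in ExclamationTopology.scala.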

TestWordSpout.java
package org.apache.storm.testing;

import org.apache.storm.Config;
import org.apache.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {

    }

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }

    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}

Storm calls the nextTuple method repeatedly. Each call sleeps for 100 ms, picks one of the five words at random, and emits it as a Tuple.
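
The word selection inside nextTuple can be tried on its own. The sketch below imitates it in plain Scala, without any Storm classes. SpoutSketch and nextWord are hypothetical names, and the loop of five iterations is an assumption made only so that the example terminates.

```scala
import java.util.Random

// Illustrative sketch only: imitates the word selection inside TestWordSpout.nextTuple.
// SpoutSketch and nextWord are hypothetical names; no Storm classes are involved.
object SpoutSketch {
  val words: Array[String] = Array("nathan", "mike", "jackson", "golda", "bertels")

  // One call corresponds to one nextTuple invocation (without the 100 ms sleep).
  def nextWord(rand: Random): String = words(rand.nextInt(words.length))

  def main(args: Array[String]): Unit = {
    val rand = new Random()
    // Storm would keep calling nextTuple; here we stop after five emissions.
    (1 to 5).foreach { _ =>
      Thread.sleep(100) // same pause as Utils.sleep(100) in the original
      println(s"Emitting: word default [${nextWord(rand)}]")
    }
  }
}
```

Because the word is chosen at random, the printed words differ from run to run, just as the words in the Storm log do.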

(3) Exploring the Bolt

In the Tuple flow Spout: word → Bolt: exclaim1 → Bolt: exclaim2, the behavior of both Bolts is defined by the ExclamationBolt class.

ExclamationBolt
package storm.starter.bolt

import backtype.storm.task.{ OutputCollector, TopologyContext }
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.tuple.{ Fields, Tuple, Values }
import java.util.{ Map => JMap }

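class ExclamationBolt extends BaseRichBolt {
  var collector: OutputCollector = _

  // Called once before any tuples arrive; keep the collector for later use
  override def prepare(config: JMap[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    this.collector = collector
  }

  // Called for every incoming tuple: emit the exclaimed word anchored to the input, then ack the input
  override def execute(tuple: Tuple): Unit = {
    this.collector.emit(tuple, new Values(Exclaimer.exclaim(tuple.getString(0))))
    this.collector.ack(tuple)
  }

  // The output tuple has a single field named "word"
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}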

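In Bolt: exclaim1, the execute method is invoked every time a Tuple is received.
It appends the string "!!!" to the word before passing it on to the next Bolt, exclaim2.
It then calls the ack method to signal that processing of the input Tuple is complete.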

Exclaimer.scala
package storm.starter.bolt

object Exclaimer {
  def exclaim(s: String): String = {
    s + "!!!"
  }
}

The actual appending of "!!!" is done by the exclaim method.
Bolt: exclaim2 is also an instance of the ExclamationBolt class, so it behaves in exactly the same way.
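
The emit-then-ack order of execute can be observed with a small stand-in for the collector. This is a rough sketch under simplifying assumptions: tuples are reduced to plain strings, and RecordingCollector, SketchBolt, and BoltSketch are hypothetical names that only imitate the shape of OutputCollector and ExclamationBolt.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only: a stand-in for OutputCollector that records calls.
// RecordingCollector, SketchBolt and BoltSketch are hypothetical names.
object BoltSketch {
  final class RecordingCollector {
    val emitted: ArrayBuffer[String] = ArrayBuffer.empty[String]
    val acked: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def emit(value: String): Unit = { emitted += value }
    def ack(input: String): Unit = { acked += input }
  }

  final class SketchBolt(collector: RecordingCollector) {
    // Same order as ExclamationBolt.execute: emit the new value, then ack the input.
    def execute(input: String): Unit = {
      collector.emit(input + "!!!")
      collector.ack(input)
    }
  }

  def main(args: Array[String]): Unit = {
    val c1 = new RecordingCollector
    val c2 = new RecordingCollector
    val exclaim1 = new SketchBolt(c1)
    val exclaim2 = new SketchBolt(c2)

    exclaim1.execute("golda")
    c1.emitted.foreach(w => exclaim2.execute(w)) // feed exclaim1's output into exclaim2

    println(c1.emitted.mkString(", ")) // prints golda!!!
    println(c2.emitted.mkString(", ")) // prints golda!!!!!!
    println(c2.acked.mkString(", "))   // prints golda!!!
  }
}
```

The last line shows that exclaim2 acks the tuple it received ("golda!!!"), not the one it emitted, which matches the "BOLT ack" lines in the log.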

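⬛︎ Conclusion

In this post, I confirmed that the Scala version of the Bolt sample works, as an example of Apache Storm's multi-language support.
The basic operation of Apache Storm could be verified with the Scala Bolt sample as well.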
