前回のブログ記事「Mac OS X環境で、Apache Stormの基本動作を試してみる」のつづきです。
Apache Stormは、multi-language対応なので、"Scala版 storm-starter"を試してみます。
⬛︎ Scala版 storm-starterを構築する
- Githubから、Scala版 storm-starterリポジトリ情報をダウンロードして、ビルドします。
もともとは、paulfryzel/scala-storm-starterのリポジトリ
をベースにして、最新版のStormで動作するように一部修正しております。
$ git clone https://github.com/ttsubo/scala-storm-starter.git
$ cd scala-storm-starter/
$ git checkout sample
$ brew install sbt
$ sbt compile package assembly
- "Nimbus"を起動します
$ storm nimbus
Running: java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.nimbus
- "Supervisor"を起動します
$ storm supervisor
Running: java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/storm/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.daemon.supervisor
- "UI"を起動します
$ storm ui
Running: java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.10.0 -Dstorm.log.dir=/usr/local/apache-storm-0.10.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.10.0/lib/asm-4.0.jar:/usr/local/apache-storm-0.10.0/lib/clojure-1.6.0.jar:/usr/local/apache-storm-0.10.0/lib/disruptor-2.10.4.jar:/usr/local/apache-storm-0.10.0/lib/hadoop-auth-2.4.0.jar:/usr/local/apache-storm-0.10.0/lib/kryo-2.21.jar:/usr/local/apache-storm-0.10.0/lib/log4j-api-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-core-2.1.jar:/usr/local/apache-storm-0.10.0/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-0.10.0/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-0.10.0/lib/minlog-1.2.jar:/usr/local/apache-storm-0.10.0/lib/reflectasm-1.07-shaded.jar:/usr/local/apache-storm-0.10.0/lib/servlet-api-2.5.jar:/usr/local/apache-storm-0.10.0/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-0.10.0/lib/storm-core-0.10.0.jar:/usr/local/apache-storm-0.10.0:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/usr/local/apache-storm-0.10.0/log4j2/cluster.xml backtype.storm.ui.core
- "ExclamationTopology"をローカルモードで動作させます。
$ storm jar target/scala-2.11/scala-storm-starter-assembly-0.0.2-SNAPSHOT.jar storm.starter.topology.ExclamationTopology
...(snip)
6159 [Thread-36-word] INFO b.s.d.task - Emitting: word default [bertels]
6162 [Thread-36-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6163 [Thread-38-word] INFO b.s.d.task - Emitting: word default [jackson]
6168 [Thread-38-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:14, stream: default, id: {}, [jackson]
6171 [Thread-40-exclaim1] INFO b.s.d.executor - Processing received message FOR 4 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6172 [Thread-18-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:14, stream: default, id: {}, [jackson]
6174 [Thread-24-word] INFO b.s.d.task - Emitting: word default [jackson]
6174 [Thread-10-word] INFO b.s.d.task - Emitting: word default [bertels]
6174 [Thread-24-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:9, stream: default, id: {}, [jackson]
6174 [Thread-10-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 4 TUPLE: source: word:8, stream: default, id: {}, [bertels]
6174 [Thread-42-word] INFO b.s.d.task - Emitting: word default [jackson]
6174 [Thread-42-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:15, stream: default, id: {}, [jackson]
6174 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:9, stream: default, id: {}, [jackson]
6179 [Thread-26-word] INFO b.s.d.task - Emitting: word default [golda]
6179 [Thread-26-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:11, stream: default, id: {}, [golda]
6190 [Thread-12-word] INFO b.s.d.task - Emitting: word default [golda]
6190 [Thread-12-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:12, stream: default, id: {}, [golda]
6200 [Thread-18-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [jackson!!!]
6200 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [jackson!!!]
6200 [Thread-40-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6201 [Thread-40-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6201 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [jackson!!!]
6201 [Thread-18-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6201 [Thread-14-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word:9, stream: default, id: {}, [jackson]
6201 [Thread-40-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 4 TIME: TUPLE: source: word:10, stream: default, id: {}, [bertels]
6201 [Thread-14-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:9, stream: default, id: {}, [jackson] TASK: 2 DELTA:
6201 [Thread-40-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:10, stream: default, id: {}, [bertels] TASK: 4 DELTA:
6201 [Thread-18-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word:14, stream: default, id: {}, [jackson]
6201 [Thread-34-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [jackson!!!]
6201 [Thread-40-exclaim1] INFO b.s.d.executor - Processing received message FOR 4 TUPLE: source: word:8, stream: default, id: {}, [bertels]
6201 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6201 [Thread-40-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6201 [Thread-34-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [jackson!!!!!!]
6201 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:11, stream: default, id: {}, [golda]
6201 [Thread-18-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:14, stream: default, id: {}, [jackson] TASK: 3 DELTA:
6202 [Thread-34-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:2, stream: default, id: {}, [jackson!!!]
6202 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [golda!!!]
6202 [Thread-18-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:15, stream: default, id: {}, [jackson]
6202 [Thread-34-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [jackson!!!] TASK: 5 DELTA:
6202 [Thread-18-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [jackson!!!]
6202 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6202 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
6202 [Thread-40-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6202 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6202 [Thread-18-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6202 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:4, stream: default, id: {}, [bertels!!!] TASK: 6 DELTA:
6202 [Thread-18-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word:15, stream: default, id: {}, [jackson]
6202 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6202 [Thread-18-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:15, stream: default, id: {}, [jackson] TASK: 3 DELTA:
6202 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [jackson!!!!!!]
6202 [Thread-18-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:12, stream: default, id: {}, [golda]
6202 [Thread-14-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word:11, stream: default, id: {}, [golda]
6203 [Thread-14-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:11, stream: default, id: {}, [golda] TASK: 2 DELTA:
6203 [Thread-18-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [golda!!!]
6202 [Thread-40-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 4 TIME: TUPLE: source: word:8, stream: default, id: {}, [bertels]
6203 [Thread-34-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6203 [Thread-18-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [golda!!!]
6203 [Thread-40-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:8, stream: default, id: {}, [bertels] TASK: 4 DELTA:
6203 [Thread-34-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
6203 [Thread-18-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word:12, stream: default, id: {}, [golda]
6203 [Thread-34-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:4, stream: default, id: {}, [bertels!!!]
6204 [Thread-18-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:12, stream: default, id: {}, [golda] TASK: 3 DELTA:
6204 [Thread-34-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:4, stream: default, id: {}, [bertels!!!] TASK: 5 DELTA:
6204 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6204 [Thread-34-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6204 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [jackson!!!] TASK: 6 DELTA:
6204 [Thread-34-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [jackson!!!!!!]
6204 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6204 [Thread-34-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:3, stream: default, id: {}, [jackson!!!]
6204 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [golda!!!!!!]
6205 [Thread-34-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [jackson!!!] TASK: 5 DELTA:
6205 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6205 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [golda!!!] TASK: 6 DELTA:
6205 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [golda!!!]
6205 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [golda!!!!!!]
6205 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:3, stream: default, id: {}, [golda!!!]
6205 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [golda!!!] TASK: 6 DELTA:
6209 [Thread-30-word] INFO b.s.d.task - Emitting: word default [nathan]
6209 [Thread-16-word] INFO b.s.d.task - Emitting: word default [bertels]
6210 [Thread-16-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:7, stream: default, id: {}, [bertels]
6210 [Thread-30-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:16, stream: default, id: {}, [nathan]
6210 [Thread-18-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:7, stream: default, id: {}, [bertels]
6210 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:16, stream: default, id: {}, [nathan]
6211 [Thread-18-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6211 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [nathan!!!]
6212 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [nathan!!!]
6212 [Thread-14-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word:16, stream: default, id: {}, [nathan]
6213 [Thread-14-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:16, stream: default, id: {}, [nathan] TASK: 2 DELTA:
6212 [Thread-18-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6214 [Thread-18-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word:7, stream: default, id: {}, [bertels]
6215 [Thread-18-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:7, stream: default, id: {}, [bertels] TASK: 3 DELTA:
6215 [Thread-34-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:2, stream: default, id: {}, [nathan!!!]
6216 [Thread-34-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [nathan!!!!!!]
6218 [Thread-34-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:2, stream: default, id: {}, [nathan!!!]
6219 [Thread-34-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [nathan!!!] TASK: 5 DELTA:
6219 [Thread-34-exclaim2] INFO b.s.d.executor - Processing received message FOR 5 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6219 [Thread-34-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
6219 [Thread-34-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 5 TIME: TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6219 [Thread-34-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:3, stream: default, id: {}, [bertels!!!] TASK: 5 DELTA:
6221 [Thread-32-word] INFO b.s.d.task - Emitting: word default [golda]
6222 [Thread-32-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: word:13, stream: default, id: {}, [golda]
6223 [Thread-14-exclaim1] INFO b.s.d.executor - Processing received message FOR 2 TUPLE: source: word:13, stream: default, id: {}, [golda]
6223 [Thread-14-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [golda!!!]
6223 [Thread-14-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6223 [Thread-14-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: word:13, stream: default, id: {}, [golda]
6223 [Thread-14-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:13, stream: default, id: {}, [golda] TASK: 2 DELTA:
6224 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6224 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [golda!!!!!!]
6224 [Thread-22-exclaim2] INFO b.s.d.executor - BOLT ack TASK: 6 TIME: TUPLE: source: exclaim1:2, stream: default, id: {}, [golda!!!]
6224 [Thread-22-exclaim2] INFO b.s.d.executor - Execute done TUPLE source: exclaim1:2, stream: default, id: {}, [golda!!!] TASK: 6 DELTA:
6266 [Thread-36-word] INFO b.s.d.task - Emitting: word default [bertels]
6266 [Thread-36-word] INFO b.s.d.executor - TRANSFERING tuple TASK: 3 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6266 [Thread-18-exclaim1] INFO b.s.d.executor - Processing received message FOR 3 TUPLE: source: word:10, stream: default, id: {}, [bertels]
6266 [Thread-18-exclaim1] INFO b.s.d.task - Emitting: exclaim1 default [bertels!!!]
6266 [Thread-18-exclaim1] INFO b.s.d.executor - TRANSFERING tuple TASK: 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6266 [Thread-18-exclaim1] INFO b.s.d.executor - BOLT ack TASK: 3 TIME: TUPLE: source: word:10, stream: default, id: {}, [bertels]
6267 [Thread-22-exclaim2] INFO b.s.d.executor - Processing received message FOR 6 TUPLE: source: exclaim1:3, stream: default, id: {}, [bertels!!!]
6267 [Thread-18-exclaim1] INFO b.s.d.executor - Execute done TUPLE source: word:10, stream: default, id: {}, [bertels] TASK: 3 DELTA: 1
6267 [Thread-22-exclaim2] INFO b.s.d.task - Emitting: exclaim2 default [bertels!!!!!!]
以上より、前回と同様に、word(TestWordSpout) → exclaim1(ExclamationBolt) → exclaim2(ExclamationBolt)と動作するストリーム処理において、"bertels” → ”bertels!!!” → ”bertels!!!!!!"のように動作している様子が確認できました。
⬛︎ Storm動作におけるストリーム処理って、どうなっているのか?
(1) Topology定義をさぐってみる
まず、TopologyBuilderクラスからインスタンスを生成するところからはじまります。
そして、Topology構成要素となる"Spout"と"Bolt"を定義することにより、ストリーム処理、すなわち、"Tuple"の振る舞いを定義することになります。
ここでは、word(TestWordSpout) → exclaim1(ExclamationBolt) → exclaim2(ExclamationBolt)と動作するストリーム処理を定義しております。
package storm.starter.topology
import backtype.storm.{ Config, LocalCluster, StormSubmitter }
import backtype.storm.testing.TestWordSpout
import backtype.storm.topology.TopologyBuilder
import backtype.storm.utils.Utils
object ExclamationTopology {
def main(args: Array[String]) {
import storm.starter.bolt.ExclamationBolt
val builder: TopologyBuilder = new TopologyBuilder()
builder.setSpout("word", new TestWordSpout(), 10)
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word")
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1")
val config = new Config()
config.setDebug(true)
if (args != null && args.length > 0) {
config.setNumWorkers(3)
StormSubmitter.submitTopology(args(0), config, builder.createTopology())
} else {
val cluster: LocalCluster = new LocalCluster()
cluster.submitTopology("ExclamationTopology", config, builder.createTopology())
Utils.sleep(5000)
cluster.killTopology("ExclamationTopology")
cluster.shutdown()
}
}
}
ちなみに、Topologyモードについては、引数を指定しないと、ローカルモードとして動作して、引数を指定すると、リモートモードで動作する実装になっております。
StormSubmitter (Storm 0.10.0-SNAPSHOT API)
(2) Spoutをさぐってみる
先ほどのTopologyにて定義されたSpout"TestWordSpout"は、Githubリポジトリ"apache/storm"のjavaコードにて実装されております。
package org.apache.storm.testing;
import org.apache.storm.Config;
import org.apache.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestWordSpout extends BaseRichSpout {
public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
public TestWordSpout() {
this(true);
}
public TestWordSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {
}
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
}
nextTupleメソッドは、定期的に呼び出されることにより、Tupleが生成されます。
(3) Boltをさぐってみる
Spout: word → Bolt: exclaim1 → Bolt: exclaim2というTuple処理において、Bolt動作は、ExclamationBoltクラスで定義されております。
package storm.starter.bolt
import backtype.storm.task.{ OutputCollector, TopologyContext }
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.tuple.{ Fields, Tuple, Values }
import java.util.{ Map => JMap }
class ExclamationBolt extends BaseRichBolt {
var collector: OutputCollector = _
override def prepare(config: JMap[_, _], context: TopologyContext, collector: OutputCollector) {
this.collector = collector
}
override def execute(tuple: Tuple) {
this.collector.emit(tuple, new Values(Exclaimer.exclaim(tuple.getString(0))))
this.collector.ack(tuple)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer) {
declarer.declare(new Fields("word"))
}
}
なお、Bolt:exclaim1では、Tupleを受け取るごとに、executeメソッドが起動されます。
次のBolt:exclaim2に引き渡すにあたって、"!!!"の文字列を追加します。
そして、Tuple処理が完了したことを示す、ackメソッドが起動されます。
package storm.starter.bolt
object Exclaimer {
def exclaim(s: String): String = {
s + "!!!"
}
}
実際の"!!!"の文字列の追加は、exclaimメソッドで実施されます。
なお、Bolt: exclaim2でも、ExclamationBoltクラスで定義されておりますので、動作は同じになります。
⬛︎ 終わりに
今回、Apache Stormでのmulti-language対応としてをScala版Boltサンプルの動作を確認しました。
Scala版Boltサンプルでも、Apache Stormの基本動作を確認することができました。