Stormプロジェクトを作成、実行する
- Apache Stormを使って、分散処理を行う方法です。
- Web上にあるコードはリモートモード(1台のマシンで実行するエミュレータモードで終わっているものがほとんど。)ではなかったりと、色々四苦八苦しながら構築したノウハウを公開します。
1 環境構築
1.1 JDKとJavaの開発環境を整えます。
ホスト | 用途 | JDK |
---|---|---|
MacOS X | 開発環境 | Oracle Java SE Development Kit 7u79 JDK 7u80 with Netbeansでも問題ありません。 |
CentOS 6.6 | Nimbus&Supervisors | yum -y install java-1.7.0-openjdk-devel.x86_64 |
alternatives --set java /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
alternatives --set javac /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/javac
alternatives --set jar /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/jar
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk.x86_64
export JRE_HOME=/usr/lib/jvm/java-1.7.0-openjdk.x86_64/jre
export PATH=$PATH:/usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin:/usr/lib/jvm/java-1.7.0-openjdk.x86_64/jre/bin
- Nimbus&Supervisorsの
java -version
コマンドの結果
java -version
# java version "1.7.0_79"
# OpenJDK Runtime Environment (rhel-2.5.5.1.el6_6-x86_64 u79-b14)
# OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
javac -version
# javac 1.7.0_79
1.2 Stormをダウンロード、インストールします。
- 下記スクリプトを使って、Apache Storm公式ページからダウンロードします。
sudo mkdir /usr/local
sudo mkdir /usr/local/downlad
sudo cd /usr/local/download
sudo curl http://ftp.riken.jp/net/apache/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz -o apache-storm-0.9.4.tar.gz
sudo tar xvzf apache-storm-0.9.4.tar.gz
sudo mv apache-storm-0.9.4 /usr/local/
sudo ln -s /usr/local/apache-storm-0.9.4/ /usr/local/storm
cd /usr/local/storm
echo 'export PATH=$PATH:/usr/local/storm/bin' >> ~/.bashrc
export PATH=$PATH:/usr/local/storm/bin
1.3 Mavenをダウンロード、インストールします。
sudo cd /usr/local/download
sudo curl http://ftp.tsukuba.wide.ad.jp/software/apache/maven/maven-3/3.3.1/binaries/apache-maven-3.3.1-bin.tar.gz -o apache-maven-3.3.1-bin.tar.gz
sudo tar xvzf apache-maven-3.3.1-bin.tar.gz
sudo mv apache-maven-3.3.1 /usr/local/
sudo ln -s /usr/local/apache-maven-3.3.1/ /usr/local/maven
echo 'export PATH=$PATH:/usr/local/maven/bin' >> ~/.bashrc
export PATH=$PATH:/usr/local/maven/bin
mvn -version
1.4 サンプルコードをダウンロードします。
mkdir ~/storm-project
cd ~/storm-project/
git clone git://github.com/nathanmarz/storm-starter
cd ~/storm-project/storm-starter/
mv m2-pom.xml pom.xml
1.5 Mavenを使ってコンパイルします。
mvn compile exec:java -Dstorm.topology=storm.starter.WordCountTopology
1.6 Mavenを使って、トポロジをsubmitできる形にパッケージングします。
mvn package
2 統合開発環境(IDE)を使ってStormを動かす。
- Netbeansを使いました。
2.1 New ProjectでMavenを選択します。
2.2 プロジェクト名等を指定します。
2.3 pom.xmlを編集します。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.soushi</groupId>
<artifactId>SampleStorm</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm-test</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>jp.soushi.samplestorm.PrimeNumberTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm-lib</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
<!-- keep storm out of the jar-with-dependencies
<scope>provided</scope> -->
</dependency>
</dependencies>
<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
</project>
2.4 NumberSpoutクラス、PrimeNumberBoltクラス、PrimeNumberTopologyクラスを作成します。
==パッケージ名は変更すること!==
- NumberSpout.java
package jp.soushi.samplestorm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class NumberSpout extends BaseRichSpout
{
private SpoutOutputCollector collector;
private static int currentNumber = 1;
@Override
public void open( Map conf, TopologyContext context, SpoutOutputCollector collector )
{
this.collector = collector;
}
@Override
public void nextTuple()
{
// Emit the next number
collector.emit( new Values( new Integer( currentNumber++ ) ) );
}
@Override
public void ack(Object id)
{
}
@Override
public void fail(Object id)
{
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare( new Fields( "number" ) );
}
}
- PrimeNumberBolt.java
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class PrimeNumberBolt extends BaseRichBolt
{
private OutputCollector collector;
public void prepare( Map conf, TopologyContext context, OutputCollector collector )
{
this.collector = collector;
}
public void execute( Tuple tuple )
{
int number = tuple.getInteger( 0 );
if( isPrime( number) )
{
System.out.println( number );
}
collector.ack( tuple );
}
public void declareOutputFields( OutputFieldsDeclarer declarer )
{
declarer.declare( new Fields( "number" ) );
}
private boolean isPrime( int n )
{
if( n == 1 || n == 2 || n == 3 )
{
return true;
}
// Is n an even number?
if( n % 2 == 0 )
{
return false;
}
//if not, then just check the odds
for( int i=3; i*i<=n; i+=2 )
{
if( n % i == 0)
{
return false;
}
}
return true;
}
}
- PrimeNumberTopology.java
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
public class PrimeNumberTopology
{
public static void main(String[] args)
{
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "spout", new NumberSpout() );
builder.setBolt( "prime", new PrimeNumberBolt() )
.shuffleGrouping("spout");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
==この時点で、import部分に赤線が付いている。気にせずクリーンビルドすると消えます。==
2.5 プロジェクトを右クリック->Propaties->RunでメインクラスをPrimeNumberTopologyに設定します。
2.6 クリーンビルドします。(赤線エラーが消えます)
2.7 実行します。
- 数字が表示されていれば成功です。
3 リモートモード(分散環境)で実行します。
- リモートモード(分散モード)で実行してみます。
https://netbeans.org/kb/docs/javaee/maven-entapp_ja.html#intro
http://www.javaworld.com/article/2078672/big-data/open-source-tools-open-source-java-projects-storm.html
3.1 nimbusの場所を指定するファイルを作成します。
mkdir ~/.storm
echo 'nimbus.host: "131.113.102.201"' > ~/.storm/storm.yaml
3.2 javaファイルをすべて書き換えます。
==PrimeNumberクラスを使うのをやめて、下記のクラスを定義します。
-
ExclamationTopology.java(追加)
-
ExclamationSpout.java(追加)
-
ExclamationBolt.java(追加)
-
AddQuestionBolt.java(追加)
-
トポロジ定義ファイル(ExclamationTopology.java)
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class ExclamationTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new ExclamationSpout(), 5);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("question1", new AddQuestionBolt(), 3).shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(31);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology( "exclamationtopology", conf, builder.createTopology() );
}
}
- ボルト定義ファイル(ExclamationBolt.java)
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
- ボルト定義ファイル2(AddQuestionBolt.java)
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class AddQuestionBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
- スパウト定義ファイル(ExclamationSpout.java)
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExclamationSpout extends BaseRichSpout {
public static Logger LOG = LoggerFactory.getLogger(ExclamationSpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
public ExclamationSpout() {
this(true);
}
public ExclamationSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {
}
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
}
3.2 pom.xmlファイルを書き換えます。
- storm jarコマンドで、トポロジをsubmitすると、下記のエラーが発生します。これは、storm本体(storm.jar)を実行ファイルに含めているせいです。そこで、pom.xmlファイルを修正し、mvn package時に、storm.jarファイルを含めないようにする必要があります。
Exception in thread "main" java.lang.ExceptionInInitializerError
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
at jp.soushi.samplestorm.PrimeNumberTopology.main(PrimeNumberTopology.java:36)
Caused by: java.lang.RuntimeException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/Users/soushi/Desktop/SampleStorm/target/lib/storm-core-0.9.0.jar!/defaults.yaml, jar:file:/usr/local/apache-storm-0.9.3/lib/storm-core-0.9.3.jar!/defaults.yaml]
at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:133)
at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:160)
at backtype.storm.utils.Utils.readStormConfig(Utils.java:184)
at backtype.storm.utils.Utils.<clinit>(Utils.java:71)
... 2 more
- pom.xmlファイルを次のように編集します。(provided行を追加するだけです。)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.soushi</groupId>
<artifactId>SampleStorm</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm-test</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>jp.soushi.samplestorm.ExclamationTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm-lib</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
<!-- keep storm out of the jar-with-dependencies
<scope>provided</scope> -->
</dependency>
</dependencies>
<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
</project>
3.3 Filesタブに切り替え、ルートディレクトリを右クリック->Tools->Show In Finderを選択します。(Finderでプロジェクトが保存されているディレクトリを表示します。)
3.4 Terminalでそのディレクトリに移動します。
cd /Users/soushi/Desktop/SampleStorm
3.5 mvn cleanコマンドでクリーンした後、mvn compileコマンドでコンパイルし、mvn packageコマンドでjarファイルを生成します。
mvn clean
mvn compile
mvn package
3.6 targetフォルダ内にjarファイルが生成されます。
3.7 Nimbusにトポロジをsubmitします
command | arg1 | arg2 | arg3 |
---|---|---|---|
storm | jar | jarのパス | メインクラス |
cd /Users/soushi/Desktop/SampleStorm
storm jar /Users/soushi/Desktop/SampleStorm/target/SampleStorm-1.0-SNAPSHOT.jar jp.soushi.samplestorm.PrimeNumberTopology
Trouble shooting
トポロジがしばらくするとkillされてしまう
ログファイルを確認し、Async loop detectedが発生している場合
- 【原因】トポロジ内にループが発生しています。
- 【解決法】トポロジを定義しているJavaファイル(*Topology.java)を確認し、同じボルトクラスを2回使っていないかを確認します。同一のボルトクラスを使いまわさないよう変更してください。
- ボルトを使いまわす場合は、ラベルを変えてループにならないようにしてください。
java.nio.channels.UnresolvedAddressExceptionが発生している場合
- 【原因】各ノードの/etc/hostsファイルが正しく記述されておらず、ホスト名を解決できていません。
- 【解決法】全ノード(Nimbus,Supervisors含む)の/etc/hostsファイルを正しく記述しているかを確認します。(ホスト名が解決できないとエラーが発生します)例えば、master1というホスト名を持つノードの場合、下記のように記述します。
127.0.0.1 master1 master1.yamanaka.ics.keio.ac.jp localhost localhost.localdomain
::1 master1 master1.yamanaka.ics.keio.ac.jp localhost localhost.localdomain
192.168.56.201 master1 master1.yamanaka.ics.keio.ac.jp
192.168.56.202 master2 master2.yamanaka.ics.keio.ac.jp
192.168.56.203 master3 master3.yamanaka.ics.keio.ac.jp
192.168.56.171 cluster1 cluster1.yamanaka.ics.keio.ac.jp
192.168.56.172 cluster2 cluster2.yamanaka.ics.keio.ac.jp
192.168.56.173 cluster3 cluster3.yamanaka.ics.keio.ac.jp
192.168.56.174 cluster4 cluster4.yamanaka.ics.keio.ac.jp
org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refusedが発生している場合
- 様々な原因が考えられます。Trouble shootingの全ての項目を確認してください。ログを読むとより詳細な情報が得られる場合があります。また、Storm公式ページのTrouble shootingもご覧ください。
指定したWorker数までWorkerが起動しない
- 【原因】Nimbusノードの、storm.yamlの- storm.yamlのsupervisor.slots.ports:が正しく記述されていないことが原因です。デフォルトでは4つのポート(6700,6701,6702,6703番)が設定されているため、デフォルト値のままだとノード数×4つのworkerまでしか起動できません。
- 【解決法】Nimbusノードのstorm.yamlファイルの記述を修正します。1ノードあたり10workerを起動したい場合は、10ポート記述します。(合計起動可能worker数はノード数×ポート数になる点に注意)
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
- 6705
- 6706
- 6707
- 6708
- 6709
その他上手く動かない場合
- Zookeeperが通信できているかを確認します。
- Nimbusを複数起動している場合は、1台のみで起動させます。
- Nimbusノードのstorm.yamlに記述されているsupervisor.slots.ports:に、充分な数のポートが記載されているかを確認します。このポートは、同時に起動させたいワーカーの数だけ必要となります。例えば、下記のように、Worker数5で起動したトポロジを動かすためには、最低でも5ポート以上を記述しておく必要があります。また、Dockerコンテナ内でStormを実行する場合には、EXPOSEでポートを開けておく必要があります。
- サンプルコードを動かしてみてください。下記の3つのファイルを含むMavenを使ったJavaプロジェクトを作成し、nimbusノードにプロジェクトのフォルダごとscpで転送します。次に、Nimbusノード上でmvnを使ってコンパイルを行い、その後パッケージング(jarを作成)をします。最後に、storm jarコマンドを実行すると、jarファイルがsupervisorsに自動的に配布され、実行されます。
mvn clean
mvn compile
mvn package
storm jar ~/SampleStorm/target/SampleStorm-1.0-SNAPSHOT.jar jp.soushi.samplestorm.ExclamationTopology
- トポロジ定義ファイル(ExclamationTopology.java)の例(動作確認済)
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class ExclamationTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new ExclamationSpout());
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(11);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology( "exclamationtopology", conf, builder.createTopology() );
}
}
- ボルト定義ファイル(ExclamationBolt.java)の例(動作確認済)
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
- スパウト定義ファイルの例(動作確認済)
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExclamationSpout extends BaseRichSpout {
public static Logger LOG = LoggerFactory.getLogger(ExclamationSpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
public ExclamationSpout() {
this(true);
}
public ExclamationSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {
}
public void nextTuple() {
Utils.sleep(1000);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
}
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.soushi</groupId>
<artifactId>adcontex</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>adcontex</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>jp.soushi.adcontex.AdContexTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<!-- put your configurations here -->
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>central</id>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
<repository>
<id>local-project-libraries</id>
<name>Local project libraries</name>
<url>file://${project.basedir}/lib</url>
<layout>default</layout>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm-lib</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
References
URL | comment |
---|---|
JAVA WORLD:HOW-TO Open source Java projects:Storm Parallel realtime computation for unbounded data streams | Storm開発環境構築の際に参考にしました。 |
Apache Storm - Running Topologies on a Production Cluster | リモート(分散)モードで動かす際の参考にしました。 |
stackoverflow - How to submit a topology in storm production cluster using IDE | storm jarコマンドを用いてトポロジをsubmitした際に発生したエラーを解決しました。 |
Storm公式ページのTrouble shooting | Topologyがうまく動作しない場合に参考にしました。 |