LoginSignup
0

More than 5 years have passed since last update.

Apache Stormの導入〜リモートモードで独自のTopologyをSubmitするまで(完全版)

Posted at

Stormプロジェクトを作成、実行する

  • Apache Stormを使って、分散処理を行う方法です。
  • Web上にあるコードはリモートモード(1台のマシンで実行するエミュレータモードで終わっているものがほとんど。)ではなかったりと、色々四苦八苦しながら構築したノウハウを公開します。

1 環境構築

1.1 JDKとJavaの開発環境を整えます。

ホスト 用途 JDK
MacOS X 開発環境 Oracle Java SE Development Kit 7u79 JDK 7u80 with Netbeansでも問題ありません。
CentOS 6.6 Nimbus&Supervisors yum -y install java-1.7.0-openjdk-devel.x86_64
alternatives --set java /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
alternatives --set javac /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/javac
alternatives --set jar /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/jar
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk.x86_64
export JRE_HOME=/usr/lib/jvm/java-1.7.0-openjdk.x86_64/jre
export PATH=$PATH:/usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin:/usr/lib/jvm/java-1.7.0-openjdk.x86_64/jre/bin
  • Nimbus&Supervisorsのjava -versionコマンドの結果
java -version
#java version "1.7.0_79"
#OpenJDK Runtime Environment (rhel-2.5.5.1.el6_6-x86_64 u79-b14)
#OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
javac -version
#javac 1.7.0_79

1.2 Stormをダウンロード、インストールします。

  • 下記スクリプトを使って、Apache Storm公式ページからダウンロードします。
sudo mkdir /usr/local
sudo mkdir /usr/local/downlad
sudo cd /usr/local/download
sudo curl http://ftp.riken.jp/net/apache/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz -o apache-storm-0.9.4.tar.gz
sudo tar xvzf apache-storm-0.9.4.tar.gz
sudo mv apache-storm-0.9.4 /usr/local/
sudo ln -s /usr/local/apache-storm-0.9.4/ /usr/local/storm
cd /usr/local/storm
echo 'export PATH=$PATH:/usr/local/storm/bin' >> ~/.bashrc
export PATH=$PATH:/usr/local/storm/bin

1.3 Mavenをダウンロード、インストールします。

sudo cd /usr/local/download
sudo curl http://ftp.tsukuba.wide.ad.jp/software/apache/maven/maven-3/3.3.1/binaries/apache-maven-3.3.1-bin.tar.gz -o apache-maven-3.3.1-bin.tar.gz
sudo tar xvzf apache-maven-3.3.1-bin.tar.gz
sudo mv apache-maven-3.3.1 /usr/local/
sudo ln -s /usr/local/apache-maven-3.3.1/ /usr/local/maven
echo 'export PATH=$PATH:/usr/local/maven/bin' >> ~/.bashrc
export PATH=$PATH:/usr/local/maven/bin
mvn -version

1.4 サンプルコードをダウンロードします。

mkdir ~/storm-project
cd ~/storm-project/
git clone git://github.com/nathanmarz/storm-starter
cd ~/storm-project/storm-starter/
mv m2-pom.xml pom.xml

1.5 Mavenを使ってコンパイルします。

mvn compile exec:java -Dstorm.topology=storm.starter.WordCountTopology

1.6 Mavenを使って、トポロジをsubmitできる形にパッケージングします。

mvn package

2 統合開発環境(IDE)を使ってStormを動かす。

  • Netbeansを使いました。

2.1 New ProjectでMavenを選択します。

Storm1.png

2.2 プロジェクト名等を指定します。

Storm2.png

2.3 pom.xmlを編集します。

Storm3.png

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jp.soushi</groupId>
    <artifactId>SampleStorm</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm-test</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>jp.soushi.samplestorm.PrimeNumberTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-lib</artifactId>
            <version>0.9.0</version>
            <scope>provided</scope>
            <!-- keep storm out of the jar-with-dependencies
               <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
</project>

2.4 NumberSpoutクラス、PrimeNumberBoltクラス、PrimeNumberTopologyクラスを作成します。

==パッケージ名は変更すること!==

  • NumberSpout.java
package jp.soushi.samplestorm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values; 
import java.util.Map;

public class NumberSpout extends BaseRichSpout 
{
    private SpoutOutputCollector collector;

    private static int currentNumber = 1;

    @Override
    public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) 
    {
        this.collector = collector;
    }

    @Override
    public void nextTuple() 
    {
        // Emit the next number
        collector.emit( new Values( new Integer( currentNumber++ ) ) );
    }

    @Override
    public void ack(Object id) 
    {
    }

    @Override
    public void fail(Object id) 
    {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare( new Fields( "number" ) );
    }
}
  • PrimeNumberBolt.java
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PrimeNumberBolt extends BaseRichBolt 
{
    private OutputCollector collector;

    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) 
    {
        this.collector = collector;
    }

    public void execute( Tuple tuple ) 
    {
        int number = tuple.getInteger( 0 );
        if( isPrime( number) )
        {
            System.out.println( number );
        }
        collector.ack( tuple );
    }

    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        declarer.declare( new Fields( "number" ) );
    }   

    private boolean isPrime( int n ) 
    {
        if( n == 1 || n == 2 || n == 3 )
        {
            return true;
        }

        // Is n an even number?
        if( n % 2 == 0 )
        {
            return false;
        }

        //if not, then just check the odds
        for( int i=3; i*i<=n; i+=2 ) 
        {
            if( n % i == 0)
            {
                return false;
            }
        }
        return true;
    }
}
  • PrimeNumberTopology.java
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class PrimeNumberTopology 
{
    public static void main(String[] args) 
    {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout( "spout", new NumberSpout() );
        builder.setBolt( "prime", new PrimeNumberBolt() )
                .shuffleGrouping("spout");


        Config conf = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

==この時点で、import部分に赤線が付いている。気にせずクリーンビルドすると消えます。==

2.5 プロジェクトを右クリック->Propaties->RunでメインクラスをPrimeNumberTopologyに設定します。

Storm10.png

2.6 クリーンビルドします。(赤線エラーが消えます)

Storm4.png

Storm5.png

2.7 実行します。

  • 数字が表示されていれば成功です。

Storm6.png

Storm7.png

3 リモートモード(分散環境)で実行します。

3.1 nimbusの場所を指定するファイルを作成します。

mkdir ~/.storm
echo 'nimbus.host: "131.113.102.201"' > ~/.storm/storm.yaml

3.2 javaファイルをすべて書き換えます。

==PrimeNumberクラスを使うのをやめて、下記のクラスを定義します。
- ExclamationTopology.java(追加)
- ExclamationSpout.java(追加)
- ExclamationBolt.java(追加)
- AddQuestionBolt.java(追加)

  • トポロジ定義ファイル(ExclamationTopology.java)
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ExclamationTopology {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new ExclamationSpout(), 5);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("question1", new AddQuestionBolt(), 3).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(31);
    conf.setMaxSpoutPending(5000);
    StormSubmitter.submitTopology( "exclamationtopology", conf, builder.createTopology() );
  }
}
  • ボルト定義ファイル(ExclamationBolt.java)
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

}
  • ボルト定義ファイル2(AddQuestionBolt.java)
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class AddQuestionBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

}
  • スパウト定義ファイル(ExclamationSpout.java)
package jp.soushi.samplestorm;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;

import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ExclamationSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(ExclamationSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public ExclamationSpout() {
        this(true);
    }

    public ExclamationSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {

    }

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }

    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}

3.2 pom.xmlファイルを書き換えます。

  • storm jarコマンドで、トポロジをsubmitすると、下記のエラーが発生します。これは、storm本体(storm.jar)を実行ファイルに含めているせいです。そこで、pom.xmlファイルを修正し、mvn package時に、storm.jarファイルを含めないようにする必要があります。
Exception in thread "main" java.lang.ExceptionInInitializerError
    at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
    at jp.soushi.samplestorm.PrimeNumberTopology.main(PrimeNumberTopology.java:36)
Caused by: java.lang.RuntimeException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/Users/soushi/Desktop/SampleStorm/target/lib/storm-core-0.9.0.jar!/defaults.yaml, jar:file:/usr/local/apache-storm-0.9.3/lib/storm-core-0.9.3.jar!/defaults.yaml]
    at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:133)
    at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:160)
    at backtype.storm.utils.Utils.readStormConfig(Utils.java:184)
    at backtype.storm.utils.Utils.<clinit>(Utils.java:71)
    ... 2 more
  • pom.xmlファイルを次のように編集します。(provided行を追加するだけです。)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jp.soushi</groupId>
    <artifactId>SampleStorm</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm-test</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>jp.soushi.samplestorm.ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-lib</artifactId>
            <version>0.9.0</version>
            <scope>provided</scope>
            <!-- keep storm out of the jar-with-dependencies
               <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
</project>

3.3 Filesタブに切り替え、ルートディレクトリを右クリック->Tools->Show In Finderを選択します。(Finderでプロジェクトが保存されているディレクトリを表示します。)

Storm8.png

Storm9.png

3.4 Terminalでそのディレクトリに移動します。

cd /Users/soushi/Desktop/SampleStorm

3.5 mvn cleanコマンドでクリーンした後、mvn compileコマンドでコンパイルし、mvn packageコマンドでjarファイルを生成します。

mvn clean
mvn compile
mvn package

3.6 targetフォルダ内にjarファイルが生成されます。

Storm11.png

3.7 Nimbusにトポロジをsubmitします

command arg1 arg2 arg3
storm jar jarのパス メインクラス
cd /Users/soushi/Desktop/SampleStorm
storm jar /Users/soushi/Desktop/SampleStorm/target/SampleStorm-1.0-SNAPSHOT.jar jp.soushi.samplestorm.PrimeNumberTopology

Trouble shooting

トポロジがしばらくするとkillされてしまう

ログファイルを確認し、Async loop detectedが発生している場合

  • 【原因】トポロジ内にループが発生しています。
  • 【解決法】トポロジを定義しているJavaファイル(*Topology.java)を確認し、同じボルトクラスを2回使っていないかを確認します。同一のボルトクラスを使いまわさないよう変更してください。
  • ボルトを使いまわす場合は、ラベルを変えてループにならないようにしてください。

java.nio.channels.UnresolvedAddressExceptionが発生している場合

  • 【原因】各ノードの/etc/hostsファイルが正しく記述されておらず、ホスト名を解決できていません。
  • 【解決法】全ノード(Nimbus,Supervisors含む)の/etc/hostsファイルを正しく記述しているかを確認します。(ホスト名が解決できないとエラーが発生します)例えば、master1というホスト名を持つノードの場合、下記のように記述します。
127.0.0.1 master1 master1.yamanaka.ics.keio.ac.jp localhost localhost.localdomain
::1 master1 master1.yamanaka.ics.keio.ac.jp localhost localhost.localdomain

192.168.56.201 master1 master1.yamanaka.ics.keio.ac.jp
192.168.56.202 master2 master2.yamanaka.ics.keio.ac.jp
192.168.56.203 master3 master3.yamanaka.ics.keio.ac.jp
192.168.56.171 cluster1 cluster1.yamanaka.ics.keio.ac.jp
192.168.56.172 cluster2 cluster2.yamanaka.ics.keio.ac.jp
192.168.56.173 cluster3 cluster3.yamanaka.ics.keio.ac.jp
192.168.56.174 cluster4 cluster4.yamanaka.ics.keio.ac.jp

org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refusedが発生している場合

  • 様々な原因が考えられます。Trouble shootingの全ての項目を確認してください。ログを読むとより詳細な情報が得られる場合があります。また、Storm公式ページのTrouble shootingもご覧ください。

指定したWorker数までWorkerが起動しない

  • 【原因】Nimbusノードの、storm.yamlの- storm.yamlのsupervisor.slots.ports:が正しく記述されていないことが原因です。デフォルトでは4つのポート(6700,6701,6702,6703番)が設定されているため、デフォルト値のままだとノード数×4つのworkerまでしか起動できません。
  • 【解決法】Nimbusノードのstorm.yamlファイルの記述を修正します。1ノードあたり10workerを起動したい場合は、10ポート記述します。(合計起動可能worker数はノード数×ポート数になる点に注意)
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    - 6704
    - 6705
    - 6706
    - 6707
    - 6708
    - 6709

その他上手く動かない場合

  • Zookeeperが通信できているかを確認します。
  • Nimbusを複数起動している場合は、1台のみで起動させます。
  • Nimbusノードのstorm.yamlに記述されているsupervisor.slots.ports:に、充分な数のポートが記載されているかを確認します。このポートは、同時に起動させたいワーカーの数だけ必要となります。例えば、下記のように、Worker数5で起動したトポロジを動かすためには、最低でも5ポート以上を記述しておく必要があります。また、Dockerコンテナ内でStormを実行する場合には、EXPOSEでポートを開けておく必要があります。
  • サンプルコードを動かしてみてください。下記の3つのファイルを含むMavenを使ったJavaプロジェクトを作成し、nimbusノードにプロジェクトのフォルダごとscpで転送します。次に、Nimbusノード上でmvnを使ってコンパイルを行い、その後パッケージング(jarを作成)をします。最後に、storm jarコマンドを実行すると、jarファイルがsupervisorsに自動的に配布され、実行されます。
mvn clean
mvn compile
mvn package
storm jar ~/SampleStorm/target/SampleStorm-1.0-SNAPSHOT.jar jp.soushi.samplestorm.ExclamationTopology
  • トポロジ定義ファイル(ExclamationTopology.java)の例(動作確認済)
package jp.soushi.samplestorm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ExclamationTopology {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new ExclamationSpout());
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(11);
    conf.setMaxSpoutPending(5000);
    StormSubmitter.submitTopology( "exclamationtopology", conf, builder.createTopology() );
  }
}
  • ボルト定義ファイル(ExclamationBolt.java)の例(動作確認済)
package jp.soushi.samplestorm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

}
  • スパウト定義ファイルの例(動作確認済)
package jp.soushi.samplestorm;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;

import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ExclamationSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(ExclamationSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public ExclamationSpout() {
        this(true);
    }

    public ExclamationSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {

    }

    public void nextTuple() {
        Utils.sleep(1000);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }

    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}
  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>jp.soushi</groupId>
    <artifactId>adcontex</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>adcontex</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>jp.soushi.adcontex.AdContexTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <!-- put your configurations here -->
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>central</id>
            <url>https://repo1.maven.org/maven2</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
      <repository>
          <id>local-project-libraries</id>
          <name>Local project libraries</name>
          <url>file://${project.basedir}/lib</url>
          <layout>default</layout>
      </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-lib</artifactId>
            <version>0.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

References

URL comment
JAVA WORLD:HOW-TO Open source Java projects:Storm Parallel realtime computation for unbounded data streams Storm開発環境構築の際に参考にしました。
Apache Storm - Running Topologies on a Production Cluster リモート(分散)モードで動かす際の参考にしました。
stackoverflow - How to submit a topology in storm production cluster using IDE storm jarコマンドを用いてトポロジをsubmitした際に発生したエラーを解決しました。
Storm公式ページのTrouble shooting Topologyがうまく動作しない場合に参考にしました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0