Apache Storm 1.0.0を使ってみようシリーズです。前回までではこんなのを試しています。
Apache Storm 1.0.0を使ってみる Storm UI編
前回はとりあえず、Storm 1.0.0を使ってみた系の記事を書きたくてかなり表面的なところを行ったわけですが、~~今回も表面的なところをなぞっていますよ。~~だんだんとStormの深みを目指したいと思います。
の前に、本家の説明サイトは以下になります。
Distributed Cache APIって何?
Topology毎に共有できるデータ保存場所があり、ファイルをいちいちTopologyと一緒にデプロイしなくて済む、という仕組みです。共有したいデータが大きければ大きいほど恩恵を受けられる、ということです。本家サイトでは、「位置情報」とか「辞書データ」を保持するといいよ、言っています。が、実際には設定ファイルとかの共有がまずはしたいかな、、、と自分は思ったりしてますが、キャッシュというからには初期化のタイミングとかもあるようだし、その辺どうなんだろう。要調査ですね。
BlobStoreにデータは保持される
データを保持するために、BlobStoreというInterfaceを介して、現在2種類の実装が存在します。
- LocalFsBlobStore
- HdfsBlobStore
それぞれ処理のイメージは以下になります。まずデータ自体はコマンドベースで登録されます(JavaAPIとかあるので、そちらでもOK)。データはキー文字列と、ファイルの中身(圧縮可能)からなります。NimbusがTopology起動を開始した際、Supervisorがコードやら設定やらJarファイルをBlobStoreからDLします。(ここまでは今までと同じ流れなのかな)その際、BlobStoreのMapも取得して、Localizerの助けを借りながら、共有データをDLするようです。(ほぼ和訳)
まあ、何はともあれ動かしてみましょう。
ちなみに何も共有データを登録せずにTopologyを起動すると・・・
# ./bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"}}'
当然怒られます。
2016-04-23 11:19:37.599 o.a.s.d.supervisor [ERROR]
KeyNotFoundException(msg:key1)
at org.apache.storm.generated.Nimbus$getBlobMeta_result$getBlobMeta_resultStandardScheme.read(Nimbus.java:23897)
at org.apache.storm.generated.Nimbus$getBlobMeta_result$getBlobMeta_resultStandardScheme.read(Nimbus.java:23865)
at org.apache.storm.generated.Nimbus$getBlobMeta_result.read(Nimbus.java:23796)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_getBlobMeta(Nimbus.java:769)
at org.apache.storm.generated.Nimbus$Client.getBlobMeta(Nimbus.java:756)
at org.apache.storm.blobstore.NimbusBlobStore.getBlobMeta(NimbusBlobStore.java:306)
at org.apache.storm.utils.Utils.nimbusVersionOfBlob(Utils.java:558)
at org.apache.storm.localizer.Localizer.downloadBlob(Localizer.java:508)
at org.apache.storm.localizer.Localizer.access$000(Localizer.java:64)
at org.apache.storm.localizer.Localizer$DownloadBlob.call(Localizer.java:497)
at org.apache.storm.localizer.Localizer$DownloadBlob.call(Localizer.java:473)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-04-23 11:19:37.600 o.a.s.d.supervisor [INFO] Failed to download blob resources for storm-id test_topo-1-1461377693
2016-04-23 11:19:37.628 o.a.s.d.supervisor [INFO] Finished downloading code for storm id test_topo-1-1461377693
BlobStoreへの登録
というわけで、BlobStoreにデータを登録しましょう。公式サイトには以下のコマンドで登録可能、と書いてあります。
# storm blobstore create --file README.txt --acl o::rwa --repl-fctr 4 key1
が、実際には違います。"--repl-fctr"引数に対応していません。
Exception in thread "main" java.lang.Exception: '--repl-fctr' is not a valid argument
at org.apache.storm.shade.clojure.tools.cli$apply_specs.invoke(cli.clj:73)
at org.apache.storm.shade.clojure.tools.cli$cli.doInvoke(cli.clj:135)
at clojure.lang.RestFn.invoke(RestFn.java:460)
at org.apache.storm.command.blobstore$create_cli.invoke(blobstore.clj:89)
at org.apache.storm.command.blobstore$_main.doInvoke(blobstore.clj:156)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.blobstore.main(Unknown Source)
ヘルプを見てみると--replication-factorに対応していると。。。やってくれますね。
# ./bin/storm help blobstore
Syntax: [storm blobstore cmd]
list [KEY...] - lists blobs currently in the blob store
cat [-f FILE] KEY - read a blob and then either write it to a file, or STDOUT (requires read access).
create [-f FILE] [-a ACL ...] [--replication-factor NUMBER] KEY - create a new blob. Contents comes from a FILE
or STDIN. ACL is in the form [uo]:[username]:[r-][w-][a-] can be comma separated list.
update [-f FILE] KEY - update the contents of a blob. Contents comes from
a FILE or STDIN (requires write access).
delete KEY - delete an entry from the blob store (requires write access).
set-acl [-s ACL] KEY - ACL is in the form [uo]:[username]:[r-][w-][a-] can be comma
separated list (requires admin access).
replication --read KEY - Used to read the replication factor of the blob.
replication --update --replication-factor NUMBER KEY where NUMBER > 0. It is used to update the
replication factor of a blob.
For example, the following would create a mytopo:data.tgz key using the data
stored in data.tgz. User alice would have full access, bob would have
read/write access and everyone else would have read access.
storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r
上記を踏まえて、README.markdownを登録してみます。("create"は存在するファイルから内容をBlobStoreに登録するコマンドであり、公式に書いてあるREADME.txtも地味に合っていません。)
# ./bin/storm blobstore create --file README.markdown --acl o::rwa --replication-factor 4 key1
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.0 -Dstorm.log.dir=/opt/apache-storm-1.0.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin org.apache.storm.command.blobstore create --file README.markdown --acl o::rwa --replication-factor 4 key1
2106 [main] INFO o.a.s.c.blobstore - Creating key1 with ACL ("o::rwa")
2681 [main] INFO o.a.s.c.blobstore - Successfully created key1
無事登録されたようです。
BlobStoreの中身を確認する
登録できたので、中身を確認してみましょう。ただのテキストファイルを登録しているので、そのまま見れるはずです。が、ここにも落とし穴があります。
公式サイトでは現在「storm.local.dir/nimbus/blob」が保存ディレクトリと書いてありましたが、実際には「storm.local.dir/storm-local/blobs/」に配置されていました。ちょいちょい公式が間違ってますねw
上記ディレクトリ配下にはランダム数字のディレクトリになっているので、あてずっぽうでそれっぽいものを表示してみます。
$ less storm-local/blobs/729/data_key1/data
これで表示されました。
Master Branch: [](https://travis-ci.org/apache/storm)
Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, [is used by many companies](http://storm.apache.org/documentation/Powered-By.html), and is a lot of fun to use!
READMEが確かに登録されているようです。では、このファイルが実際に各Topologyからで共有されるのか、確認したいと思います。
BlobStoreの内容を複数のTopologyから参照しているか確認する
とりあえず以下のコマンドで、一つ目のTopologyを起動しましょう。本当は設定した値を使ってみたいのですが、一旦Stormのサンプルjarファイルを用いて動作確認だけしちゃいます。
$ ./bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"}}'
Topology IDが「test_topo-4-1461390482」で出来上がったので、supervisor.logを見てみると、ログからはSupervisorが無事ダウンロードできたように見えます。
2016-04-23 14:48:05.782 o.a.s.d.supervisor [INFO] Downloading code for storm id test_topo-4-1461390482
2016-04-23 14:48:06.279 o.a.s.d.supervisor [INFO] Successfully downloaded blob resources for storm-id test_topo-4-1461390482
2016-04-23 14:48:06.280 o.a.s.d.supervisor [INFO] Finished downloading code for storm id test_topo-4-1461390482
:
2016-04-23 14:48:06.285 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: 6d23e0f9-9aa1-43c6-a475-773b0537bdfb storm-id: test_topo-4-1461390482 to its port artifacts directory
2016-04-23 14:48:06.286 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: 6d23e0f9-9aa1-43c6-a475-773b0537bdfb storm-id: test_topo-4-1461390482 for files(2): ("resources" "blob_file")
続いて、同じ内容をtest_topo2というトポロジを作成して確認してみます。
$ ./bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.clj.word_count test_topo2 -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"}}'
ちなみに「-c」オプションは、configurationをコマンドラインから投入するためのオプションです。試験的に何度も値変えたい時などに便利。
「test_topo2-5-1461390876」というIDで生成されたようなので、supervisor.logを再び確認。
2016-04-23 14:54:37.675 o.a.s.d.supervisor [INFO] Downloading code for storm id test_topo2-5-1461390876
2016-04-23 14:54:38.414 o.a.s.d.supervisor [INFO] Successfully downloaded blob resources for storm-id test_topo2-5-1461390876
2016-04-23 14:54:38.415 o.a.s.d.supervisor [INFO] Finished downloading code for storm id test_topo2-5-1461390876
:
2016-04-23 14:54:38.423 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: add589f8-6454-44a9-86f1-909796d5a847 storm-id: test_topo2-5-1461390876 to its port artifacts directory
2016-04-23 14:54:38.424 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: add589f8-6454-44a9-86f1-909796d5a847 storm-id: test_topo2-5-1461390876 for files(2): ("resources" "blob_file")
おお、きちんとこちらも同じ内容をDonwloadできているっポイですね。これで2つのTopology間でデータ共有ができました。
まとめ
今までTopologyで利用するデータはTopologyと一緒にデプロイする、というのが一般的でしたが、共有データとして保持できることで、効率的なStormの運用ができそうです。これらがJavaAPIでできるようになってもいるようなので、そちらも時間があったら調べてみたいですね。