思ってた以上に前回の記事がアクセス多かったので、今回は使い方のほうをご紹介しようかと。
Apache Cmaelとは
前回の記事に書いた
Apache Camel (Java)を使うと開発が楽になる7つの理由
ステップ0 > ガチのHelloWorld
package test.hello;
public class Hello {
public static void main(String[] args) throws Exception {
System.out.println("hello camel");
}
}
あぁぁガチすぎる。まぁ全ての基本なので。
ステップ1 > Camelを動かしてみる
コード
package test.hello;
import org.apache.camel.main.Main;
public class Hello {
public static void main(String[] args) throws Exception {
System.out.println("hello camel");
Main main = new Main();
main.start();
}
}
Camelライブラリ
pom.xml
もしくはcamelからダウンロードしたzipに入っているcamel-core-xxxxxx.jarのライブラリを使う。
pomの場合
${camel-ver}
は最新のcamelバージョンに読み替えて。(現在2.13.0)
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel-ver}</version>
</dependency>
pomに関してわからない場合は
Eclipse+Maven という便利な開発環境をインストールからプロジェクト作成まで
ちょっと話それたけど、できたら起動!
起動した時のコンソール
hello camel
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
面白くもなんともなくプログラムは終了。
start() を呼ぶと、一発起動するのみで終わる。
なにか文句っぽい事がコンソールに出てるけど、ログ用のライブラリが足りてないよ
と怒られている。
ステップ2 > Camelを常時動かしてみる
Webサーバみたいにずっと動くものにしてみる。
package test.hello;
import org.apache.camel.main.Main;
public class Hello {
public static void main(String[] args) throws Exception {
System.out.println("hello camel");
Main main = new Main();
main.run();
}
}
最後の1行がstart() --> run()
コンソールに出てくる結果は変わらないけど、プログラムが止まらなくなっている。
run() を呼び出すとサーバっぽくずーっと動くアプリに変化する。
前回、ログ関連で怒られていたので、ついでにログライブラリを追加しておく。
今回はslf4j-simple-1.6.6.jar(slf4jでログをコンソールに出力する用のライブラリ)を加えて実行すると
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j-ver}</version>
</dependency>
コンソール
hello camel
8 [main] INFO org.apache.camel.main.MainSupport - Apache Camel 2.13.0 starting
212 [main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.13.0 (CamelContext: camel-1) is starting
213 [main] INFO org.apache.camel.management.ManagedManagementStrategy - JMX is enabled
467 [main] INFO org.apache.camel.impl.converter.DefaultTypeConverter - Loaded 175 type converters
488 [main] INFO org.apache.camel.impl.DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is no
t needed, then its recommended to turn this option off as it may improve performance.
488 [main] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enabl
e stream caching. See more details at http://camel.apache.org/stream-caching.html
488 [main] INFO org.apache.camel.impl.DefaultCamelContext - Total 0 routes, of which 0 is started.
494 [main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.13.0 (CamelContext: camel-1) started in 0.276 seconds
おぉぉ。起動したっぽいメッセージがたくさん出てきてる。
確認ポイントがあるとするなら、最後から2行目Total 0 routes, of which 0 is started.
ルートが0本っすよ!と教えてくれている。
ステップ3 > ルートを作成する
-
ルート
に関しては前の投稿に説明してあるので省略。 -
ルート = ベルトコンベア
で説明してある。 -
ルート
はRouteBuilderというクラスの拡張として実装する。 - 今回の
ルート
はからっぽ。
package test.hello.route;
import org.apache.camel.builder.RouteBuilder;
public class HelloRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
}
}
このルートを先ほどのMain
に追加してみる
package test.hello;
import org.apache.camel.main.Main;
import test.hello.route.TimerRoute;
public class Hello {
public static void main(String[] args) throws Exception {
System.out.println("hello camel");
Main main = new Main();
main.addRouteBuilder(new HelloRoute());
main.run();
}
}
これを起動してみても先ほどと変わらない。
ルート
(ベルトコンベア)になにも乗っけてないからね。
ついでにコンソールにもTotal 0 routes, of which 0 is started.
と、いまだにルートが0だと言ってくる。
ステップ4 > ルートにコンポーネントを乗っけてみる
今回のルートは
タイマー(デフォルト1秒間隔)
--> ログ出力
という簡単なお仕事。
package test.hello.route;
import org.apache.camel.builder.RouteBuilder;
public class HelloRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:test-hello")
.to("log:test-log");
}
}
起動してみる
2082 [Camel (camel-1) thread #0 - timer://test-hello] INFO test-log - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is
null]]
3063 [Camel (camel-1) thread #0 - timer://test-hello] INFO test-log - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is
null]]
4064 [Camel (camel-1) thread #0 - timer://test-hello] INFO test-log - Exchange[ExchangePattern: InOnly, BodyType: null, Body: [Body is
null]]
解説
- デフォルトのログフォーマットとしては、一番左に起動してからのミリ秒が表示されている。
- ほぼ1.000秒毎に実行されている。最初のログがぜんぜん1.000秒と合っていないのは「1発めのメッセージ処理」は遅いという事です。初期化処理にありがちな感じ。
- RouteBuilderのconfigure()メソッドの中にfrom()を書くと1ルートができる。
- configure()の中に複数のルートを作れる。ルート毎に先頭にfrom()を書く。
- from().to().to()みたいに、先頭はfrom()、それ以外の場所はfrom()以外を書く。
- これ
ログ出力
ではなくhttp出力
だったらサーバー生存を通知する機能にもなりそう
ステップ5 > ルートにコア以外のコンポーネントを乗っけてみる
コア以外ってのは一番最初に追加したcamel-core-xxxxx.jarの事。
今回はStreamコンポーネントを使いたいので、pomに追加するか、camelをダウンロードした時に入っているjarライブラリ(camel-stream-xxxxx.jar)を追加する。今回外部ライブラリは使わない。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-stream</artifactId>
<version>${camel-ver}</version>
</dependency>
ルート
を書き換える
今回のルートは
ストリーム入力
--> ストリーム出力
package test.hello.route;
import org.apache.camel.builder.RouteBuilder;
public class HelloRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("stream:in?promptMessage=Enter : ")
.to("stream:out");
}
}
起動してみよー。
Enter : AAAAA
AAAAA
Enter : BBBB
BBBB
なんだかよくありがちな、対話式なCLIツールの出来上がり。
この程度だったらcamel使わなくてもいい。あくまで例題。
テスト用にコンソールトリガーもあるよという程度で認識する。
ちなみに本家のサンプルではサーバログファイルのtail処理として使っている。
from("stream:file?fileName=/server/logs/server.log&scanStream=true&scanStreamDelay=1000")
ステップ6 > 独自実装を加える
基本
今回のルートは
ストリーム入力
--> 独自実装
--> ストリーム出力
ここでちょっとだけ解説。
前回の記事の「Camelとは」という所でも説明してあるが、
-
エクスチェンジ
という箱でベルトコンベアを流れる - 今回は独自実装なのでこれを意識しながら作る必要がある
-
エクスチェンジ
の中にメッセージ
があり -
ストリーム入力
ではエクスチェンジ
のメッセージ
のボディ
に「ユーザが入力した文字」を入れて送り出している - 今回は
ボディ
を取り出して加工して、加工結果をボディ
に入れるといった実装をしてみる。 - 独自実装はProcessorというインターフェース実装にする
- ルートからは「.process」で呼び出す。
package test.hello.process;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class HelloProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
body = "Hello " + body;
exchange.getIn().setBody(body);
}
}
getIn()
というのが流れてきたメッセージ
になる。
getOut()
というメソッドもあるが、流れてきたメッセージ
を無視して新しいメッセージ
を作成するといった意味のメソッドなので使わないでおこう。
メッセージ
の使い方は「つぎ足しつぎ足し」が基本だ。
ルートも書き換える
public void configure() throws Exception {
from("stream:in?promptMessage=Enter : ")
.process(new HelloProcessor())
.to("stream:out");
}
実行してみるー
Enter : Camel
Hello Camel
Enter :
動いたねぇ。
試しに、
ストリーム入力
--> 独自実装
--> 独自実装
--> ストリーム出力
public void configure() throws Exception {
from("stream:in?promptMessage=Enter : ")
.process(new HelloProcessor())
.process(new HelloProcessor())
.to("stream:out");
}
実行してみると
Enter : Camel
Hello Hello Camel
Enter :
ベルトコンベア+コンポーネントプログラムって感じがようやく出てきた。
まぁ最初のうちは「再利用できるように実装する」とかあまり考えないほうが吉。
既存コードの流用
Camelと関係ない既存のコードがあって、使い回したいというのがあったら方法は2つ
- 先ほどのプロセッサに既存コードを呼び出すコードを実装するやり方。おすすめ。
- bean()を使う。シンプルなやり方。逆に細かな制御はできない。
例えば、Camelとは一切関係ない実装があったとして
package test.hello.process;
public class HelloConv {
public String addHello(String data) {
return "MyHello " + data;
}
}
とすると、ルートからの呼び出しは
@Override
public void configure() throws Exception {
from("stream:in?promptMessage=Enter : ")
.process(new HelloProcessor())
.bean(HelloConv.class, "addHello(${body})")
.to("stream:out");
}
これで既存のコードが呼び出せる。
型を意識しない作り
あと、今回説明しないけど、入力される文字が数字だと分かっている場合
public void process(Exchange exchange) throws Exception {
Integer data = exchange.getIn().getBody(Integer.class);
data = data * 100;
exchange.getIn().setBody(data);
}
実行結果
Enter : 123
12300
Enter :
という具合に、ルート
に流れているデータの型を正確に意識することなく実装できる所も再利用性が高まる仕組みとなっていますね。
今回の流通データを正確に処理しようとした場合はString。
exchange.getIn.getBody()
で取得するとObject型で取れるので、そこから判断できる。まぁそんな使い方をわざわざしないけど。
ステップ7 > 例外を発生させてみる
やはりエラーが出た場合の挙動は気になるもの。
入力文字が空っぽの場合、エラーを発生させてみる。
@Override
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
if ("".equals(body)) {
throw new Exception("empty value.");
}
body = "Hello " + body;
exchange.getIn().setBody(body);
}
実行結果
Enter : Camel
Hello Hello Camel
Enter :
6591 [Camel (camel-1) thread #0 - stream://in] ERROR org.apache.camel.processor.DefaultErrorHandler - Failed delivery for (MessageId: I
D-daikuro-pc-local-65359-1398503146643-0-3 on ExchangeId: ID-daikuro-pc-local-65359-1398503146643-0-4). Exhausted after delivery attemp
t: 1 caught: java.lang.Exception: empty value.
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [stream://in?promptMessage=Enter+%3A+ ] [ 5]
[route1 ] [process1 ] [test.hello.process.HelloProcessor@702f9d0a ] [ 2]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-daikuro-pc-local-65359-1398503146643-0-4
ExchangePattern InOnly
Headers {breadcrumbId=ID-daikuro-pc-local-65359-1398503146643-0-3, CamelRedelivered=false, CamelRedeliveryCounter=0, Camel
StreamComplete=true, CamelStreamIndex=1}
BodyType String
Body
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: empty value.
at test.hello.process.HelloProcessor.process(HelloProcessor.java:13)
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
at org.apache.camel.component.stream.StreamConsumer.processLine(StreamConsumer.java:203)
at org.apache.camel.component.stream.StreamConsumer.readFromStream(StreamConsumer.java:167)
at org.apache.camel.component.stream.StreamConsumer.run(StreamConsumer.java:96)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Enter :
コンソールにログがいっぱい出力。
内容の説明
-
Message History
ルートのどこを通ってきてエラーになったか -
Message History
処理に何ミリ秒かかってたか -
Exchange
エラー時、エクスチェンジに何が入っていたか -
Stacktrace
エラー時のスタックトレース
といった内容が出力される。
便利。
Message History
に RouteId
がある。どこのルートでエラーになったかがすぐ分かるルート名。
どうやってルートに名前をつけるかというと、
@Override
public void configure() throws Exception {
from("stream:in?promptMessage=Enter : ").routeId("HelloRoute")
.process(new HelloProcessor())
.to("stream:out");
}
これで名前が付く。クラス名と同じにしとけばすぐに分かるよ。
JMXから見てもわかりやすい。例えばjconsoleを開くと、先ほどのrouteIdに、
ExchangeCompleted
という正常完了カウントが1件
ExchangeFailed
という失敗カウントが1件
というのが分かる
ステップ8 > 例外ハンドリング & リトライ処理
ステップ7の状態はログがコンソールに出力されているだけであって、ログ出力をファイルに切り替えたらコンソールになんの反応もなくなる。
エラーを検知したら、「なにかエラーが発生」をコンソールに表示する仕様を追加する。
@Override
public void configure() throws Exception {
onException(Exception.class)
.handled(true)
.setBody().constant("なにかエラーが発生")
.to("stream:out");
from("stream:in?promptMessage=Enter : ")
.process(new HelloProcessor())
.process(new HelloProcessor())
.to("stream:out");
}
ちょっと解説。
- onExceptionは例外処理用の特別な
ルート
- onExceptionは通常ルートより前に書く必要がある
- onExceptionは何個でもかける
- onExceptionのカッコ内はカンマ区切りで複数指定できる
- handled(true)は正常に処理し終えたよという事。例外をリスローしないという事。トランザクションをコミットするという事。
- setBody().constant(xxxx) という書き方は、独自実装が面倒な簡単処理用。メッセージのボディに固定値を入れる処理
Camelにはリトライ処理
というのも付いている
リトライ処理
されているか確認するために、エラー直前に標準出力を付けてみた。
@Override
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
if ("".equals(body)) {
System.out.println("トライしたけどダメ");
throw new Exception("empty value.");
}
body = "Hello " + body;
exchange.getIn().setBody(body);
}
リトライ処理を追加せずに、とりあえず起動
Enter :
トライしたけどダメ
なにかエラーが発生
Enter :
リトライ処理
をエラーハンドラに設定してみる。
今回の設定は最大2回の再トライをする
といった設定。
public void configure() throws Exception {
onException(Exception.class).maximumRedeliveries(2)
.handled(true)
.setBody().constant("なにかエラーが発生")
.to("stream:out");
from("stream:in?promptMessage=Enter : ").routeId("HelloRoute")
.process(new HelloProcessor())
.to("stream:out");
}
実行結果は
Enter :
トライしたけどダメ
トライしたけどダメ
トライしたけどダメ
なにかエラーが発生
Enter :
まぁリトライしてもダメですよね。。。
けど、うまいことリトライしているっぽい。
また、リトライ間隔も柔軟に変更できる。
onException(Exception.class)
.maximumRedeliveries(10).delayPattern("0:1000;5:3000")
.handled(true)
このように書くと、
- 初回から4回目までのリトライ間隔は1秒
- 5回目以降のリトライ間隔は3秒になる
思った以上にしっかりと考えられている作り。
ステップ9 > 別なルートを追加
複数ルートを作るには2通り
- MainでaddRouteBuilderを呼ぶ
- 同じRouteBuilder内でfrom()から始めるコードを追加する
どちらも同じですが、違う点があるとするなら
1つに例外ハンドリングの対象外になるかどうかです。
例外ハンドラの対象になるかどうかの確認のため、別のルートビルダを作成してみます。
package test.hello.route;
import org.apache.camel.builder.RouteBuilder;
public class HelloRoute2 extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:route2")
.to("log:route2")
.throwException(new Exception("test error"))
;
}
}
解説
- throwExceptionは想像通り
- directは同一JavaVM内でルート同士の接続やコードから直接呼ばれる用
Mainに追加する事を忘れがちなので気をつけて
public static void main(String[] args) throws Exception {
System.out.println("hello camel");
Main main = new Main();
main.addRouteBuilder(new HelloRoute());
main.addRouteBuilder(new HelloRoute2());
main.run();
}
新しく作ったルートを呼び出す
public void configure() throws Exception {
onException(Exception.class).maximumRedeliveries(2)
.handled(true)
.setBody().constant("なにかエラーが発生")
.to("stream:out");
from("stream:in?promptMessage=Enter : ").routeId("HelloRoute")
.process(new HelloProcessor())
.to("direct:route2")
.to("stream:out");
}
}
.to("direct:route2")
で呼び出している。
1089 [main] INFO org.apache.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
1094 [main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.13.0 (CamelContext: camel-1) started in 0.547 seconds
Enter :
トライしたけどダメ
トライしたけどダメ
トライしたけどダメ
なにかエラーが発生
Enter : ppp
25619 [Camel (camel-1) thread #0 - stream://in] INFO route2 - Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello ppp]
25622 [Camel (camel-1) thread #0 - stream://in] ERROR org.apache.camel.processor.DefaultErrorHandler - Failed delivery for (MessageId:
ID-daikuro-pc-local-50622-1398602463115-0-3 on ExchangeId: ID-daikuro-pc-local-50622-1398602463115-0-4). Exhausted after delivery attem
pt: 1 caught: java.lang.Exception: test error
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[HelloRoute ] [HelloRoute ] [stream://in?promptMessage=Enter+%3A+ ] [ 2]
[HelloRoute ] [process1 ] [test.hello.process.HelloProcessor@67da11d8 ] [ 0]
[HelloRoute ] [to2 ] [direct:route2 ] [ 3]
[route1 ] [to4 ] [log:route2 ] [ 0]
[route1 ] [throwException1 ] [throwException[java.lang.Exception] ] [ 1]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-daikuro-pc-local-50622-1398602463115-0-4
ExchangePattern InOnly
Headers {breadcrumbId=ID-daikuro-pc-local-50622-1398602463115-0-3, CamelRedelivered=false, CamelRedeliveryCounter=0, Camel
StreamComplete=true, CamelStreamIndex=1}
BodyType String
Body Hello ppp
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: test error
at test.hello.route.HelloRoute2.configure(HelloRoute2.java:11)
at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:322)
at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:276)
at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:262)
at org.apache.camel.impl.DefaultCamelContext.addRoutes(DefaultCamelContext.java:677)
at org.apache.camel.main.MainSupport.postProcessCamelContext(MainSupport.java:471)
at org.apache.camel.main.MainSupport.postProcessContext(MainSupport.java:406)
at org.apache.camel.main.Main.doStart(Main.java:108)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.main.MainSupport.run(MainSupport.java:148)
at test.hello.Hello.main(Hello.java:16)
Enter :
HelloRoute2で出たエラーは自作のエラールートに流れない模様。
このように例外ハンドラのスコープを分ける事が可能!
呼び出し先のルートが例外ハンドリングしなかったら呼び元の例外ハンドリングが呼び出されそうですが、そうでもない。
デフォルトの例外ハンドラがハンドリングしたという事でハンドリング済みという動作をしているから。
例外ハンドラのスコープを分けない方法はまた別途。
おしまい。
その他
ここまで来ると色々なコンポーネントを使って動作させる事ができるはず!
色々なコンポーネントを組み合わせて、色んな物をちゃらっと作ってみてください。
その他の機能
後日説明するかも(リクエストあれば)
- リクエストリプライなfrom()とか
- 分岐とか
- ループとか
- フェールオーバーとか
- コンポーネントの拡張とか
- コンポーネントのデフォルト設定の変更とか
- Springとの組み合わせとか
- プロパティファイルとの連携とか
- 自動型変換機能とか
- トランザクション制御とか
- データ変換とか
- ストリームキャッシュとか。サイズが大きい処理データをメモリで保持すると大変(というか脆弱)なので、ファイルベースで管理する仕組み。
- 試験方法とか。試験がまたこれ便利。
注意
-
ルート
を普通のコード代わり
に細かく書きすぎるとルートが長くなり管理が面倒になるので、その辺の機能分割なり1プロセッサーの実装ポリシーはチームによって決めること。ルート書く人(大きなストーリー)と機能を実装する人が別の人でもいいぐらいがちょうどいいと思う。