はじめに
Apache CamelではConcurrency APIを用いた並列処理が簡単に利用できる仕組みを提供しています。
例えば、splitterパターンを用いた並列処理は以下のように実現できます。
並列処理にするためにparallelProcessing属性を追加しているだけです。
XML DSLの場合)
<route>
<from uri="direct:start" />
<split parallelProcessing="true">
<simple>${body}</simple>
<log message="${body}" />
</split>
</route>
Java DSLの場合)
from("direct:start")
.split(simple("${body}")).parallelProcessing(true)
.log("${body}");
splitterパターンだけでなく、以下のパターンでも並列処理を行い、パフォーマンスを向上させることができます。
- aggregate
- multicast
- recipientList
- split
- wireTap
Camelの並列処理の仕組みは下図のとおりです。
新規のタスクはQueueに入り、スレッドプール中のスレッドに対してタスクが割り当てられます。
Queueがいっぱいになった場合は新規のタスクを破棄する、Queueからタスクを削除するなど、いくつかの選択肢から選ぶことができます。
このとき、並列処理はデフォルトのスレッドプールプロファイルからスレッドプールが作成されて使用されます。
デフォルトのスレッドプールプロファイルは以下の値を持ちます。
オプション | デフォルト値 | 型 | 説明 |
---|---|---|---|
poolSize | 10 | int | ExecutorServiceのプールサイズを指定する。 |
maxPoolSize | 20 | int | ExecutorServiceの最大プールサイズを指定する。 |
keepAliveTime | 60 | long | アクティブではないスレッドを存続させる期間を指定する。 |
maxQueueSize | 1000 | int | 基になるJava ExecutorServiceのワーカーキューに保持するタスクの最大数を示す数値 |
rejectedPolicy | CallerRuns | 拒否されたタスクを処理する方法を指定する。 以下のオプションがサポートされている。 ・Abort 例外をスローする ・CallerRuns 呼び出し元で処理する ・Discard タスクを破棄する ・DiscardOldest キューの中で最も古いデータを破棄する |
デフォルト値は以下のソースを参照。
カスタムスレッドプールプロファイルを使用する
通常はデフォルトのスレッドプールで十分ですが、ユースケースに応じてスレッドプールのプールサイズを変更するなどのカスタマイズができます。
デフォルトのスレッドプールプロファイルの設定値を変更することもできますが、1つだけしか設定できません。そのため、ユースケースに応じてカスタムスレッドプールプロファイルを複数作成するのがお勧めです。
カスタムスレッドプールプロファイルは以下のように定義します。
XML DSLの場合)
<threadPoolProfile id="customThreadPoolProfile"
poolSize="20" maxPoolSize="50" maxQueueSize="100"
keepAliveTime="60" rejectedPolicy="CallerRuns" />
Java DSLの場合)
ThreadPoolProfile custom = new ThreadPoolProfileBuilder("customThreadPoolProfile").poolSize(20)
.maxPoolSize(50).maxQueueSize(100).keepAliveTime(60L).rejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns).build();
オプションを指定しなかった場合は、デフォルトプロファイルから継承されます。
スレッドプールは、このカスタムスレッドプールプロファイルから生成されます。
複数のルートで利用された場合は、その数だけのカスタムスレッドプールが生成され、他のルートとスレッドプールが共有されることはありません。
このカスタムスレッドプールプロファイルをSplitterパターンで使用した例は以下のようになります。
executorServiceRef属性に、先ほど定義したカスタムスレッドプールプロファイルのIDを指定するだけです。
XML DSLの場合)
<route>
<from uri="direct:start" />
<split parallelProcessing="true" executorServiceRef="customThreadPoolProfile">
<simple>${body}</simple>
<log message="${body}" />
</split>
</route>
Java DSLの場合)
from("direct:start")
.split(simple("${body}")).parallelProcessing(true).executorServiceRef("customThreadPoolProfile")
.log("${body}");
以下のように複数のルートでカスタムスレッドプールプロファイルを使用することもできます。このとき、カスタムスレッドプールは共有されません。
<route>
<from uri="direct:start" />
<split parallelProcessing="true" executorServiceRef="customThreadPoolProfile">
<simple>${body}</simple>
<log message="${body}" />
</split>
</route>
<route>
<from uri="direct:start2" />
<split parallelProcessing="true" executorServiceRef="customThreadPoolProfile">
<simple>${body}</simple>
<log message="${body}" />
</split>
</route>
キューにタスクを追加できない場合の処理(rejectedPlociy)
キューにタスクを追加できない場合の処理は、rejectedPlociy属性の値によって決まります。
rejectedPlociy属性のとりうる値は以下の4つになります。
- Abort
- CallerRuns
- Discard
- DiscardOldest
デフォルトはCallerRunsで、キューがいっぱいの場合は呼び出し元のスレッドで処理を行います。
Abortでは例外を発生させ、Discardはタスクを破棄、DiscardOldestはキューで最も古いタスクを削除します。
rejectedPlociy属性にAbortを設定した場合にキューにタスクを追加できないと以下の例外が発生します。
[2019-02-18 21:43:35.448], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1550493813701-0-2 on ExchangeId: ID-mky-PC-1550493813701-0-1). Exhausted after delivery attempt: 1 caught: java.util.concurrent.RejectedExecutionException: Task SubmitOrderedFutureTask[1] rejected from org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@7efb53af[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0][Split]
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [timer://trigger?repeatCount=1 ] [ 37]
[route1 ] [process1 ] [ref:testData ] [ 11]
[route1 ] [to1 ] [direct:start ] [ 0]
[route2 ] [split1 ] [split[simple{${body}}] ] [ 16]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.util.concurrent.RejectedExecutionException: Task SubmitOrderedFutureTask[1] rejected from org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@7efb53af[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0][Split]
at org.apache.camel.ThreadPoolRejectedPolicy$1.rejectedExecution(ThreadPoolRejectedPolicy.java:49) ~[camel-core-2.23.0.jar:2.23.0]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) ~[?:1.8.0_172]
~省略~
おまけ
今回は説明に入れませんでしたが、以下のことも可能です。
ただし、(私は)カスタムスレッドプールプロファイルが作成できれば十分かと思っているため省略しました。(気が向いたら書くかも)
- デフォルトスレッドプールプロファイルをカスタマイズする
- カスタムスレッドプール(プロファイルではない)を作成する