Apache Camel 処理中のメッセージ数を取得する
※とりあえずメモとして残す。後でまともな記事に編集するかも。他サーバから処理中のメッセージ数を取得する等を書きたい。
Inflight Exchange = routeで処理中のメッセージ
InflightRepositoryインターフェースのsizeメソッドで、routeで処理中のメッセージ数(Inflight Exchange)を取得できる。
定時処理バッチで、前回バッチの処理が終了していない場合はスキップする必要がある場合は、メッセージ数が0でなければ処理を開始しないなどの処理ができそう。
TestProcessor.java
package sample;
import javax.annotation.Resource;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class TestProcessor implements Processor {
Logger logger = LoggerFactory.getLogger(TestProcessor.class);
@Resource
private CamelContext context;
@Override
public void process(Exchange exchange) throws Exception {
logger.info("TestProcessor start ");
int size = context.getInflightRepository().size("parentRoute");
logger.info("Inflight Size = {}", size);
}
}
1秒間に1メッセージずつルートに投入すると以下のように出力される。
[2018-11-17 10:22:52.504], [INFO ], s.TestProcessor, Camel (camel-1) thread #2 - timer://runwatch, sample.TestProcessor, Inflight Size = 1
[2018-11-17 10:22:53.484], [INFO ], s.TestProcessor, Camel (camel-1) thread #2 - timer://runwatch, sample.TestProcessor, Inflight Size = 2
[2018-11-17 10:22:54.485], [INFO ], s.TestProcessor, Camel (camel-1) thread #2 - timer://runwatch, sample.TestProcessor, Inflight Size = 3
[2018-11-17 10:22:55.484], [INFO ], s.TestProcessor, Camel (camel-1) thread #2 - timer://runwatch, sample.TestProcessor, Inflight Size = 4
[2018-11-17 10:22:56.486], [INFO ], s.TestProcessor, Camel (camel-1) thread #2 - timer://runwatch, sample.TestProcessor, Inflight Size = 5