Edited at

【Apache Camel】簡単にスループットをログに出力する


簡単にスループットをログに出力する

Apache Camelのルートのスループットを簡単に計測する方法を説明します。

CamelではLogコンポーネントを使用することで、指定時間内にルートで処理したデータ件数をログに出力することができます。

スループットを出力する場合、TOエンドポイントにログコンポーネントを以下のように指定します。

to("log:example.camelbegginer.throughput.PutThroughput?groupInterval=1000")

この例での「example.camelbegginer.throughput.PutThroughput」はロギングカテゴリと呼ばれ、任意の名前を指定できます。ロギングカテゴリはログに出力されるので、測定する処理毎に名前を分けておくと便利です。

groupIntervalオプションで1秒(1000ミリ秒)毎にスループットを出力する設定になります。


スループットの出力例

以下がスループットの出力例になります。

ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 300 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 103.199

最初に「ThroughputLogger, example.camelbegginer.throughput.PutThroughput」では、指定したロギングカテゴリの名前が出力されています。

「Received: 100 new messages, with total 300 so far. Last group took: 1001 millis」は、1001ミリ秒の間に100の新しいメッセージを処理し、合計で300件処理したことを示しています。1000ミリ秒毎にスループットを出力するように設定していますが、1ミリ秒はずれることがあります。

「99.9 messages per second. average: 103.199」で、1秒間で99.9メッセージを処理し、平均103.199件を処理できていることを示しています。

このようにLOGコンポーネントにより、簡単にスループットを出力することができます。


スループット出力のオプション一覧

スループット出力の際に指定するオプションは下表のとおりです。

Logコンポーネントには下表以外にもログレベル(level)などのオプションがありますが、スループットとは直接関係しないため省略しています。

オプション項目
デフォルト値
説明

groupInterval
null
スループットを出力する間隔(ミリ秒)を指定します。"1000"を指定すると、1秒毎のスループットがログに出力されます。groupActiveOnlyとgroupDelayは本オプションが指定した場合のみ使用されます。

groupActiveOnly
true
"true"に設定すると、出力する間隔の間に有効なデータ(0件より多い)の場合のみログを出力します。

groupDelay
0
最初の遅延時間(ミリ秒)を指定します。

groupSize
null
指定した件数だけ処理するとスループットがログに出力されます。"100"を指定すると、100件を処理するごとにログを出力します。groupSizeを指定した場合は、groupIntervalで指定した値は無効になります。


実装例

ここでは、実際に1000件のデータを流し、スループットがどのように出力されるのかを見てみたいと思います。

最初に1000件のデータを準備します。

以下のコードでは、SimpleDataSetクラスを利用し、setSizeメソッドで1000件のデータを指定しています。

SimpleDataSetクラスは、テスト用のコンポーネントで、システムの負荷試験などで利用するための大量データを簡単に作成することができます。

            SimpleDataSet dataSet = new SimpleDataSet();

dataSet.setSize(1000);

次に、SimpleDataSetクラスのインスタンスをレジストリに登録し、CamelContextのインスタンスを生成します。レジストリには先ほど生成した1000件のデータを持つSimpleDataSetを追加しています。

            SimpleRegistry registry = new SimpleRegistry();

registry.put("testDataSet", dataSet);

CamelContext context = new DefaultCamelContext(registry);

最後にCamelContextに1000件のデータを処理するルートを追加します。

    static RouteBuilder createRouteBuilder() {

return new RouteBuilder() {
public void configure() throws Exception {
from("dataset:testDataSet?produceDelay=-1")
.split().simple("${header.CamelDataSetIndex}")
.throttle(100)
.to("log:example.camelbegginer.throughput.PutThroughput?level=INFO&groupInterval=1000");
}
};
}

「from("dataset:testDataSet?produceDelay=-1")」で、先ほど生成した1000件のデータをfromエンドポイントに指定しています。

「split().simple("${header.CamelDataSetIndex}").throttle(100)」では、headerのCamelDataSetIndexで1000件のメッセージを分割し、1秒間に100件のメッセージを流すように指定しています。

CamelDataSetIndexはSimpleDataSetにより自動的に1から連番が指定されています。

アプリケーションを実行し、log4j2で出力されたログの出力例は以下のようになります。

ログが1秒おきに出力され、その時のスループットが表示されていることが確認できます。

[2019-01-13 08:27:46.429], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 100 so far. Last group took: 906 millis which is: 110.375 messages per second. average: 110.375

[2019-01-13 08:27:47.422], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 200 so far. Last group took: 1000 millis which is: 100 messages per second. average: 104.932
[2019-01-13 08:27:48.423], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 300 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 103.199
[2019-01-13 08:27:49.421], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 400 so far. Last group took: 998 millis which is: 100.2 messages per second. average: 102.433
[2019-01-13 08:27:50.422], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 500 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 101.916
[2019-01-13 08:27:51.423], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 600 so far. Last group took: 1001 millis which is: 99.9 messages per second. average: 101.574
[2019-01-13 08:27:52.421], [INFO ], e.c.t.PutThroughput, Camel (camel-1) thread #1 - ThroughputLogger, example.camelbegginer.throughput.PutThroughput, Received: 100 new messages, with total 700 so far. Last group took: 998 millis which is: 100.2 messages per second. average: 101.376

今回のサンプルプログラムの全体は以下のようになります。

package example.camelbegginer.throughput;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.dataset.SimpleDataSet;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;

public class PutThroughput {

public static void main(String[] args) {
try {
SimpleDataSet dataSet = new SimpleDataSet();
dataSet.setSize(1000);

SimpleRegistry registry = new SimpleRegistry();
registry.put("testDataSet", dataSet);

CamelContext context = new DefaultCamelContext(registry);
context.addRoutes(createRouteBuilder());

context.start();
Thread.sleep(20000);
context.stop();

} catch (Exception e) {
e.printStackTrace();
}
}

static RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() throws Exception {
from("dataset:testDataSet?produceDelay=-1")
.split().simple("${header.CamelDataSetIndex}")
.throttle(100)
.to("log:example.camelbegginer.throughput.PutThroughput?level=INFO&groupInterval=1000");
}
};
}
}


最後に

Apache Camelでは以上のように簡単にスループットをログに出力することができました。

今回のスループット出力は以下のようなケースで利用できるかと思います。


  • 単体試験時に試しにスループットを確認する。並列処理できていることの確認等ができます。

  • 性能試験時に各処理の性能の計測方法として利用する。

  • プロダクション環境で、性能を計測する手段が準備できない場合は、簡易な性能測定の方法として利用する。

スループット出力の負荷が低いため、プロダクション環境でも十分利用できる機能になっています。


参考