Posted at

Apache CamelアプリケーションからApache Igniteを分散高速キャッシュサーバとして利用する


はじめに

Apache Igniteはインメモリデータグリッドを実現するOSSです。具体的にはインメモリDB、KVS、分散メッセージングシステム、分散キャッシュサーバ等の役割を果たすことができます。

今回は、Apache Igniteを分散キャッシュサーバとして、Apache Camelアプリケーションから利用したいと思います。

Apache Camelでキャッシュサーバへアクセスするコンポーネントは以下の4つがあります。

※他にもあるかもしれませんが私が知っているのは4つのみで、使用したことがあるのはRedisとIgniteの2つ。


  • Hazelcast

  • Infinispan

  • Redis

  • Apache Ignite

その中で今回の記事でApache Igniteを選択したのは、以前に使用したことがありCamelから利用したいと思ったからです。

Apache CamelでIgniteに接続するためにはCamel Igniteコンポーネントを使用します。

今回はIgniteのv2.6.0を使用します。環境の構築方法は以下の記事を参照してください。


作成するアプリケーションの説明

今回作成するアプリケーションは、下図のように1秒ごとに日時のメッセージをIgniteのキャッシュへPUSHし、Igniteから同メッセージをGETするものです。また、Igniteのキャッシュから指定したキーのキャッシュをREMOVEします。

image.png

それでは、Igniteをキャッシュサーバとして利用するアプリケーションを作成してみましょう。


Igniteを使用するためのライブラリを追加する

Mavenの場合は、pom.xmlに以下のライブラリを追加します。

camel-igniteがCamelでIgniteを扱うためのコンポーネントで、${camel.version}には使用しているcamelのバージョンを指定します。

ignite-coreはIgnite用のクライアントライブラリです。今回、設定はSpring XMLを使用するため、ignite-springも追加しておきます。

        <dependency>

<groupId>org.apache.camel</groupId>
<artifactId>camel-ignite</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>2.6.0</version>
</dependency>


コンポーネントのURI

camel-igniteコンポーネントをキャッシュサーバとして使用するためのURIは以下のようになります。

ignite-cache:cacheName

コンテキストパス(cacheName)には対象のキャッシュ名を指定します。


アプリケーションを作成する

まず、Igniteへの接続設定を行います。

IgniteではIpFinderで接続先のIgniteノードを指定します。下のTcpDiscoveryVmIpFinder は固定IPアドレスで接続先を指定するIpFinderで、ignite1, ignite2の2サーバを設定しています。

            IgniteConfiguration config = new IgniteConfiguration();

TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
Set<String> nodes = new HashSet<String>();
nodes.add("ignite1:47500..47509");
nodes.add("ignite2:47500..47509");
ipFinder.setAddresses(nodes);
config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));

次に、Igniteのtest01キャッシュに文字列をPUTするrouteを作成します。

Java DSLで書いたrouteは以下のようになります。


  • キャッシュのキーは、IgniteConstants.IGNITE_CACHE_KEYヘッダにsetHeaderメソッドで指定します。

  • キャッシュの操作は、URIのoperationオプションに"PUT", "GET", "REMOVE"などを指定します。

  • PUT, GETするデータはメッセージのBODYに、setBodyメソッドで指定します。

                    from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行

.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示

Igniteのtest01キャッシュからGETするrouteを作成します。

                    from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行

.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示

                    from("timer:triggerRemove?repeatCount=1") // 1回だけ実行

.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)

これでIgniteへの接続設定、キャッシュへPUT、REMOVE、キャッシュからGETするrouteの作成が終わりました。

これらを使用するmain関数などを作成した全体のソースは以下になります。

    public static void main(String[] args) {

try {
CamelContext context = new DefaultCamelContext();

IgniteConfiguration config = new IgniteConfiguration();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
Set<String> nodes = new HashSet<String>();
nodes.add("ignite1:47500..47509");
nodes.add("ignite2:47500..47509");
ipFinder.setAddresses(nodes);// 接続先のノードを指定
config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));

IgniteCacheComponent ignite = new IgniteCacheComponent();
ignite.setIgniteConfiguration(config);
context.addComponent("ignite-cache", ignite);

context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示

from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示

from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
}
});

context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}


作成したCamelアプリケーションを実行する

このアプリケーションを実行した際に出力されたログについて説明します。

次のログはPUTとGETのrouteが出力したログです。

どちらも1秒ごとに出力され、PUTとGETともに日時の文字列が出力されています。

別のrouteで実行されているため、GETとPUTの順番が入れ替わることもあります。

[2019-02-01 21:41:04.258], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:04

[2019-02-01 21:41:05.230], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:05
[2019-02-01 21:41:05.230], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:05
[2019-02-01 21:41:06.229], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:06
[2019-02-01 21:41:06.229], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:06

次のログはREMOVEのrouteが出力したログです。

最初のGETでは日時が表示されていますが、REMOVE実行後の2回目のGETでは日時が空になっていることが分かります。

[2019-02-01 21:41:04.263], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET = 2019-02-01 21:41:04

[2019-02-01 21:41:04.272], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET =

今回のソースでは、エンドポイントURIでキャッシュへの操作を設定しましたが、IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーを設定することでメッセージごとに変更することができます。

そのため、以下のソースは同じ結果になります。

// (1)エンドポイントURIでキャッシュへの操作を設定した場合

.to("ignite-cache:test01?operation=PUT")
.to("ignite-cache:test01?operation=GET")

// (2)IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーに設定した場合
.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("PUT"))
.to("ignite-cache:test01")
.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("GET"))
.to("ignite-cache:test01")


Igniteサーバとアプリケーションの関係

今回、ignite1, ignite2という2台のIgniteのキャッシュサーバへ接続しました。

Camelアプリケーションはクライアントのように見えますが、実はキャッシュサーバとして実行されています。

つまり、2台のキャッシュサーバのクラスタにジョインし、自らもキャッシュサーバとして機能しているということです。これはアプリケーションとキャッシュサーバを同一サーバ上で動かす場合に効率的な構成になります。

逆にキャッシュサーバの規模が大きくなると、アプリケーションサーバとキャッシュサーバは別サーバとして構成したい場合があります。その場合は、アプリケーションをクライアントモードで起動することになります。

次はアプリケーションをクライアントモードで起動するようにアプリケーションを作成してみます。


アプリケーションをIgniteサーバのクライアントとして作成する

アプリケーションをクライアントモードで起動するには、以下の1行を追加するだけです。これで今回の構成ではキャッシュサーバ2台のクライアントとしてアプリケーションが起動することになります。

            Ignition.setClientMode(true);

これだけでは面白くないので、設定をSpring XMLに変更したいと思います。

Igniteでは設定ファイルを以下のように記載することができます。

ファイルは「default-config-cluster2server.xml」というファイル名で作成しています。

これは先ほど作成したプログラムの情報がXMLに書かれているだけのものになります。


default-config-cluster2server.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"
>
<!--
Alter configuration below as needed.
-->

<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>ignite1:47500..47509</value>
<value>ignite2:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>


作成したXMLファイルを読み込むには以下のように指定します。

"default-config-cluster2server.xml"は、ファイルパスを指定しています。

"grid.cfg"はXML中で指定した、BEANのIDになります。

            IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");

クライアントモードを設定し、コンフィグにXMLを用いた場合のソース全体は以下のようになります。route定義は前述のソースと同じです。

    public static void main(String[] args) {

try {
CamelContext context = new DefaultCamelContext();

IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");
IgniteCacheComponent ignite = IgniteCacheComponent.fromConfiguration(config);
Ignition.setClientMode(true);

context.addComponent("ignite-cache", ignite);

context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示

from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示

from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
}
});

context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}

Apachg InigteのコンソールであるIgniteVisorからtopコマンドでクラスタの状態を確認します。

visor> top

Hosts: 3
+=================================================================================================================================+
| Int./Ext. IPs | Node ID8(@) | Node Type | OS | CPUs | MACs | CPU Load |
+=================================================================================================================================+
| 0:0:0:0:0:0:0:1%lo | 1: 0D2C0B3D(@n0) | Server | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1 | 08:00:27:76:F3:CF | 1.00 % |
| 127.0.0.1 | | | | | | |
| 192.168.20.72 | | | | | | |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 192.168.100.254 | 1: 8A498968(@n1) | Server | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1 | 02:42:2F:F1:6A:FD | 0.67 % |
| 192.168.20.71 | | | | | 08:00:27:E3:4A:44 | |
| 127.0.0.1 | | | | | | |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 0:0:0:0:0:0:0:1 | 1: F751C80C(@n2) | Client | Windows 10 amd64 10.0 | 4 | 00:1B:DC:F5:B4:36 | 0.00 % |
| 192.168.20.197 | | | | | 0A:00:27:00:00:0F | |
| 127.0.0.1 | | | | | 0A:00:27:00:00:0B | |
+---------------------------------------------------------------------------------------------------------------------------------+

Summary:
+--------------------------------------+
| Active | true |
| Total hosts | 3 |
| Total nodes | 3 |
| Total CPUs | 6 |
| Avg. CPU load | 0.56 % |
| Avg. free heap | 80.00 % |
| Avg. Up time | 09:44:38 |
| Snapshot time | 2019-02-02 02:51:35 |
+--------------------------------------+

すると、上のように"Node Type"が"Client"であるnodeが確認できます。これは今回作成したアプリケーションであり、クライアントモードで動作しているということになります。

クライアントモードで動作している場合は、クラスタには参加しますがキャッシュはローカルには持ちません。


camel-igniteコンポーネントの主なプロパティ

最後にcamel-igniteコンポーネントの主なプロパティについて説明します。

プロパティは以下の表以外にもありますので、詳細は公式ページを参照してください。

プロパティ名
producer/consumer
説明
デフォルト値

operation
producer
キャッシュに実行する操作を指定する。以下の値を指定する。GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR.

IgniteCacheOperation


参考