はじめに
Apache Igniteはインメモリデータグリッドを実現するOSSです。具体的にはインメモリDB、KVS、分散メッセージングシステム、分散キャッシュサーバ等の役割を果たすことができます。
今回は、Apache Igniteを分散キャッシュサーバとして、Apache Camelアプリケーションから利用したいと思います。
Apache Camelでキャッシュサーバへアクセスするコンポーネントは以下の4つがあります。
※他にもあるかもしれませんが私が知っているのは4つのみで、使用したことがあるのはRedisとIgniteの2つ。
- Hazelcast
- Infinispan
- Redis
- Apache Ignite
その中で今回の記事でApache Igniteを選択したのは、以前に使用したことがありCamelから利用したいと思ったからです。
Apache CamelでIgniteに接続するためにはCamel Igniteコンポーネントを使用します。
今回はIgniteのv2.6.0を使用します。環境の構築方法は以下の記事を参照してください。
作成するアプリケーションの説明
今回作成するアプリケーションは、下図のように1秒ごとに日時のメッセージをIgniteのキャッシュへPUSHし、Igniteから同メッセージをGETするものです。また、Igniteのキャッシュから指定したキーのキャッシュをREMOVEします。
それでは、Igniteをキャッシュサーバとして利用するアプリケーションを作成してみましょう。
Igniteを使用するためのライブラリを追加する
Mavenの場合は、pom.xmlに以下のライブラリを追加します。
camel-igniteがCamelでIgniteを扱うためのコンポーネントで、${camel.version}には使用しているcamelのバージョンを指定します。
ignite-coreはIgnite用のクライアントライブラリです。今回、設定はSpring XMLを使用するため、ignite-springも追加しておきます。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ignite</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>2.6.0</version>
</dependency>
コンポーネントのURI
camel-igniteコンポーネントをキャッシュサーバとして使用するためのURIは以下のようになります。
ignite-cache:cacheName
コンテキストパス(cacheName)には対象のキャッシュ名を指定します。
アプリケーションを作成する
まず、Igniteへの接続設定を行います。
IgniteではIpFinderで接続先のIgniteノードを指定します。下のTcpDiscoveryVmIpFinder は固定IPアドレスで接続先を指定するIpFinderで、ignite1, ignite2の2サーバを設定しています。
IgniteConfiguration config = new IgniteConfiguration();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
Set<String> nodes = new HashSet<String>();
nodes.add("ignite1:47500..47509");
nodes.add("ignite2:47500..47509");
ipFinder.setAddresses(nodes);
config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
次に、Igniteのtest01キャッシュに文字列をPUTするrouteを作成します。
Java DSLで書いたrouteは以下のようになります。
- キャッシュのキーは、IgniteConstants.IGNITE_CACHE_KEYヘッダにsetHeaderメソッドで指定します。
- キャッシュの操作は、URIのoperationオプションに"PUT", "GET", "REMOVE"などを指定します。
- PUT, GETするデータはメッセージのBODYに、setBodyメソッドで指定します。
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示
Igniteのtest01キャッシュからGETするrouteを作成します。
from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
これでIgniteへの接続設定、キャッシュへPUT、REMOVE、キャッシュからGETするrouteの作成が終わりました。
これらを使用するmain関数などを作成した全体のソースは以下になります。
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
IgniteConfiguration config = new IgniteConfiguration();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
Set<String> nodes = new HashSet<String>();
nodes.add("ignite1:47500..47509");
nodes.add("ignite2:47500..47509");
ipFinder.setAddresses(nodes);// 接続先のノードを指定
config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
IgniteCacheComponent ignite = new IgniteCacheComponent();
ignite.setIgniteConfiguration(config);
context.addComponent("ignite-cache", ignite);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
作成したCamelアプリケーションを実行する
このアプリケーションを実行した際に出力されたログについて説明します。
次のログはPUTとGETのrouteが出力したログです。
どちらも1秒ごとに出力され、PUTとGETともに日時の文字列が出力されています。
別のrouteで実行されているため、GETとPUTの順番が入れ替わることもあります。
[2019-02-01 21:41:04.258], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:04
[2019-02-01 21:41:05.230], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:05
[2019-02-01 21:41:05.230], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:05
[2019-02-01 21:41:06.229], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:06
[2019-02-01 21:41:06.229], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:06
次のログはREMOVEのrouteが出力したログです。
最初のGETでは日時が表示されていますが、REMOVE実行後の2回目のGETでは日時が空になっていることが分かります。
[2019-02-01 21:41:04.263], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET = 2019-02-01 21:41:04
[2019-02-01 21:41:04.272], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET =
今回のソースでは、エンドポイントURIでキャッシュへの操作を設定しましたが、IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーを設定することでメッセージごとに変更することができます。
そのため、以下のソースは同じ結果になります。
// (1)エンドポイントURIでキャッシュへの操作を設定した場合
.to("ignite-cache:test01?operation=PUT")
.to("ignite-cache:test01?operation=GET")
// (2)IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーに設定した場合
.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("PUT"))
.to("ignite-cache:test01")
.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("GET"))
.to("ignite-cache:test01")
Igniteサーバとアプリケーションの関係
今回、ignite1, ignite2という2台のIgniteのキャッシュサーバへ接続しました。
Camelアプリケーションはクライアントのように見えますが、実はキャッシュサーバとして実行されています。
つまり、2台のキャッシュサーバのクラスタにジョインし、自らもキャッシュサーバとして機能しているということです。これはアプリケーションとキャッシュサーバを同一サーバ上で動かす場合に効率的な構成になります。
逆にキャッシュサーバの規模が大きくなると、アプリケーションサーバとキャッシュサーバは別サーバとして構成したい場合があります。その場合は、アプリケーションをクライアントモードで起動することになります。
次はアプリケーションをクライアントモードで起動するようにアプリケーションを作成してみます。
アプリケーションをIgniteサーバのクライアントとして作成する
アプリケーションをクライアントモードで起動するには、以下の1行を追加するだけです。これで今回の構成ではキャッシュサーバ2台のクライアントとしてアプリケーションが起動することになります。
Ignition.setClientMode(true);
これだけでは面白くないので、設定をSpring XMLに変更したいと思います。
Igniteでは設定ファイルを以下のように記載することができます。
ファイルは「default-config-cluster2server.xml」というファイル名で作成しています。
これは先ほど作成したプログラムの情報がXMLに書かれているだけのものになります。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--
Alter configuration below as needed.
-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>ignite1:47500..47509</value>
<value>ignite2:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
作成したXMLファイルを読み込むには以下のように指定します。
"default-config-cluster2server.xml"は、ファイルパスを指定しています。
"grid.cfg"はXML中で指定した、BEANのIDになります。
IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");
クライアントモードを設定し、コンフィグにXMLを用いた場合のソース全体は以下のようになります。route定義は前述のソースと同じです。
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");
IgniteCacheComponent ignite = IgniteCacheComponent.fromConfiguration(config);
Ignition.setClientMode(true);
context.addComponent("ignite-cache", ignite);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
Apachg InigteのコンソールであるIgniteVisorからtopコマンドでクラスタの状態を確認します。
visor> top
Hosts: 3
+=================================================================================================================================+
| Int./Ext. IPs | Node ID8(@) | Node Type | OS | CPUs | MACs | CPU Load |
+=================================================================================================================================+
| 0:0:0:0:0:0:0:1%lo | 1: 0D2C0B3D(@n0) | Server | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1 | 08:00:27:76:F3:CF | 1.00 % |
| 127.0.0.1 | | | | | | |
| 192.168.20.72 | | | | | | |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 192.168.100.254 | 1: 8A498968(@n1) | Server | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1 | 02:42:2F:F1:6A:FD | 0.67 % |
| 192.168.20.71 | | | | | 08:00:27:E3:4A:44 | |
| 127.0.0.1 | | | | | | |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 0:0:0:0:0:0:0:1 | 1: F751C80C(@n2) | Client | Windows 10 amd64 10.0 | 4 | 00:1B:DC:F5:B4:36 | 0.00 % |
| 192.168.20.197 | | | | | 0A:00:27:00:00:0F | |
| 127.0.0.1 | | | | | 0A:00:27:00:00:0B | |
+---------------------------------------------------------------------------------------------------------------------------------+
Summary:
+--------------------------------------+
| Active | true |
| Total hosts | 3 |
| Total nodes | 3 |
| Total CPUs | 6 |
| Avg. CPU load | 0.56 % |
| Avg. free heap | 80.00 % |
| Avg. Up time | 09:44:38 |
| Snapshot time | 2019-02-02 02:51:35 |
+--------------------------------------+
すると、上のように"Node Type"が"Client"であるnodeが確認できます。これは今回作成したアプリケーションであり、クライアントモードで動作しているということになります。
クライアントモードで動作している場合は、クラスタには参加しますがキャッシュはローカルには持ちません。
camel-igniteコンポーネントの主なプロパティ
最後にcamel-igniteコンポーネントの主なプロパティについて説明します。
プロパティは以下の表以外にもありますので、詳細は公式ページを参照してください。
プロパティ名 | producer/consumer | 説明 | デフォルト値 | 型 |
---|---|---|---|---|
operation | producer | キャッシュに実行する操作を指定する。以下の値を指定する。GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR. | IgniteCacheOperation |