5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache CamelアプリケーションからApache Igniteを分散高速キャッシュサーバとして利用する

Posted at

はじめに

Apache Igniteはインメモリデータグリッドを実現するOSSです。具体的にはインメモリDB、KVS、分散メッセージングシステム、分散キャッシュサーバ等の役割を果たすことができます。
今回は、Apache Igniteを分散キャッシュサーバとして、Apache Camelアプリケーションから利用したいと思います。

Apache Camelでキャッシュサーバへアクセスするコンポーネントは以下の4つがあります。
※他にもあるかもしれませんが私が知っているのは4つのみで、使用したことがあるのはRedisとIgniteの2つ。

  • Hazelcast
  • Infinispan
  • Redis
  • Apache Ignite

その中で今回の記事でApache Igniteを選択したのは、以前に使用したことがありCamelから利用したいと思ったからです。
Apache CamelでIgniteに接続するためにはCamel Igniteコンポーネントを使用します。

今回はIgniteのv2.6.0を使用します。環境の構築方法は以下の記事を参照してください。

作成するアプリケーションの説明

今回作成するアプリケーションは、下図のように1秒ごとに日時のメッセージをIgniteのキャッシュへPUSHし、Igniteから同メッセージをGETするものです。また、Igniteのキャッシュから指定したキーのキャッシュをREMOVEします。

image.png

それでは、Igniteをキャッシュサーバとして利用するアプリケーションを作成してみましょう。

Igniteを使用するためのライブラリを追加する

Mavenの場合は、pom.xmlに以下のライブラリを追加します。
camel-igniteがCamelでIgniteを扱うためのコンポーネントで、${camel.version}には使用しているcamelのバージョンを指定します。
ignite-coreはIgnite用のクライアントライブラリです。今回、設定はSpring XMLを使用するため、ignite-springも追加しておきます。

		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-ignite</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-core</artifactId>
			<version>2.6.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>2.6.0</version>
		</dependency>

コンポーネントのURI

camel-igniteコンポーネントをキャッシュサーバとして使用するためのURIは以下のようになります。

ignite-cache:cacheName

コンテキストパス(cacheName)には対象のキャッシュ名を指定します。

アプリケーションを作成する

まず、Igniteへの接続設定を行います。
IgniteではIpFinderで接続先のIgniteノードを指定します。下のTcpDiscoveryVmIpFinder は固定IPアドレスで接続先を指定するIpFinderで、ignite1, ignite2の2サーバを設定しています。

			IgniteConfiguration config = new IgniteConfiguration();
			TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
			Set<String> nodes = new HashSet<String>();
			nodes.add("ignite1:47500..47509");
			nodes.add("ignite2:47500..47509");
			ipFinder.setAddresses(nodes);
		    config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));

次に、Igniteのtest01キャッシュに文字列をPUTするrouteを作成します。
Java DSLで書いたrouteは以下のようになります。

  • キャッシュのキーは、IgniteConstants.IGNITE_CACHE_KEYヘッダにsetHeaderメソッドで指定します。
  • キャッシュの操作は、URIのoperationオプションに"PUT", "GET", "REMOVE"などを指定します。
  • PUT, GETするデータはメッセージのBODYに、setBodyメソッドで指定します。
					from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
							.routeId("ignite_put_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
							.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
							.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示

Igniteのtest01キャッシュからGETするrouteを作成します。

					from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
							.routeId("ignite_get_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
							.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
							.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
					from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
							.routeId("ignite_remove_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
							.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
							.to("ignite-cache:test01?operation=GET")
							.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
							.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
							.to("ignite-cache:test01?operation=GET")
							.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)

これでIgniteへの接続設定、キャッシュへPUT、REMOVE、キャッシュからGETするrouteの作成が終わりました。
これらを使用するmain関数などを作成した全体のソースは以下になります。

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();

			IgniteConfiguration config = new IgniteConfiguration();
			TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
			Set<String> nodes = new HashSet<String>();
			nodes.add("ignite1:47500..47509");
			nodes.add("ignite2:47500..47509");
			ipFinder.setAddresses(nodes);// 接続先のノードを指定
		    config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));

			IgniteCacheComponent ignite = new IgniteCacheComponent();
			ignite.setIgniteConfiguration(config);
			context.addComponent("ignite-cache", ignite);

			context.addRoutes(new RouteBuilder() {
				public void configure() {
					from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
							.routeId("ignite_put_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
							.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
							.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示

					from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
							.routeId("ignite_get_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
							.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
							.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示

					from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
							.routeId("ignite_remove_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
							.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
							.to("ignite-cache:test01?operation=GET")
							.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
							.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
							.to("ignite-cache:test01?operation=GET")
							.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
				}
			});

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

作成したCamelアプリケーションを実行する

このアプリケーションを実行した際に出力されたログについて説明します。

次のログはPUTとGETのrouteが出力したログです。
どちらも1秒ごとに出力され、PUTとGETともに日時の文字列が出力されています。
別のrouteで実行されているため、GETとPUTの順番が入れ替わることもあります。

[2019-02-01 21:41:04.258], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:04
[2019-02-01 21:41:05.230], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:05
[2019-02-01 21:41:05.230], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:05
[2019-02-01 21:41:06.229], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:06
[2019-02-01 21:41:06.229], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:06

次のログはREMOVEのrouteが出力したログです。
最初のGETでは日時が表示されていますが、REMOVE実行後の2回目のGETでは日時が空になっていることが分かります。

[2019-02-01 21:41:04.263], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET = 2019-02-01 21:41:04
[2019-02-01 21:41:04.272], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET = 

今回のソースでは、エンドポイントURIでキャッシュへの操作を設定しましたが、IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーを設定することでメッセージごとに変更することができます。
そのため、以下のソースは同じ結果になります。

// (1)エンドポイントURIでキャッシュへの操作を設定した場合

							.to("ignite-cache:test01?operation=PUT")
							.to("ignite-cache:test01?operation=GET")

// (2)IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーに設定した場合
							.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("PUT"))
							.to("ignite-cache:test01")
							.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("GET"))
							.to("ignite-cache:test01")

Igniteサーバとアプリケーションの関係

今回、ignite1, ignite2という2台のIgniteのキャッシュサーバへ接続しました。
Camelアプリケーションはクライアントのように見えますが、実はキャッシュサーバとして実行されています。
つまり、2台のキャッシュサーバのクラスタにジョインし、自らもキャッシュサーバとして機能しているということです。これはアプリケーションとキャッシュサーバを同一サーバ上で動かす場合に効率的な構成になります。

逆にキャッシュサーバの規模が大きくなると、アプリケーションサーバとキャッシュサーバは別サーバとして構成したい場合があります。その場合は、アプリケーションをクライアントモードで起動することになります。

次はアプリケーションをクライアントモードで起動するようにアプリケーションを作成してみます。

アプリケーションをIgniteサーバのクライアントとして作成する

アプリケーションをクライアントモードで起動するには、以下の1行を追加するだけです。これで今回の構成ではキャッシュサーバ2台のクライアントとしてアプリケーションが起動することになります。

			Ignition.setClientMode(true);

これだけでは面白くないので、設定をSpring XMLに変更したいと思います。
Igniteでは設定ファイルを以下のように記載することができます。
ファイルは「default-config-cluster2server.xml」というファイル名で作成しています。
これは先ほど作成したプログラムの情報がXMLに書かれているだけのものになります。

default-config-cluster2server.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
        Alter configuration below as needed.
    -->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>ignite1:47500..47509</value>
                                <value>ignite2:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

作成したXMLファイルを読み込むには以下のように指定します。
"default-config-cluster2server.xml"は、ファイルパスを指定しています。
"grid.cfg"はXML中で指定した、BEANのIDになります。

			IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");

クライアントモードを設定し、コンフィグにXMLを用いた場合のソース全体は以下のようになります。route定義は前述のソースと同じです。

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();

			IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");
			IgniteCacheComponent ignite = IgniteCacheComponent.fromConfiguration(config);
			Ignition.setClientMode(true);

			context.addComponent("ignite-cache", ignite);

			context.addRoutes(new RouteBuilder() {
				public void configure() {
					from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
							.routeId("ignite_put_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
							.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
							.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示

					from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
							.routeId("ignite_get_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
							.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
							.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示

					from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
							.routeId("ignite_remove_route")
							.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
							.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
							.to("ignite-cache:test01?operation=GET")
							.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
							.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
							.to("ignite-cache:test01?operation=GET")
							.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
				}
			});

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}

Apachg InigteのコンソールであるIgniteVisorからtopコマンドでクラスタの状態を確認します。

visor> top
Hosts: 3
+=================================================================================================================================+
|   Int./Ext. IPs    |   Node ID8(@)    | Node Type |                  OS                   | CPUs |       MACs        | CPU Load |
+=================================================================================================================================+
| 0:0:0:0:0:0:0:1%lo | 1: 0D2C0B3D(@n0) | Server    | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1    | 08:00:27:76:F3:CF | 1.00 %   |
| 127.0.0.1          |                  |           |                                       |      |                   |          |
| 192.168.20.72      |                  |           |                                       |      |                   |          |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 192.168.100.254    | 1: 8A498968(@n1) | Server    | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1    | 02:42:2F:F1:6A:FD | 0.67 %   |
| 192.168.20.71      |                  |           |                                       |      | 08:00:27:E3:4A:44 |          |
| 127.0.0.1          |                  |           |                                       |      |                   |          |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 0:0:0:0:0:0:0:1    | 1: F751C80C(@n2) | Client    | Windows 10 amd64 10.0                 | 4    | 00:1B:DC:F5:B4:36 | 0.00 %   |
| 192.168.20.197     |                  |           |                                       |      | 0A:00:27:00:00:0F |          |
| 127.0.0.1          |                  |           |                                       |      | 0A:00:27:00:00:0B |          |
+---------------------------------------------------------------------------------------------------------------------------------+

Summary:
+--------------------------------------+
| Active         | true                |
| Total hosts    | 3                   |
| Total nodes    | 3                   |
| Total CPUs     | 6                   |
| Avg. CPU load  | 0.56 %              |
| Avg. free heap | 80.00 %             |
| Avg. Up time   | 09:44:38            |
| Snapshot time  | 2019-02-02 02:51:35 |
+--------------------------------------+

すると、上のように"Node Type"が"Client"であるnodeが確認できます。これは今回作成したアプリケーションであり、クライアントモードで動作しているということになります。
クライアントモードで動作している場合は、クラスタには参加しますがキャッシュはローカルには持ちません。

camel-igniteコンポーネントの主なプロパティ

最後にcamel-igniteコンポーネントの主なプロパティについて説明します。
プロパティは以下の表以外にもありますので、詳細は公式ページを参照してください。

プロパティ名 producer/consumer 説明 デフォルト値
operation producer キャッシュに実行する操作を指定する。以下の値を指定する。GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR. IgniteCacheOperation

参考

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?