前回やったこと
前回はSpringDataGeodeを使って、インメモリデータグリッドを使ったSpringBootアプリケーションを作成し、SpringDataを使用して永続化したデータが二つのアプリケーションで共有されていることを確認しました。
SpringDataGeodeを使用したアプリケーション作成
今回やること
インメモリデータグリッドを使ったイベント処理
今回はインメモリデータグリッドの仕組みを使ったイベント処理を実装しようと
思います。
- まずは
Region
の変更を検知するリスナーを実装
package spring.geode.geodeCommon.listener;
import java.time.LocalDateTime;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.RegionEvent;
import spring.geode.geodeCommon.model.User;
import spring.geode.geodeCommon.region.UserRegion;
/**
* {@link UserRegion}の変更を検知するリスナー
*
*/
public class UserRegionListener implements CacheListener<Integer,User> {
public void afterCreate(EntryEvent<Integer,User> event) {
System.out.println(LocalDateTime.now());
System.out.println("afterCreate!!!!!!!!!" + event.getNewValue());
}
public void afterDestroy(EntryEvent<Integer, User> event) {
System.out.println("afterDestroy!!!!!!!!!" + event);
}
public void afterInvalidate(EntryEvent<Integer, User> event) {
System.out.println("afterInvalidate!!!!!!!!!" + event);
}
public void afterRegionDestroy(RegionEvent<Integer, User> event) {
System.out.println("afterRegionDestroy!!!!!!!!!" + event);
}
public void afterRegionCreate(RegionEvent<Integer, User> event) {
System.out.println("afterRegionCreate!!!!!!!!!" + event);
}
public void afterRegionInvalidate(RegionEvent<Integer, User> event) {
System.out.println("afterRegionInvalidate!!!!!!!!!" + event);
}
public void afterUpdate(EntryEvent<Integer, User> event) {
System.out.println("afterUpdate!!!!!!!!!" + event);
}
public void afterRegionClear(RegionEvent<Integer, User> event) {
System.out.println("afterRegionClear!!!!!!!!!" + event);
}
public void afterRegionLive(RegionEvent<Integer, User> event) {
System.out.println("afterRegionLive!!!!!!!!!" + event);
}
public void close() {
System.out.println("close!!!!!!!!!");
}
}
Region
に対するリスナーを作成するには、CacheListener<K,V>
を継承したクラスを作成します。
Key
,Value
にはリスナー登録したいRegion
のKey
,Value
を設定します。
今回検証したのは以下の二つ。
・ Region
作成イベント処理: afterRegionCreate
メソッドの処理が実行される
・User新規登録イベント処理: afterCreate
メソッドの処理が実行される
- 次に、リスナーを
Region
に登録する実装
package spring.geode.geodeCommon.region;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.Region;
import org.springframework.data.gemfire.ReplicatedRegionFactoryBean;
import spring.geode.geodeCommon.listener.UserRegionListener;
import spring.geode.geodeCommon.model.User;
/**
* ユーザを管理する{@link Region}の設定を作成する
*
*/
public class UserRegion {
public Region<Integer, User> createUserRegion(final GemFireCache cache) {
return cache.<Integer, User>getRegion("Users");
}
public ReplicatedRegionFactoryBean<Integer, User> createUserRegionFactory(GemFireCache cache) {
ReplicatedRegionFactoryBean<Integer, User> replicatedRegionFactory = new ReplicatedRegionFactoryBean<>();
UserRegionListener[] listeners = {new UserRegionListener()};
listeners[0] = new UserRegionListener();
replicatedRegionFactory.setCacheListeners(listeners);
replicatedRegionFactory.setClose(false);
replicatedRegionFactory.setCache(cache);
replicatedRegionFactory.setRegionName("Users");
replicatedRegionFactory.setPersistent(false);
return replicatedRegionFactory;
}
}
createUserRegion
メソッドでUserを管理するRegion
を作成して、ApplicationContext
内にBean登録する。
createUserRegionFactory
メソッドでRegion
への設定を行います。
上で作ったリスナーをRegion
に登録する処理もここで行います。
- 上で実装した
Region
や設定をアプリに設定する実装
package spring.geode.client.geodeClient;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.Region;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.gemfire.ReplicatedRegionFactoryBean;
import org.springframework.data.gemfire.config.annotation.PeerCacheApplication;
import org.springframework.data.gemfire.repository.config.EnableGemfireRepositories;
import spring.geode.client.geodeClient.repository.UserRepository;
import spring.geode.geodeCommon.model.User;
import spring.geode.geodeCommon.region.UserRegion;
@SpringBootApplication
@PeerCacheApplication(name = "SpringGeodeClientApplication",locators = "localhost[40404]")
@EnableGemfireRepositories(basePackageClasses = UserRepository.class)
public class GeodeClientApplication {
public static void main(String[] args) {
SpringApplication.run(GeodeClientApplication.class, args);
}
@Configuration
static class CacheInitializer {
@Bean
Region<Integer, User> userRegion(final GemFireCache cache) {
return new UserRegion().createUserRegion(cache);
}
@Bean
public ReplicatedRegionFactoryBean<Integer, User> replicatedRegion(GemFireCache cache) {
return new UserRegion().createUserRegionFactory(cache);
}
}
}
package spring.geode.server.geodeServer;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.Region;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.gemfire.ReplicatedRegionFactoryBean;
import org.springframework.data.gemfire.config.annotation.EnableEntityDefinedRegions;
import org.springframework.data.gemfire.config.annotation.EnableLocator;
import org.springframework.data.gemfire.config.annotation.EnableManager;
import org.springframework.data.gemfire.config.annotation.PeerCacheApplication;
import org.springframework.data.gemfire.repository.config.EnableGemfireRepositories;
import spring.geode.geodeCommon.model.User;
import spring.geode.geodeCommon.region.UserRegion;
import spring.geode.server.geodeServer.repository.UserRepository;
@SpringBootApplication
@PeerCacheApplication(name = "SpringGeodeServerApplication", locators = "localhost[40404]")
@EnableGemfireRepositories(basePackageClasses = UserRepository.class)
public class GeodeServerApplication {
public static void main(String[] args) {
SpringApplication.run(GeodeServerApplication.class, args);
}
@Configuration
@EnableLocator(port = 40404)
@EnableManager(start = true)
static class LocatorManagerConfiguration {
}
@Configuration
static class CacheInitializer {
@Bean
Region<Integer, User> userRegion(final GemFireCache cache) {
return new UserRegion().createUserRegion(cache);
}
@Bean
public ReplicatedRegionFactoryBean<Integer, User> replicatedRegion(GemFireCache cache) {
return new UserRegion().createUserRegionFactory(cache);
}
}
}
CacheInitializer
クラス内の処理でRegion
やRegion
への設定を反映させています。
これをキャッシュ組み込みサーバとして起動するアプリで実装します。
クラス名にClient
やServer
と名付けていますが、今回はP2P形式で起動するので、@PeerCacheApplication
アノテーションをクラスに付与しています。
クラス名は気にせず。。
以上でイベント処理の実装は完了です。
locator
を起動する処理をGeodeServerApplication.java
に実装しているので、GeodeServerApplication.java
から起動して、動作確認。
[info 2019/02/03 02:56:32.334 JST <main> tid=0x1] Initializing region Users
[info 2019/02/03 02:56:32.334 JST <main> tid=0x1] Initialization of region Users completed
afterRegionCreate!!!!!!!!!RegionEventImpl[region=org.apache.geode.internal.cache.DistributedRegion[path='/Users';scope=DISTRIBUTED_NO_ACK';dataPolicy=REPLICATE; concurrencyChecksEnabled];op=REGION_CREATE;isReinitializing=false;callbackArg=null;originRemote=false;originMember=192.168.11.3(SpringGeodeServerApplication:5899)<ec><v0>:1024;tag=null]
[info 2019/02/03 02:56:32.705 JST <main> tid=0x1] Initializing ExecutorService 'applicationTaskExecutor'
org.apache.coyote.AbstractProtocol start
情報: Starting ProtocolHandler ["http-nio-9090"]
[info 2019/02/03 02:56:32.877 JST <main> tid=0x1] Tomcat started on port(s): 9090 (http) with context path ''
[info 2019/02/03 02:56:32.880 JST <main> tid=0x1] Started GeodeServerApplication in 3.66 seconds (JVM running for 5.179)
[info 2019/02/03 02:56:49.107 JST <main> tid=0x1] Initializing region Users
[info 2019/02/03 02:56:49.113 JST <main> tid=0x1] Region Users requesting initial image from 192.168.11.3(SpringGeodeServerApplication:5899)<ec><v0>:1024
[info 2019/02/03 02:56:49.116 JST <main> tid=0x1] Users is done getting image from 192.168.11.3(SpringGeodeServerApplication:5899)<ec><v0>:1024. isDeltaGII is false
[info 2019/02/03 02:56:49.116 JST <main> tid=0x1] Initialization of region Users completed
afterRegionCreate!!!!!!!!!RegionEventImpl[region=org.apache.geode.internal.cache.DistributedRegion[path='/Users';scope=DISTRIBUTED_NO_ACK';dataPolicy=REPLICATE; concurrencyChecksEnabled];op=REGION_CREATE;isReinitializing=false;callbackArg=null;originRemote=false;originMember=192.168.11.3(SpringGeodeClientApplication:5901)<v1>:1025;tag=null]
[info 2019/02/03 02:56:49.510 JST <main> tid=0x1] Initializing ExecutorService 'applicationTaskExecutor'
org.apache.coyote.AbstractProtocol start
情報: Starting ProtocolHandler ["http-nio-9000"]
[info 2019/02/03 02:56:49.919 JST <main> tid=0x1] Tomcat started on port(s): 9000 (http) with context path ''
[info 2019/02/03 02:56:49.922 JST <main> tid=0x1] Started GeodeClientApplication in 4.245 seconds (JVM running for 5.513)
二つのアプリの起動ログで上記のログが確認できたらリージョン作成イベント処理はOKです。
Region Users requesting initial image from 192.168.11.3(SpringGeodeServerApplication:5899)<ec><v0>:1024
ちなみに上のログで、GeodeClientApplication
のUsers
というRegion
はGeodeServerApplication
で作成したRegion
のイメージを使ってRegion
を初期化しているみたい。
次は新規ユーザデータがGeodeClientApplcaition
側のRegion
に登録され、GeodeServerApplcaition
にデータが同期されたことを契機にリスナーの処理が実行されるかの確認。
curl -X POST -H "Content-Type: application/json" -d '{"name":"Michel", "age":"100"}' localhost:9000/register/user
リクエスト送信後のGeodeServerApplcaition
でのログ
[info 2019/02/03 04:48:30.587 JST <pool-3-thread-1> tid=0x5b] Initialization of region _monitoringRegion_192.168.11.3<v1>1025 completed
[info 2019/02/03 04:48:30.593 JST <pool-3-thread-1> tid=0x5b] Initializing region _notificationRegion_192.168.11.3<v1>1025
[info 2019/02/03 04:48:30.595 JST <pool-3-thread-1> tid=0x5b] Initialization of region _notificationRegion_192.168.11.3<v1>1025 completed
2019-02-03T04:48:46.176823
afterCreate!!!!!!!!!User(id=-1816523715, name=Michel, age=100)
期待値通り、GeodeServerApplication
でイベント処理メソッドが発火しているようです。
おまけ
「GeodeClientApplication
でデータを永続化した時点」と「GeodeServerApplication
でイベント処理が実行された時点」との差をとって、あるキャッシュサーバでデータが永続化されてから、別のキャッシュサーバでデータ同期イベントが実行されるまでのレイテンシを測ってみました。
上の動作確認で行ったことをfor文で10回繰り返し、レイテンシの平均値をとってみたところ結果は0.002ms
と非常に高速でした。
試行回数が少ないのであまり参考にはならないかもしれませんが、個人的には満足。
次は永続化について実装してみようと思います。