SpringBoot + PostgreSql + mybatis + NarayanaJTA で分散トランザクションを実施してみた
非常にめんどくさい複数データベースへの対応
以前DynamicAbstractRoutingDataSourceを用いて複数データベースへの対応を試みたが、
後から確認したところロールバックがうまく機能していなかったため別の方法で複数データソースへの対応を検討してみた。
まず動くものが欲しいという人はページ下部にソースのリンクがありますのでそちらから参照して下さい。
Atomikos Transaction Managerを使用しない理由
SpringBootの入門本などではAtomikosを使用した分散トランザクションを紹介している上、ネットでの情報もAtomicosを使用したサンプルが非常に多かった。
しかし、各データソースのユニークキーとなるXIDの扱いが接続先DBが可変の場合にコネクション作成の動きかが怪しかったため(うろ覚え)今回はNarayanaJTAを使用した複数データベースへの対応を行う。
環境
- Windows10 Professional
- Java 9.0.4
- PostgreSQL10.3-1
- 使用ライブラリ
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-narayana</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.integralblue</groupId>
<artifactId>log4jdbc-spring-boot-starter</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
</dependencies>
実現目標
- 複数データソースに対して同時に接続しデータの読み取りが出来ること
- TwoPahseCommitによる分散トランザクションが実現できること
- データソースの接続数が可変となる場合に対応すること(リクエストで受けた支店リストの支店コード毎にデータベースを持ってる場合とか)
ひとまずNarayanaを使用してPostgreSqlに接続できるようにする
SpringBootとNarayanaの連携自体はSpringが提供しているNarayanaDataSourceBeanをDataSourceのBeanとして登録することで実現できる。
例えばこんな感じでBeanに登録してやるだけでNarayanaを使用した分散トランザクション向けの接続が行える。
package com.example.datasource;
import com.example.common.DataSourceUtil;
import com.example.config.MyDataBaseProperties;
import com.example.constance.DataBaseConst;
import javax.sql.DataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.MybatisProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
@EnableConfigurationProperties(MyDataBaseProperties.class)
@MapperScan(sqlSessionFactoryRef = DataBaseConst.DataSourceOneConst.NAME,
basePackages = DataBaseConst.DataSourceOneConst.MAPPER)
public class DatasourceConfigOne {
@Autowired
MyDataBaseProperties properties;
@Autowired
MybatisProperties mybatisProperty;
/**
* Get dataSource one.
*
* @return
*/
@Primary
@Bean
public DataSource getDataSourceOne() {
return DataSourceUtil.getDataSource(
this.properties.getProperty(DataBaseConst.DataSourceOneConst.DATA_SOURCE).getDetail());
}
}
MyDataBasePropertiesはyamlファイルに記載したデータソースの情報一覧でそこからプロパティ名を指定してデータソース情報を取得、
DataSourceUtil#getDataSourceでは取得したプロパティを元にNarayanaDataSourceBeanを作成している。
また今回はORMとしてMybatisを使用するため、Mybatis向けのSqlSessionFactoryBeanの生成メソッドもこのクラスに追加する。
/**
* Get SqlSessionFactory one.
*
* @return
*/
@Primary
@Bean(name = DataBaseConst.DataSourceOneConst.NAME)
public SqlSessionFactoryBean getSqlSessionFactoryOne() {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setConfiguration(this.mybatisProperty.getConfiguration());
factory.setDataSource(getDataSourceOne());
return factory;
}
やってることはMybatisのConfigを読み出しデータソース情報と合わせてセッションを作成してやるだけ。
これでクラスで宣言しているMapperScanのbasePackagesに対応したDAOがDIされた時にこのデータソース設定が活用される。
@MapperScan(sqlSessionFactoryRef = DataBaseConst.DataSourceOneConst.NAME,
basePackages = DataBaseConst.DataSourceOneConst.MAPPER)
接続先のデータベースが決まっているのであればこれを元に接続したデータベースの数だけこのクラスを増やしてやれば宣言的トランザクションでも問題無く分散トランザクションが行える。
NarayanaにあるConnectionManagerの全機能を有効にする
Springで提供されているNarayanaDataSourceBeanを使用すれば確かに分散トランザクションは可能となる。
しかし下記の抜粋ソースを見ればわかる通り使用できる機能はごく一部だけである。
@Override
public Connection getConnection() throws SQLException {
Properties properties = new Properties();
properties.put(TransactionalDriver.XADataSource, this.xaDataSource);
return ConnectionManager.create(null, properties);
}
/*
* Connections are pooled for the duration of a transaction.
*/
public static synchronized Connection create (String dbUrl, Properties info) throws SQLException
{
String user = info.getProperty(TransactionalDriver.userName, "");
String passwd = info.getProperty(TransactionalDriver.password, "");
String dynamic = info.getProperty(TransactionalDriver.dynamicClass, "");
String poolConnections = info.getProperty(TransactionalDriver.poolConnections, "true");
Object xaDataSource = info.get(TransactionalDriver.XADataSource);
int maxConnections = Integer.valueOf(info.getProperty(TransactionalDriver.maxConnections, "10"));
~~~省略~~~
}
コネクション作成時にプロパティとしてXADataSourceしか渡していないため、コネクションプールの使用設定も、最大接続数も設定できない。
そのほかのプロパティはデータソースが同一であることの確認でしか使われないためあまり影響は無いが10以上のデータソースを使用する場合はこのままではいけない。
しかもdynamicClass、poolConnections、maxConnectionsはPostgreSqlのPGXADataSourceには存在しない設定項目だ。
そのためNarayanaDataSourceBeanを拡張したクラスとPGXADataSourceを拡張したクラスを作成する。
PGXADataSourceの拡張については外部からプロパティ値を渡す実装にすれば必要ないかもしれないが、それでも作成しておく。
package com.example.common;
import java.util.Objects;
import org.postgresql.xa.PGXADataSource;
/**
* Extends PGXADataSource for TransactionalDriver.
*
* @author suimyakunosoko
*
*/
public class MyXaDataSource extends PGXADataSource {
/** enable pool connection. */
private boolean poolConnections = true;
/** max pool connection counts. */
private int maxConnections = 10;
public String getDynamicClass() {
return this.getClass().getName();
}
public boolean getPoolConnections() {
return this.poolConnections;
}
public void setPoolConnections(boolean poolConnections) {
this.poolConnections = poolConnections;
}
public int getMaxConnections() {
return this.maxConnections;
}
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MyXaDataSource)) {
return false;
}
MyXaDataSource casted = (MyXaDataSource) obj;
return Objects.equals(casted.getURL(), this.getURL())
&& Objects.equals(casted.getUser(), this.getUser())
&& Objects.equals(casted.getPassword(), this.getPassword());
}
}
PGXADataSourceを拡張したMyXaDataSourceでは不足していたプロパティpoolConnectionsとmaxConnectionsを追加する。
ConnectionManagerに初期値も合わせておく。
getDynamicClassはデータソースチェックにしか使用しないため適当にクラス名あたりを渡してやる。
equalsのオーバーライドはConnectionManagerがデータソースの存在チェックを行うときにXADataSourceのequalメソッドを使用してのチェックも行っているため。
デフォルトのままではハッシュ値同士の比較になるためオーバーライドしてやる。
PGXADataSourceのURLにユーザ名やらパスワードも含まれているため本来は必要ないがここは気分で追加しておく。
package com.example.common;
import com.arjuna.ats.internal.jdbc.ConnectionManager;
import com.arjuna.ats.jdbc.TransactionalDriver;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import org.springframework.boot.jta.narayana.NarayanaDataSourceBean;
/**
* Extends NarayanaDataSourceBean for ConnectionManager and enable TransactionalDriverProperties.
*
* @author suimyakunosoko
*
*/
public class MyNarayanaDataSourceBean extends NarayanaDataSourceBean {
private final Properties properties;
/**
* Wrap NarayanaDataSourceBean for ConnectionManager.
*
* @param myXaDataSource MyXaDataSource
*/
public MyNarayanaDataSourceBean(MyXaDataSource myXaDataSource) {
super(myXaDataSource);
this.properties = new Properties();
this.properties.put(TransactionalDriver.userName, myXaDataSource.getUser());
this.properties.put(TransactionalDriver.password, myXaDataSource.getPassword());
this.properties.put(TransactionalDriver.dynamicClass, myXaDataSource.getDynamicClass());
this.properties.put(TransactionalDriver.poolConnections,
String.valueOf(myXaDataSource.getPoolConnections()));
this.properties.put(TransactionalDriver.XADataSource, myXaDataSource);
this.properties.put(TransactionalDriver.maxConnections,
String.valueOf(myXaDataSource.getMaxConnections()));
}
@Override
public Connection getConnection() throws SQLException {
return ConnectionManager.create(null, this.properties);
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return ConnectionManager.create(null, this.properties);
}
}
NarayanaDataSourceBeanを拡張したMyNarayanaDataSourceBeanでは設定されていなかった項目の設定を行うように修正する。
引数にMyXaDataSourceを指定しているが、PostgreSqlサーバとMySqlサーバ、Oracleサーバの三つともに対応するなんて場合は必要な設定項目を網羅したインターフェースを用意することになる気がする。
今回はPostgreSQLにさえ接続できれば良いので気にしない。
TwoPahseCommitが有効になるようjbossts-properties.xmlをリソースに配置
このままでも動くしちゃんとロールバックもされるが、Narayanaの初期設定はOnePhaseCommitである上になんか変なWARNログが流れる。
WARN 4812 --- [ main] com.arjuna.ats.common : ARJUNA048002: Could not find configuration file, URL was: null
jbossts-properties.xmlファイルを読み込みに行こうとして見つからない場合のメッセージがこれに当たる。
TwoPahseCommitを有効にするついでにこのファイルをリソース配下に格納してやる。
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<!-- (default is YES) -->
<!-- (twoPhaseCommit is NO) -->
<entry key="CoordinatorEnvironmentBean.commitOnePhase">NO</entry>
<!-- (Must be unique across all Arjuna instances.) -->
<!-- (default is 1) -->
<entry key="CoreEnvironmentBean.nodeIdentifier">1</entry>
</properties>
これでWARNログの抑制とTwoPhaseCommitが可能になる。
ここまでで静的な分散トランザクションは完了、でも動的な場合は?
ここまでの実装で静的データソースに対する分散トランザクション(自システムのメインDBと連携先システムのDBといった場合など)は実現出来る。
しかしここで狂気の設計が行われたとする。しかもその設計に逆らえない。
その設計の内容は**「そうだ!支店がたくさんあるから支店毎にサーバ構築してデータベースを持たせよう!」**
実際にはありえない・・・ありえないと思いたかったがこれに似た事象に遭遇してしまったので対応するしかない。
ロジックの中で接続先が決まる以上DIはあきらめる
各コントローラ、各サービスが実行されるタイミングでは使用するデータソースが決まっていないためSpringのDIは諦める。
そのためフレームワークが吸収していた以下の機能を実現してやる必要がある。
- 使用するデータソースが確定したタイミングでのSqlSessionの作成
- 接続情報が注入されたDaoインスタンスの生成
- トランザクション完了後のセッション回収
1と2についてはMybatisの公式サイトを参考に実装。
3はAOPの機能を利用して実現する。
動的にSqlSessionを作成してDaoインスタンスを作成する
基本的にMybatis公式のドキュメントを読んで素直に実装してやれば気を付けなくてはいけない点は少ない。
作成したSession情報をリクエスト毎(Thread毎)に管理してやること、同じセッションを何度も作らないこと、最低でもこの二つを気を付けていればちゃんと動くはず。
今回はSqlSession作成用にSqlSessionUtilを、SqlSession管理用にSqlSessionManagerをそれぞれ作成する。どちらもDIして使用する。
package com.example.common;
import com.example.config.MyDataBaseProperties;
import com.example.constance.DataBaseConst;
import java.util.Objects;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.boot.autoconfigure.MybatisProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SqlSessionUtil {
@Autowired
MyDataBaseProperties properties;
@Autowired
MybatisProperties mybatisProperty;
@Autowired
SqlSessionManager manager;
/**
* create SqlSessionFactory by DataBase name.
*
* @param name DataBase name
* @return
*/
public SqlSessionFactory getSqlSessionFactory(String name) {
return getSqlSessionFactory(name, null);
}
/**
* <p>
* create SqlSessionFactory by DataBase name.
* </p>
* <p>
* when no DataBase name on yml, create SqlSessionFactory by defBase.
* </p>
*
* @param name DataBase name
* @param defBase use when yml dose not contain DataBaseName
* @return
*/
public SqlSessionFactory getSqlSessionFactory(String name, String defBase) {
XADataSource dataSourceprop = this.properties.getProperty(name, defBase).getDetail();
DataSource dataSource = DataSourceUtil.getDataSource(dataSourceprop);
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, dataSource);
Configuration configuration = DataSourceUtil.fillNullByDefault(new Configuration(environment),
this.mybatisProperty.getConfiguration());
configuration.addMappers(DataBaseConst.DataSourceDefault.MAPPER);
return new SqlSessionFactoryBuilder().build(configuration);
}
/**
* Get SqlSession by name.
*
* @param name DataBase name
* @return
*/
public SqlSession getSqlSession(String name) {
return getSqlSession(name, null);
}
/**
* <p>
* create SqlSession by DataBase name.
* </p>
* <p>
* when no DataBase name on yml, create SqlSession by defBase.
* </p>
*
* @param name DataBase name
* @param defBase use when yml dose not contain DataBaseName
* @return
*/
public SqlSession getSqlSession(String name, String defBase) {
SqlSession session = this.manager.get(name);
if (!Objects.isNull(session)) {
return session;
}
session = getSqlSessionFactory(name, defBase).openSession();
this.manager.put(name, session);
return session;
}
}
SqlSessionに設定ファイルからデータソースを作成する機能に加えて、接続情報が無い場合にテンプレートから接続情報を作成する機能も追加している。
Singletoneに見えるSqlSessionManagerは内部でThreadLocalな変数を持っているため他のスレッドからの影響は受けない。
バッチ更新の場合などを考慮してSqlSessionManagerのインスタンスを渡すメソッドがあっても良いかもしれない。
package com.example.common;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.ibatis.session.SqlSession;
import org.springframework.stereotype.Component;
@Component
public class SqlSessionManager {
ThreadLocal<Map<String, SqlSession>> sessionMap = new ThreadLocal<>();
/**
* put SqlSession.
*
* @param key key
* @param session SqlSession
*/
public void put(String key, SqlSession session) {
init();
if (Objects.isNull(this.sessionMap.get())) {
this.sessionMap.set(new HashMap<String, SqlSession>());
}
this.sessionMap.get().put(key, session);
}
/**
* get SqlSession by key.
*
* @param key key
* @return
*/
public SqlSession get(String key) {
init();
return this.sessionMap.get().get(key);
}
/**
* close all session.
*/
public void close() {
init();
this.sessionMap.get().forEach((key, session) -> session.close());
this.sessionMap.set(null);
}
private void init() {
if (Objects.isNull(this.sessionMap.get())) {
this.sessionMap.set(new HashMap<String, SqlSession>());
}
}
}
SqlSessionManagerdでは作成したSqlSessionのMapを保持するようにしている。
インスタンスの作成タイミング != スレッドの開始タイミングであるため、NPE対策として各メソッドの頭にNULLチェックを行うようにしている。
これはAOPでリクエスト受付時にLocalThreadのインスタンスを作成してあげてもよかったかもしれない。
動的に作成したSqlSessionをTransaction完了後にCloseする
SqlSessionの作成をしたのは良いが、Transaction開始後に作成したSqlSessionをTransaction終了前にcloseしてはちゃんと動かない。
そのためAOPで宣言的Transactionが完了した後にSessionをCloseするようにしてやる。
package com.example.aop;
import com.example.common.SqlSessionManager;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Aspect
@Component
@Order(Ordered.LOWEST_PRECEDENCE - 1)
@Transactional
public class SessionCloseAspect {
@Autowired
SqlSessionManager manager;
@After("@within(org.springframework.transaction.annotation.Transactional)")
public void closeSqlSession() {
this.manager.close();
}
}
Orderの指定は数字が大きいものから実行されていく。
個人的に優先順位が高い = 先に実行される という感覚だったので少し奇妙に感じるがこの順で実行されるならそういう事だろう。
動的に作成したSqlSessionでの操作後に宣言トランザクションを抜けても例外が発生しなければOK。
ひとまず正常動作を確認したが(気になる課題)
No modifier information found for db. Connection will be closed immediately
なんてログが常に出てくるのが気になる。
コネクションを終了する際に他の接続と同期をとりながら切断するかどうかのオプションが無効になっているようだ。
あとjbossts-properties.xmlをなるべくyamlファイルに合流させたいけどWARNログが出るのでダメそう。
参考
Narayana Spring Boot example
Narayana公式のサンプル。英語でも情報が少ないのでこれをベースに試行錯誤させて頂きました。
jbossts-properties.xml
MavenからNarayanaを取得した場合に設定用のXMLファイルが無かったのでこちらから拝借
ソース
説明が分かりにくいからソースを見たい、御託は良いからとっとと使えるものをという人はこちらからどうぞ。
https://github.com/suimyakunosoko/narayana-spring-boot-mybatis-postgresql