はじめに
MongoDB4.0以降はトランザクションサポートされました。
トランザクションが必要な処理できるようになりましたので、ユースケースは増えます。
出典:https://docs.mongodb.com/manual/core/transactions-in-applications/
単一ノードはサポートされないですね。実行するとエラーになります。
Exception in thread "main" com.mongodb.MongoClientException: Sessions are not supported by the MongoDB cluster to which this client is connected
at com.mongodb.client.internal.MongoClientImpl.startSession(MongoClientImpl.java:127)
at com.mongodb.client.internal.MongoClientImpl.startSession(MongoClientImpl.java:113)
at mongodb.MongoDBTest.main(MongoDBTest.java:24)
Replica Set環境の準備
Replica Setの環境がない場合は、ローカルでは先に用意します。
出典:https://docs.mongodb.com/manual/replication/
各Nodeのフォルダーを作成
Node1: /data/mongo-replicaset/node1
Node2: /data/mongo-replicaset/node2
Node3: /data/mongo-replicaset/node3
各Nodeを起動
mongod --replSet my-set --dbpath /data/mongo-replicaset/node1 --logpath /data/mongo-replicaset/node1/node1.log --port 27001
mongod --replSet my-set --dbpath /data/mongo-replicaset/node2 --logpath /data/mongo-replicaset/node2/node2.log --port 27002
mongod --replSet my-set --dbpath /data/mongo-replicaset/node3 --logpath /data/mongo-replicaset/node3/node3.log --port 27003
Relica Setの初期化
Node1に接続し、初期化を行います。
rs.initiate(
{
"_id" : "my-set",
"members" : [
{
"_id" : 0,
"host" : "localhost:27001"
},
{
"_id" : 1,
"host" : "localhost:27002"
},
{
"_id" : 2,
"host" : "localhost:27003"
}
]
});
rs.status()でステータス確認
OK。これで準備完了です。
JAVAでトランザクション処理を試す
違うDBに複数レコードを追加
package mongodb;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.TransactionBody;
import java.util.Date;
import org.bson.Document;
public class MongoDBTest {
public static void main(String[] args) {
// MongoDBクライントを生成する
MongoClient client = MongoClients.create("mongodb://localhost:27001,localhost:27002,localhost:27003");
// コレクションが存在しないと、トランザクションの処理は実行されないため、コードでコレクションを作成する。
//(一回だけ。手動でコレクションを追加してもよい)
client.getDatabase("front").getCollection("access_log").withWriteConcern(WriteConcern.MAJORITY)
.insertOne(new Document("dummy", 0));
client.getDatabase("server").getCollection("users").withWriteConcern(WriteConcern.MAJORITY)
.insertOne(new Document("dummy", 0));
// セッションを生成する
ClientSession session = client.startSession();
// Options定義
TransactionOptions txnOptions = TransactionOptions.builder().readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL).writeConcern(WriteConcern.MAJORITY).build();
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> frontAccessLog = client.getDatabase("front").getCollection("access_log");
MongoCollection<Document> serverUsers = client.getDatabase("server").getCollection("users");
// アクセスログ
Document accessLog = new Document();
accessLog.append("log", "xxxx");
accessLog.append("acccessDate", new Date());
frontAccessLog.insertOne(session, accessLog);
// ユーザーデータ
Document user = new Document();
user.append("lastName", "tanaka");
user.append("firstName", "tarou");
user.append("createDate", new Date());
serverUsers.insertOne(session, user);
return "OK";
}
};
try {
// 同じトランザクションで処理
session.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// 異常処理
} finally {
session.close();
}
// クライントを閉じる
client.close();
}
}
同じDBに複数レコード処理
package mongodb;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.TransactionBody;
import com.mongodb.client.model.Filters;
import java.util.Date;
import org.bson.Document;
public class MongoDBTest {
public static void main(String[] args) {
// MongoDBクライントを生成する
MongoClient client = MongoClients.create("mongodb://localhost:27001,localhost:27002,localhost:27003");
// セッションを生成する
ClientSession session = client.startSession();
// Options定義
TransactionOptions txnOptions = TransactionOptions.builder().readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL).writeConcern(WriteConcern.MAJORITY).build();
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> frontAccessLog = client.getDatabase("front").getCollection("access_log");
// ダミーデータを削除
frontAccessLog.deleteOne(Filters.eq("dummy", 0));
// アクセスログ
Document accessLog = new Document();
accessLog.append("log", "yyyyy");
accessLog.append("acccessDate", new Date());
frontAccessLog.insertOne(session, accessLog);
// 異常にする
throw new RuntimeException("処理失敗です。");
// return "OK";
}
};
try {
// 同じトランザクションで処理
session.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// 異常処理
e.printStackTrace();
} finally {
session.close();
}
// クライントを閉じる
client.close();
}
}
わざわざ異常にスローしたので、DBデータの変化ないことを確認できます。
throw new RuntimeException("処理失敗です。");
を削除すると、予想通りデータ削除とインサートができました。
トランザクションURL:https://docs.mongodb.com/manual/core/transactions/
Replication: https://docs.mongodb.com/manual/replication/
RelicaSetの構築: https://docs.mongodb.com/manual/tutorial/deploy-replica-set/
ReplicaSetの認証設定:https://docs.mongodb.com/manual/tutorial/deploy-replica-set-with-keyfile-access-control/
以上