
Connecting to MongoDB from Scala with multi-document transactions


Background

I wanted to try NoSQL from Scala. I don't know much about MongoDB.


Environment


  • OS: macOS Mojave Version 10.14.3

  • Language: Scala 2.12.7

  • MongoDB driver: mongo-scala-driver 2.6.0

  • Build Tool: sbt 1.2.1

  • MongoDB: 4.1.6 (Docker image: mongo:4.1.6-xenial)


Setting up MongoDB

My MongoDB replica set is not stable yet, so I may write this part up once it is.


MongoDB Scala Drivers

MongoDB Scala Driver

Add the MongoDB Scala Driver to the library dependencies.


build.sbt

...

lazy val mongodb = (project in file("modules/infrastructure/mongodb"))
  .dependsOn(domain)
  .settings(
    commonSettings,
    name := "mongodb",
    libraryDependencies ++= Seq(
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
    )
  )



Flow of a multi-document transaction

Transactions and Operations, Builders

The overall flow looks like this.


MultiDocumentTransaction.scala

import org.mongodb.scala._
import org.mongodb.scala.bson.BsonString

import scala.concurrent.Await
import scala.concurrent.duration.Duration
...

// Options
// I don't fully understand these yet
val mongodbUri = "mongodb://USER:PASSWORD@HOST:PORT,HOST:PORT/?authSource=DATABASE&replicaSet=REPLICA_SET"
val clientSessionOptions = ClientSessionOptions.builder().causallyConsistent(true).build()
val transactionOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()

// Start
val mongoClient = MongoClient(mongodbUri)
val clientSession = Await.result(mongoClient.startSession(clientSessionOptions).toFuture(), Duration.Inf)
clientSession.startTransaction(transactionOptions)

// Transaction processing
// Transactions and Operations: https://docs.mongodb.com/manual/core/transactions-operations/

// Builders: http://mongodb.github.io/mongo-scala-driver/2.6/builders/
//
// Case-class sample
// val codecRegistry = fromRegistries(entityCodecs, valueCodecs, DEFAULT_CODEC_REGISTRY)
// val collection: MongoCollection[CaseClass] = mongoClient.getDatabase("DATABASE").withCodecRegistry(codecRegistry).getCollection("COLLECTION")
// collection.find(clientSession).results
// collection.find(clientSession).sort(ascending("startDateTime", "endDateTime")).results
// collection.find(clientSession, equal("id", id)).first.headResultOption
// collection.insertOne(clientSession, entity).headResult
// collection.findOneAndReplace(clientSession, equal("id", entity.id), entity).headResult
// collection.findOneAndDelete(clientSession, equal("id", entity.id)).headResult

// End
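// In the Scala driver, commitTransaction() returns an Observable that does nothing until subscribed, so wait for it.
Await.result(clientSession.commitTransaction().toFuture(), Duration.Inf) // or clientSession.abortTransaction()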
clientSession.close()
mongoClient.close()
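
The commit-or-abort decision at the end of the flow above can be wrapped so that an exception in the transactional body always aborts. This is a minimal sketch using only the standard library. `TxHandle` and `Tx.run` are hypothetical names, not part of the driver. With mongo-scala-driver, `commit()` and `abort()` would presumably wrap something like `Await.result(clientSession.commitTransaction().toFuture(), timeout)` and its abortTransaction counterpart.

```scala
import scala.util.control.NonFatal

// Hypothetical abstraction (not a driver type): the two calls the wrapper needs.
trait TxHandle {
  def commit(): Unit
  def abort(): Unit
}

object Tx {
  // Runs `body`; aborts and rethrows if it fails, commits if it succeeds.
  // A failure inside commit() itself propagates without calling abort().
  def run[A](tx: TxHandle)(body: => A): A = {
    val result =
      try body
      catch {
        case NonFatal(e) =>
          tx.abort()
          throw e
      }
    tx.commit()
    result
  }
}
```

Keeping `commit()` outside the `try` is deliberate: calling abort after a commit attempt has already started is usually not meaningful, so the commit error is left for the caller to handle.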



Helper

If handling Observables feels too hard, you can simply Await them.


MongoDBHelper.scala

...

val observable: Observable[C]
val converter: (C) => String
def results(): Seq[C] = Await.result(observable.toFuture(), Duration.Inf)
def headResult(): C = Await.result(observable.head(), Duration.Inf)
def headResultOption(): Option[C] = Await.result(observable.headOption(), Duration.Inf)
...
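
The helper above blocks with `Duration.Inf`, which never returns if the server stops responding. A minimal sketch of the same idea with a bounded wait, using only the standard library, is shown here. `BoundedAwait` and its 10-second default are hypothetical choices, not something taken from the driver or the original helper.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BoundedAwait {
  // Hypothetical default; pick a value that suits the deployment.
  val DefaultTimeout: FiniteDuration = 10.seconds

  // Blocks for at most `timeout`.
  // Throws java.util.concurrent.TimeoutException when the wait elapses.
  def result[T](future: Future[T], timeout: FiniteDuration = DefaultTimeout): T =
    Await.result(future, timeout)
}
```

In the helper, a call such as `Await.result(observable.toFuture(), Duration.Inf)` would then become `BoundedAwait.result(observable.toFuture())`.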


The following source is a useful reference.


Case-class, ValueObject

With Macros, you can obtain a MongoCollection that operates in units of Document = case class = Entity.


XxxxRepositoryImpl.scala

import org.bson.codecs.configuration.CodecRegistries

import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.mongodb.scala.{MongoClient, MongoCollection}
import org.mongodb.scala.bson.codecs.Macros._
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Sorts._
...

trait XxxxRepositoryImpl extends XxxxRepository with MongoDBAware {

  private val entityCodecs = fromProviders(classOf[CaseClass]) // <- Codec for the case class (= Entity)
  private val valueCodecs = CodecRegistries.fromCodecs( // <- Codecs for ValueObjects
    new ValueObjectDateTimeCodec(),
    ...
  )

  // Obtain a MongoCollection with all of the codecs applied.
  // It can then be operated on in units of Document = case class = Entity.
  val codecRegistry = fromRegistries(entityCodecs, valueCodecs, DEFAULT_CODEC_REGISTRY)
  val collection: MongoCollection[CaseClass] = client.getDatabase("DATABASE").withCodecRegistry(codecRegistry).getCollection("COLLECTION")
...



ValueObjectDateTimeCodec.scala

import java.time.ZonedDateTime

import org.bson.{BsonReader, BsonWriter}
import org.bson.codecs.{Codec, DecoderContext, EncoderContext}
import xxxx.ValueObjectDateTime

class ValueObjectDateTimeCodec extends Codec[ValueObjectDateTime] {

  override def getEncoderClass: Class[ValueObjectDateTime] = {
    classOf[ValueObjectDateTime]
  }

  // Stores the value as its ISO-8601 string representation.
  override def encode(writer: BsonWriter, value: ValueObjectDateTime, encoderContext: EncoderContext): Unit = {
    writer.writeString(value.value.toString)
  }

  // Parses the stored string back into the value object.
  override def decode(reader: BsonReader, decoderContext: DecoderContext): ValueObjectDateTime = {
    ValueObjectDateTime(ZonedDateTime.parse(reader.readString))
  }

}
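
The codec relies on `ZonedDateTime.toString` and `ZonedDateTime.parse` being inverses of each other. A minimal sketch that checks this round trip without a MongoDB connection follows. The `ValueObjectDateTime` case class here is a hypothetical stand-in, since the real `xxxx.ValueObjectDateTime` is not shown, and `DateTimeText` is an illustrative name.

```scala
import java.time.ZonedDateTime

// Hypothetical stand-in for xxxx.ValueObjectDateTime, whose definition is not shown.
final case class ValueObjectDateTime(value: ZonedDateTime)

object DateTimeText {
  // Same conversion as encode: ZonedDateTime.toString yields ISO-8601 text with the zone.
  def toText(v: ValueObjectDateTime): String = v.value.toString

  // Same conversion as decode: parse the text back into the value object.
  def fromText(s: String): ValueObjectDateTime =
    ValueObjectDateTime(ZonedDateTime.parse(s))
}
```

Calling `DateTimeText.fromText(DateTimeText.toText(v))` should give back a value equal to `v`, which is the property the codec depends on.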


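The following article and OSS project are easy to follow.


Things I struggled with


Scolded with "Sessions are not supported by the MongoDB cluster to which this client is connected"

Calling startSession against a standalone (single-node) MongoDB fails with the error below. A Stack Overflow answer says that connecting to a MongoDB replica set resolves it.

Without startSession, connecting to a standalone MongoDB works.


Sessions are not supported by the MongoDB cluster to which this client is connected.error.log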

...

[info] com.mongodb.MongoClientException: Sessions are not supported by the MongoDB cluster to which this client is connected
[info] at com.mongodb.async.client.MongoClientImpl$1.onResult(MongoClientImpl.java:90)
[info] at com.mongodb.async.client.MongoClientImpl$1.onResult(MongoClientImpl.java:83)
[info] at com.mongodb.async.client.ClientSessionHelper$2.onResult(ClientSessionHelper.java:77)
[info] at com.mongodb.async.client.ClientSessionHelper$2.onResult(ClientSessionHelper.java:73)
[info] at com.mongodb.internal.connection.BaseCluster$ServerSelectionRequest.onResult(BaseCluster.java:433)
[info] at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:309)
[info] at com.mongodb.internal.connection.BaseCluster.access$800(BaseCluster.java:65)
[info] at com.mongodb.internal.connection.BaseCluster$WaitQueueHandler.run(BaseCluster.java:482)
[info] at java.lang.Thread.run(Thread.java:748)
[info] ...



I was having the same issue when I was trying to connect it to a single standalone mongo instance, however as written in the official documentation, that Mongo supports transaction feature for a replica set. So, I then tried to create a replica set with all instances on MongoDB 4.0.0, I was able to successfully execute the code. So, Start a replica set (3 members), then try to execute the code, the issue will be resolved.



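Passing multiple case classes to fromProviders causes a runtime error

Passing multiple case classes to fromProviders causes a runtime error. Is it because the Documents are nested? I don't really understand why, so I define Codecs by hand for ValueObjects (which are written as case classes). ٩( ᐛ )و


I don't really get Macros.error.log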

...

[info] - should *** FAILED ***
[info] java.util.concurrent.ExecutionException: Boxed Error
[info] at scala.concurrent.impl.Promise$.resolver(Promise.scala:83)
[info] at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:75)
[info] at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:280)
[info] at scala.concurrent.Promise.complete(Promise.scala:49)
[info] at scala.concurrent.Promise.complete$(Promise.scala:48)
[info] at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:183)
[info] at scala.concurrent.Promise.failure(Promise.scala:100)
[info] at scala.concurrent.Promise.failure$(Promise.scala:100)
[info] at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:183)
[info] at org.mongodb.scala.ObservableImplicits$ScalaObservable$$anon$2.$anonfun$onError$1(ObservableImplicits.scala:369)
[info] ...
[info] Cause: java.lang.StackOverflowError:
[info] at xxxx.XxxxRepositoryImpl$$anon$2$ProgramIdMacroCodec$3.writeValue(XxxxRepositoryImpl.scala:20)
[info] at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.encode(MacroCodec.scala:92)
[info] at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.encode$(MacroCodec.scala:88)
[info] at xxxx.XxxxRepositoryImpl$$anon$2$ProgramIdMacroCodec$3.encode(XxxxRepositoryImpl.scala:20)
[info] at org.bson.codecs.configuration.LazyCodec.encode(LazyCodec.java:37)
[info] at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
[info] at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.writeValue(MacroCodec.scala:168)
[info] at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.writeValue$(MacroCodec.scala:162)
[info] at xxxx.XxxxRepositoryImpl$$anon$2$ProgramIdMacroCodec$3.writeValue(XxxxRepositoryImpl.scala:20)
[info] at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.encode(MacroCodec.scala:92)
[info] ...
[info] Run completed in 5 seconds, 415 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***



Scolded for passing clientSession to a method that does not support multi-document transactions


Don't pass clientSession to unsupported methods!.error.log

...

[info] - should *** FAILED ***
[info] com.mongodb.MongoCommandException: Command failed with error 263 (OperationNotSupportedInTransaction): 'Cannot run 'drop' in a multi-document transaction.' on server HOST:PORT. The full response is {"operationTime": {"$timestamp": {"t": 1550758394, "i": 1}}, "ok": 0.0, "errmsg": "Cannot run 'drop' in a multi-document transaction.", "code": 263, "codeName": "OperationNotSupportedInTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1550758394, "i": 1}}, "signature": {"hash": {"$binary": "wQVXtbuZ2TfXAL+X+sebp3owOKE=", "$type": "00"}, "keyId": {"$numberLong": "6660456186795524097"}}}}
[info] at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
[info] at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:386)
[info] at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:372)
[info] at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:667)
[info] at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:634)
[info] at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)
[info] at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:507)
[info] at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
[info] at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
[info] at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
[info] ...
[info] Run completed in 4 seconds, 175 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***



Impressions

I only wanted to experience schemaless storage, but I lost a lot of time on odd corners of MongoDB replica sets. I'm tired.

