Help us understand the problem. What is going on with this article?

ScalaでMongoDBへMulti-document transaction接続する

More than 1 year has passed since last update.

背景

ScalaでNoSQLを触ってみたかった. MongoDBの事はよく知らない.

環境

  • OS: macOS Mojave Version 10.14.3
  • Language: Scala 2.12.7
  • MongoDB driver: mongo-scala-driver 2.6.0
  • Build Tool: sbt 1.2.1
  • MongoDB: 4.1.6 ( DockerImage mongo:4.1.6-xenial )

MongoDBを構築する

MongoDBのReplica setが安定していないので安定したらいつか書くかも.,,

MongoDB Scala Drivers

MongoDB Scala Driver
依存ライブラリにMongoDB Scala Driverを追加する.

build.sbt
...

lazy val mongodb = (project in file("modules/infrastructure/mongodb"))
  .dependsOn(domain)
  .settings(
    commonSettings,
    name := "mongodb",
    libraryDependencies ++= Seq(
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
    )
  )

Multi-document transactionの流れ

Transactions and Operations, Builders
概要は↓こんな感じ.

MultiDocumentTransaction.scala
import org.mongodb.scala._
import org.mongodb.scala.bson.BsonString
...

// Options
// よく分かってない
val mongodbUri           = "mongodb://USER:PASSWORD@HOST:PORT,HOST:PORT/?authSource=DATABASE&replicaSet=REPLICA_SET"
val clientSessionOptions = ClientSessionOptions.builder().causallyConsistent(true).build()
val transactionOptions   = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()

// Start
val mongoClient   = MongoClient(mongodbUri)
val clientSession = Await.result(mongoClient.startSession(clientSessionOptions).toFuture(), Duration.Inf)
clientSession.startTransaction(transactionOptions)

  // Transaction processing
  // Transactions and Operations: https://docs.mongodb.com/manual/core/transactions-operations/

  // Builders: http://mongodb.github.io/mongo-scala-driver/2.6/builders/
  //
  // Case-class sample
  // val codecRegistry = fromRegistries(entityCodecs, valueCodecs, DEFAULT_CODEC_REGISTRY)
  // val mongoClient: MongoCollection[CaseClass] = client.getDatabase("DATABASE").withCodecRegistry(codecRegistry).getCollection("COLLECTION")
  // collection.find(clientSession).results
  // collection.find(clientSession).sort(ascending("startDateTime", "endDateTime")).results
  // collection.find(clientSession, equal("id", id)).first.headResultOption
  // collection.insertOne(clientSession, entity).headResult
  // collection.findOneAndReplace(clientSession, equal("id", entity.id), entity).headResult
  // collection.findOneAndDelete(clientSession, equal("id", entity.id)).headResult

// End
clientSession.commitTransaction // もしくは clientSession.abortTransaction 
clientSession.close()
mongoClient.close()

Helper

Observableの扱いが難しいなぁと思ったら, いっそのことAwaitする.

MongoDBHelper.scala
...

val observable: Observable[C]
val converter:  (C) => String
def results():          Seq[C]    = Await.result(observable.toFuture(), Duration.Inf)
def headResult():       C         = Await.result(observable.head(), Duration.Inf)
def headResultOption(): Option[C] = Await.result(observable.headOption(), Duration.Inf)
...

次のソースが参考になります.

Case-class, ValueObject

Macrosを使用するとDocumet=Case-class=Entityの単位で操作できるMongoCollectionを取得できる.

XxxxRepositoryImpl.scala
import org.bson.codecs.configuration.CodecRegistries
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.mongodb.scala.{MongoClient, MongoCollection}
import org.mongodb.scala.bson.codecs.Macros._
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Sorts._
...

trait XxxxRepositoryImpl extends XxxxRepository with MongoDBAware {

  private val entityCodecs = fromProviders(classOf[CaseClass]) // <- Case-class=EntityのCodec
  private val valueCodecs = CodecRegistries.fromCodecs(        // <- ValueObjectのCodec
    new ValueObjectDateTimeCodec(),
    ...
  )

  // ↓ 各コーデックを適用したMongoCollectionを取得する.
  // Document=Case-class=Entityの単位で操作できる.
  val codecRegistry = fromRegistries(entityCodecs, valueCodecs, DEFAULT_CODEC_REGISTRY)
  val mongoClient: MongoCollection[CaseClass] = client.getDatabase("DATABASE").withCodecRegistry(codecRegistry).getCollection("COLLECTION") 
  ...

ValueObjectDateTimeCodec.scala
import java.time.ZonedDateTime

import org.bson.{BsonReader, BsonWriter}
import org.bson.codecs.{Codec, DecoderContext, EncoderContext}
import xxxx.ValueObjectDateTime

class ValueObjectDateTimeCodec extends Codec[ValueObjectDateTime] {

  override def getEncoderClass: Class[ValueObjectDateTime] = {
    classOf[ValueObjectDateTime]
  }

  override def encode(writer: BsonWriter, value: ValueObjectDateTime, encorderContext: EncoderContext): Unit = {
    writer.writeString(value.value.toString)
  }

  override def decode(reader: BsonReader, decorderContext: DecoderContext): ValueObjectDateTime = {
    ProgramStartDateTime(ZonedDateTime.parse(reader.readString))
  }

}

次の記事とOSSが理解しやすいです.

試行錯誤したアレコレ

"このクライアントが接続しているMongoDBクラスターでセッションがサポートされていません"と叱られる

1台構成MongoDBへstartSession接続すると下記エラーになる. StackOverflowに"ReplicaSetのMongoDBに接続すると解消するよ :-)"と書いてある.
startSessionしないなら1台構成MongoDBへ接続できる.

このクライアントが接続しているMongoDBクラスターでセッションがサポートされていません.error.log
...

[info]   com.mongodb.MongoClientException: Sessions are not supported by the MongoDB cluster to which this client is connected
[info]   at com.mongodb.async.client.MongoClientImpl$1.onResult(MongoClientImpl.java:90)
[info]   at com.mongodb.async.client.MongoClientImpl$1.onResult(MongoClientImpl.java:83)
[info]   at com.mongodb.async.client.ClientSessionHelper$2.onResult(ClientSessionHelper.java:77)
[info]   at com.mongodb.async.client.ClientSessionHelper$2.onResult(ClientSessionHelper.java:73)
[info]   at com.mongodb.internal.connection.BaseCluster$ServerSelectionRequest.onResult(BaseCluster.java:433)
[info]   at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:309)
[info]   at com.mongodb.internal.connection.BaseCluster.access$800(BaseCluster.java:65)
[info]   at com.mongodb.internal.connection.BaseCluster$WaitQueueHandler.run(BaseCluster.java:482)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   ...

I was having the same issue when I was trying to connect it to a single standalone mongo instance, however as written in the official documentation, that Mongo supports transaction feature for a replica set. So, I then tried to create a replica set with all instances on MongoDB 4.0.0, I was able to successfully execute the code. So, Start a replica set (3 members), then try to execute the code, the issue will be resolved.

複数Case-classをfromProviderに設定すると実行エラーになる

複数Case-classをfromProviderに設定すると実行エラーになる. Documentがネストするから? よく分からないので(Case-class書式の)ValueObjectは自前でCodecを定義する. ٩( ᐛ )و

Macroはよく分からん.error.log
...

[info] - should *** FAILED ***
[info]   java.util.concurrent.ExecutionException: Boxed Error
[info]   at scala.concurrent.impl.Promise$.resolver(Promise.scala:83)
[info]   at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:75)
[info]   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:280)
[info]   at scala.concurrent.Promise.complete(Promise.scala:49)
[info]   at scala.concurrent.Promise.complete$(Promise.scala:48)
[info]   at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:183)
[info]   at scala.concurrent.Promise.failure(Promise.scala:100)
[info]   at scala.concurrent.Promise.failure$(Promise.scala:100)
[info]   at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:183)
[info]   at org.mongodb.scala.ObservableImplicits$ScalaObservable$$anon$2.$anonfun$onError$1(ObservableImplicits.scala:369)
[info]   ...
[info]   Cause: java.lang.StackOverflowError:
[info]   at xxxx.XxxxRepositoryImpl$$anon$2$ProgramIdMacroCodec$3.writeValue(XxxxRepositoryImpl.scala:20)
[info]   at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.encode(MacroCodec.scala:92)
[info]   at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.encode$(MacroCodec.scala:88)
[info]   at xxxx.XxxxRepositoryImpl$$anon$2$ProgramIdMacroCodec$3.encode(XxxxRepositoryImpl.scala:20)
[info]   at org.bson.codecs.configuration.LazyCodec.encode(LazyCodec.java:37)
[info]   at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
[info]   at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.writeValue(MacroCodec.scala:168)
[info]   at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.writeValue$(MacroCodec.scala:162)
[info]   at xxxx.XxxxRepositoryImpl$$anon$2$ProgramIdMacroCodec$3.writeValue(XxxxRepositoryImpl.scala:20)
[info]   at org.mongodb.scala.bson.codecs.macrocodecs.MacroCodec.encode(MacroCodec.scala:92)
[info]   ...
[info] Run completed in 5 seconds, 415 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

Multi-document transactionをサポートしていないメソッドにclientSessionを設定すると叱られる

サポートしてないメソッドにclientSessionを設定しちゃだめ!.error.log
...

[info] - should *** FAILED ***
[info]   com.mongodb.MongoCommandException: Command failed with error 263 (OperationNotSupportedInTransaction): 'Cannot run 'drop' in a multi-document transaction.' on server HOST:PORT. The full response is {"operationTime": {"$timestamp": {"t": 1550758394, "i": 1}}, "ok": 0.0, "errmsg": "Cannot run 'drop' in a multi-document transaction.", "code": 263, "codeName": "OperationNotSupportedInTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1550758394, "i": 1}}, "signature": {"hash": {"$binary": "wQVXtbuZ2TfXAL+X+sebp3owOKE=", "$type": "00"}, "keyId": {"$numberLong": "6660456186795524097"}}}}
[info]   at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
[info]   at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:386)
[info]   at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:372)
[info]   at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:667)
[info]   at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:634)
[info]   at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)
[info]   at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:507)
[info]   at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
[info]   at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
[info]   at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
[info]   ...
[info] Run completed in 4 seconds, 175 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

所感

SchemaLessを体験したかっただけなのにMongoDBのReplica setの変な処に時間を取られた.,, 疲れた.

参考文献

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away