LoginSignup
1

More than 1 year has passed since last update.

Spring + GraphQLでトランザクション管理できるようにする

Last updated at Posted at 2020-02-10

第二部 (2022年)

2年半の間、この現象が心の棘となっており、GraphQLを扱うコードを書く度に、同じ現象の報告や、こんな付け焼き刃ではない解決策がないかをググってました。
今回、技術スタックを見直すにあたり、GraphQL発祥のNode.js系に転向すべきかもしれないと何かが囁いた時、本当にJava系では駄目なのかをハッキリさせようと調査しました。

その結論として、単に自分のバグだということが判明しました!!!
穴があったら入りたいという思いもありますが、これで安心してGraphQLの処理が書ける安堵感も大きいです。

原因はDataLoaderオブジェクトがシングルトンになってたためでした。
修正は、こうなります。

  @Component
  class GraphQLContextBuilder(val mappedBatchLoaders: List<MappedBatchLoader<*, *>>): DefaultGraphQLServletContextBuilder() {
-   val dataLoaderRegistry: DataLoaderRegistry = dataLoaderRegistry()

    override fun build(request: HttpServletRequest, response: HttpServletResponse): GraphQLContext {
      return DefaultGraphQLServletContext.createServletContext().with(request).with(response)
-       .with(dataLoaderRegistry).build()
+       .with(dataLoaderRegistry()).build()
    }

    override fun build(session: Session, handshakeRequest: HandshakeRequest): GraphQLContext {
      return DefaultGraphQLWebSocketContext.createWebSocketContext().with(session).with(handshakeRequest)
-       .with(dataLoaderRegistry).build()
+       .with(dataLoaderRegistry()).build()
    }

    final fun dataLoaderRegistry(): DataLoaderRegistry {
      val registry = DataLoaderRegistry()
      mappedBatchLoaders
        .forEach {
          registry.register(it::class.simpleName, DataLoader.newMappedDataLoader(it, DataLoaderOptions.newOptions().setCachingEnabled(false)))
        }
      return registry
    }
  }

DataLoaderオブジェクトの中に、BatchLoaderを呼び出す時のパラメータと、DataLoader#loadが返すCompletableFutureオブジェクトを、格納するキューが存在しています。DataLoaderオブジェクトがシングルトンなので、全スレッドの呼び出しパラメータが同じキューに格納されてしまいます。

で、異なるスレッド用のBatchLoaderがまとめて実行され、DataLoader#loadが返すCompletableFutureオブジェクトのcompleteメソッドが呼び出された結果、このwhenCompletedが別スレッド上で実行されてしまうようなのです。。。このあたりは、CompletableFutureオブジェクトの連鎖があって、正確に理解できたとまでは言い切れません。

ただ、修正版の再現プログラムを長時間走行させましたが、問題なく動作しており、この理解でほぼ間違いないでしょう。

第一部 (2020年)

GraphQLでは1リクエスト中に複数の処理を行うことが可能です。

mutation {
  m1: createUser(name: "ベス")
  m2: createUser(name: "カララ")
}

しかし、この複数の処理を横断してたトランザクションを行いたい、例えばカララが作成できなかったら、ベスの作成もロールバックしたい場合などは、どうすればよいのでしょうか?

graphql-spring-bootとか探してみても、それらしき機能はなさそうでしたが、他でTransactional queries with Springという記事を見つけました。

    @Override
    public InstrumentationContext<ExecutionResult> beginExecuteOperation(
            InstrumentationExecuteOperationParameters parameters) {
        // 1
        var tx = new TransactionTemplate(this.transactionManager);
        var op = parameters.getExecutionContext().getOperationDefinition().getOperation();
        if (!Operation.MUTATION.equals(operation)) {
            tx.setReadOnly(true);
        }
        var status = this.transactionManager.getTransaction(tx);
        return SimpleInstrumentationContext.whenCompleted((t, e) -> {
            // 2
            if (e != null) {
                this.transactionManager.rollback(status);
            } else {
                this.transactionManager.commit(status);
            }
        });
    }

instrumentation機能を使って

  1. 処理の前にSpringのTransactionTemplateオブジェクトを生成してトランザクションを開始。
  2. 処理が終わったらSimpleInstrumentationContext.whenCompletedに登録したコールバックが呼び出されるので、正常ならコミット、例外が発生していたらロールバックする。

というものです。

記事に感謝しつつ実装をすすめ、クライアント側と結合をすると困ったことが起こるようになりました。

Caused by: java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread [http-nio-auto-1-exec-7]
	at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:213) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.orm.jpa.JpaTransactionManager.doCleanupAfterCompletion(JpaTransactionManager.java:600) ~[spring-orm-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:1005) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:791) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:712) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at quo.vadis.megasys.GraphqlTransactionInstrumentation$beginExecuteOperation$1.accept(GraphqlTransactionApplication.kt:153) ~[classes/:na]
	at quo.vadis.megasys.GraphqlTransactionInstrumentation$beginExecuteOperation$1.accept(GraphqlTransactionApplication.kt:129) ~[classes/:na]
	at graphql.execution.instrumentation.SimpleInstrumentationContext.onCompleted(SimpleInstrumentationContext.java:48) ~[graphql-java-13.0.jar:na]
	at graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationContext.lambda$onCompleted$1(ChainedInstrumentation.java:254) ~[graphql-java-13.0.jar:na]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) ~[na:na]
	at graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationContext.onCompleted(ChainedInstrumentation.java:254) ~[graphql-java-13.0.jar:na]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
	... 59 common frames omitted

コミット処理の後始末で、スレッドローカル変数に格納されている筈のオブジェクトが何故かないようです。

そこで色々とログを埋め込んだ結果、以下のようなことが起きてました。

  1. スレッドAでトランザクション1を開始。
  2. スレッドBでトランザクション2を開始。
  3. スレッドAでトランザクション2を終了。
  4. スレッドAでトランザクション1を終了。
    • ここで上記の例外が発生。
    • DBを確認すると、おそらくスレッドBが使っていただろうコネクションが、トランザクション中のまま。

再現プログラム

コードをそのまま載せると膨大なので、ミニミニ再現プログラムを作ってみました。
(2022/07/26追記)このコードにはバグがあります。上記、第二部参照。

application.kt
@SpringBootApplication
class GraphqlTransactionApplication : GraphQLQueryResolver {
  @Autowired
  lateinit var userRepo: UserRepository

  @Autowired
  lateinit var transactionTemplate: TransactionTemplate

  fun users(): List<UserModel> {
    return userRepo.findAll()
  }
}

fun main(args: Array<String>) {
  runApplication<GraphqlTransactionApplication>(*args)
}

@Component
class GraphqlTransactionInstrumentation(private val transactionManager: PlatformTransactionManager) : SimpleInstrumentation() {
  override fun beginExecuteOperation(parameters: InstrumentationExecuteOperationParameters): InstrumentationContext<ExecutionResult> {
    /* 上述と同じコードなので割愛 */
  }
}

@Component
class UserModelGraphQLResolver(): GraphQLResolver<UserModel> {
  fun groups(userModel: UserModel, env: DataFetchingEnvironment): CompletableFuture<List<GroupModel>> {
    val context = env.getContext<GraphQLContext>()
    val registry = context.dataLoaderRegistry.get()
    val dataLoader = registry.getDataLoader<Int, List<GroupModel>>(UserGroupDataLoader::class.simpleName)
    return dataLoader.load(userModel.userId)
  }
}

@Component
class UserGroupDataLoader(val userGroupRepo: UserGroupRepository, val groupRepo: GroupRepository) : MappedBatchLoader<Int, List<GroupModel>> {
  override fun load(keys: MutableSet<Int>): CompletionStage<MutableMap<Int, List<GroupModel>>> {
    val userGroups = userGroupRepo.findByKeyUserIdIn(keys)
    val groups = groupRepo.findAllById(userGroups.map { it.key.groupId }.toSet()).map { it.groupId to it }.toMap()
    val result = userGroups
      .groupBy { it.key.userId }
      .map { entry -> entry.key to entry.value.map { groups[it.key.groupId]!! } }
      .toMap().toMutableMap()
    return CompletableFuture.completedFuture(result)
  }
}

@Component
class GraphQLContextBuilder(val mappedBatchLoaders: List<MappedBatchLoader<*, *>>): DefaultGraphQLServletContextBuilder() {
  val dataLoaderRegistry: DataLoaderRegistry = dataLoaderRegistry()

  override fun build(request: HttpServletRequest, response: HttpServletResponse): GraphQLContext {
    return DefaultGraphQLServletContext.createServletContext().with(request).with(response)
      .with(dataLoaderRegistry).build()
  }

  override fun build(session: Session, handshakeRequest: HandshakeRequest): GraphQLContext {
    return DefaultGraphQLWebSocketContext.createWebSocketContext().with(session).with(handshakeRequest)
      .with(dataLoaderRegistry).build()
  }

  final fun dataLoaderRegistry(): DataLoaderRegistry {
    val registry = DataLoaderRegistry()
    mappedBatchLoaders
      .forEach {
        registry.register(it::class.simpleName, DataLoader.newMappedDataLoader(it, DataLoaderOptions.newOptions().setCachingEnabled(false)))
      }
    return registry
  }
}
repositories.kt
@Entity
data class UserModel(
  @Id
  @GeneratedValue
  var userId: Int = -1,
  var userName: String = StringUtils.EMPTY
)

@Entity
data class GroupModel(
  @Id
  @GeneratedValue
  var groupId: Int = -1,
  var groupName: String = StringUtils.EMPTY
)

@Embeddable
data class UserGroupKey(
  var userId: Int = -1,
  var groupId: Int = -1
): Serializable

@Entity
data class UserGroupModel(
  @EmbeddedId
  var key: UserGroupKey = UserGroupKey()
)

interface UserRepository: JpaRepository<UserModel, Int>
interface GroupRepository: JpaRepository<GroupModel, Int>
interface UserGroupRepository: JpaRepository<UserGroupModel, UserGroupKey> {
  fun findByKeyUserIdIn(userId: Collection<Int>): List<UserGroupModel>
}
application.schema.graphqls
type User {
  userId: ID!
  userName: String!
  groups: [Group!]
}

type Group {
  groupId: ID!
  groupName: String!
}

type Query {
  users: [User!]
}

使用したOSSは以下のバージョンです。

  • Spring Boot 2.2.4.RELEASE
  • Graphql Spring Boot Starter 6.0.1

大まかな処理としては、クライアントから

query {
  users {
    userName
    groups {
      groupName
    }
  }
}

で問い合わせると、、、

  • GraphqlTransactionApplication.usersメソッドが呼出され、まずUserModelオブジェクトを返す。
  • UserModelオブジェクトにはgroupsプロパティが無いので、UserModelGraphQLResolver.groupsが呼出される。ここではMappedBatchLoaderを実装したUserGroupDataLoaderを返す。
  • UserGroupDataLoader.loadメソッドが呼出され、GraphqlTransactionApplication.usersメソッドで返したユーザのIDがリストで渡されるので、該当するGroupModelオブジェクトを返す。
    • 同じスレッドでDBアクセス処理をさせるため、CompletableFuture.completedFutureで結果を返してます。

のように動きます。

これをクライアントからマルチスレッドでアクセスすると上記の現象が発生します。
そしてgroupsプロパティを指定せず、UserGroupDataLoaderを使わないと現象は発生しません。

デバッガで追ってみると通常時は

graphql.execution.Execution
    private CompletableFuture<ExecutionResult> executeOperation(ExecutionContext executionContext, InstrumentationExecutionParameters instrumentationExecutionParameters, Object root, OperationDefinition operationDefinition) {

        InstrumentationExecuteOperationParameters instrumentationParams = new InstrumentationExecuteOperationParameters(executionContext);
        // ①
        InstrumentationContext<ExecutionResult> executeOperationCtx = instrumentation.beginExecuteOperation(instrumentationParams);

        OperationDefinition.Operation operation = operationDefinition.getOperation();

        〜〜〜割愛〜〜〜

        CompletableFuture<ExecutionResult> result;
        try {
            ExecutionStrategy executionStrategy;
            if (operation == OperationDefinition.Operation.MUTATION) {
                executionStrategy = mutationStrategy;
            } else if (operation == SUBSCRIPTION) {
                executionStrategy = subscriptionStrategy;
            } else {
                executionStrategy = queryStrategy;
            }
            log.debug("Executing '{}' query operation: '{}' using '{}' execution strategy", executionContext.getExecutionId(), operation, executionStrategy.getClass().getName());
            // ②
            result = executionStrategy.execute(executionContext, parameters);
        } catch (NonNullableFieldWasNullException e) {
            result = completedFuture(new ExecutionResultImpl(null, executionContext.getErrors()));
        }
        executeOperationCtx.onDispatched(result);
        // ③
        result = result.whenComplete(executeOperationCtx::onCompleted);
        return deferSupport(executionContext, result);
    }

①でinstrumentationのbeginExecuteOperationが呼び出されてトランザクションが開始し、②でGraphqlTransactionApplication.usersなどの処理が行われ、③でinstrumentationが返却した終了処理メソッドでコミットまたはロールバックが行われます。
同じメソッド内でトランザクションの開始と終了が行われるので、別スレッドのexecuteOperationCtxオブジェクトのonCompletedを呼び出す理由がありません。

しかし、マルチスレッドでMappedBatchLoaderを使った処理をしていると、instrumentationのonCompletedが③から直接ではなく、他のinstrumentationのonCompletedからの連鎖で呼び出されることが判りました。

accept:102, GraphqlTransactionInstrumentation$beginExecuteOperation$1 (quo.vadis.megasys)
accept:78, GraphqlTransactionInstrumentation$beginExecuteOperation$1 (quo.vadis.megasys)
onCompleted:48, SimpleInstrumentationContext (graphql.execution.instrumentation)
lambda$onCompleted$1:254, ChainedInstrumentation$ChainedInstrumentationContext (graphql.execution.instrumentation)
accept:-1, 1449497430 (graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationContext$$Lambda$1476)
forEach:1540, ArrayList (java.util)
forEach:1085, Collections$UnmodifiableCollection (java.util)
onCompleted:254, ChainedInstrumentation$ChainedInstrumentationContext (graphql.execution.instrumentation)
accept:-1, 1123005170 (graphql.execution.Execution$$Lambda$1571)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$handleResults$0:35, AbstractAsyncExecutionStrategy (graphql.execution)
accept:-1, 476641882 (graphql.execution.AbstractAsyncExecutionStrategy$$Lambda$1563)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$each$0:40, Async (graphql.execution)
accept:-1, 1627764878 (graphql.execution.Async$$Lambda$1552)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$completeValueForList$8:553, ExecutionStrategy (graphql.execution)
accept:-1, 1358196039 (graphql.execution.ExecutionStrategy$$Lambda$1560)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$each$0:40, Async (graphql.execution)
accept:-1, 1627764878 (graphql.execution.Async$$Lambda$1552)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$handleResults$0:35, AbstractAsyncExecutionStrategy (graphql.execution)
accept:-1, 476641882 (graphql.execution.AbstractAsyncExecutionStrategy$$Lambda$1563)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:883, CompletableFuture (java.util.concurrent)
whenComplete:2251, CompletableFuture (java.util.concurrent)
lambda$execute$1:89, AsyncExecutionStrategy (graphql.execution)
accept:-1, 403822411 (graphql.execution.AsyncExecutionStrategy$$Lambda$1554)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$each$0:40, Async (graphql.execution)
accept:-1, 1627764878 (graphql.execution.Async$$Lambda$1552)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$dispatchQueueBatch$2:200, DataLoaderHelper (org.dataloader)
apply:-1, 649559205 (org.dataloader.DataLoaderHelper$$Lambda$1580)
uniApplyNow:680, CompletableFuture (java.util.concurrent)
uniApplyStage:658, CompletableFuture (java.util.concurrent)
thenApply:2094, CompletableFuture (java.util.concurrent)
dispatchQueueBatch:177, DataLoaderHelper (org.dataloader)
dispatch:141, DataLoaderHelper (org.dataloader)
dispatch:461, DataLoader (org.dataloader)
accept:-1, 1574118341 (org.dataloader.DataLoaderRegistry$$Lambda$1532)
forEach:1540, ArrayList (java.util)
dispatchAll:94, DataLoaderRegistry (org.dataloader)
dispatch:290, FieldLevelTrackingApproach (graphql.execution.instrumentation.dataloader)
onFieldValuesInfo:159, FieldLevelTrackingApproach$1 (graphql.execution.instrumentation.dataloader)
lambda$onFieldValuesInfo$2:278, ChainedInstrumentation$ChainedExecutionStrategyInstrumentationContext (graphql.execution.instrumentation)
accept:-1, 968785148 (graphql.execution.instrumentation.ChainedInstrumentation$ChainedExecutionStrategyInstrumentationContext$$Lambda$1567)
forEach:1540, ArrayList (java.util)
forEach:1085, Collections$UnmodifiableCollection (java.util)
onFieldValuesInfo:278, ChainedInstrumentation$ChainedExecutionStrategyInstrumentationContext (graphql.execution.instrumentation)
lambda$execute$1:88, AsyncExecutionStrategy (graphql.execution)
accept:-1, 403822411 (graphql.execution.AsyncExecutionStrategy$$Lambda$1554)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:883, CompletableFuture (java.util.concurrent)
whenComplete:2251, CompletableFuture (java.util.concurrent)
execute:81, AsyncExecutionStrategy (graphql.execution)
executeOperation:161, Execution (graphql.execution)

何故、このようなことになるのか?GraphQL Javaは非同期用に作成されているからしょうがないのか?詳細は終えていませんが、これにより別スレッドのトランザクションが終了してしまう現象が発生しているようです。

そこで、同じスレッドで確実に呼び出されるよう、以下のように対処してみました。
(2022/07/26追記)この回避策は適切ではありません。上記、第二部参照。

    @Override
    public InstrumentationContext<ExecutionResult> beginExecuteOperation(
            InstrumentationExecuteOperationParameters parameters) {
        var tx = new TransactionTemplate(this.transactionManager);
        var op = parameters.getExecutionContext().getOperationDefinition().getOperation();
        if (!Operation.MUTATION.equals(operation)) {
            tx.setReadOnly(true);
        }
        var status = this.transactionManager.getTransaction(tx);
        return SimpleInstrumentationContext.whenDispatched { codeToRun ->
          codeToRun.join()

          if (codeToRun.isCompletedExceptionally || codeToRun.get().errors.isNotEmpty()) {
            this.transactionManager.rollback(status)
          } else {
            this.transactionManager.commit(status)
          }
        }
    }

result.whenCompleteを挟んで呼び出されるonCompletedと異なり、onDispatchedの呼び出しは確実に同じスレッドで呼び出される筈です。そしてMappedBatchLoaderの処理は同期式で行っているので、whenDispatchedが呼び出された時には既に処理が終了している筈です。ただ、念のためjoin()で処理の終了を確実に待ち合わせるようにしています。

(2020/02/25追記)
処理中に例外が発生しても、GraphQL側でExecutionResultのエラー情報に翻訳されるので、codeToRun.isCompletedExceptionallyがtrueにならないことが判りました。
なので、エラー情報が存在した場合もロールバックするようにしました。

考察

上記に載せたコードは現時点で動いていますが、弊害を精査できている訳ではないのでご注意下さい。

SpringでもReactiveでトランザクションができるようになりつつあり、もっと自分の理解が進めば、それを使ったトランザクションを実装できるようになるかもしれませんが、今時点ではここまでが精一杯です。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1