Help us understand the problem. What is going on with this article?

Spring + GraphQLでトランザクション管理できるようにする

GraphQLでは1リクエスト中に複数の処理を行うことが可能です。

mutation {
  m1: createUser(name: "ベス")
  m2: createUser(name: "カララ")
}

しかし、この複数の処理を横断してたトランザクションを行いたい、例えばカララが作成できなかったら、ベスの作成もロールバックしたい場合などは、どうすればよいのでしょうか?

graphql-spring-bootとか探してみても、それらしき機能はなさそうでしたが、他でTransactional queries with Springという記事を見つけました。

    @Override
    public InstrumentationContext<ExecutionResult> beginExecuteOperation(
            InstrumentationExecuteOperationParameters parameters) {
        // 1
        var tx = new TransactionTemplate(this.transactionManager);
        var op = parameters.getExecutionContext().getOperationDefinition().getOperation();
        if (!Operation.MUTATION.equals(operation)) {
            tx.setReadOnly(true);
        }
        var status = this.transactionManager.getTransaction(tx);
        return SimpleInstrumentationContext.whenCompleted((t, e) -> {
            // 2
            if (e != null) {
                this.transactionManager.rollback(status);
            } else {
                this.transactionManager.commit(status);
            }
        });
    }

instrumentation機能を使って

  1. 処理の前にSpringのTransactionTemplateオブジェクトを生成してトランザクションを開始。
  2. 処理が終わったらSimpleInstrumentationContext.whenCompletedに登録したコールバックが呼び出されるので、正常ならコミット、例外が発生していたらロールバックする。

というものです。

記事に感謝しつつ実装をすすめ、クライアント側と結合をすると困ったことが起こるようになりました。

Caused by: java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread [http-nio-auto-1-exec-7]
    at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:213) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doCleanupAfterCompletion(JpaTransactionManager.java:600) ~[spring-orm-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:1005) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:791) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:712) ~[spring-tx-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at quo.vadis.megasys.GraphqlTransactionInstrumentation$beginExecuteOperation$1.accept(GraphqlTransactionApplication.kt:153) ~[classes/:na]
    at quo.vadis.megasys.GraphqlTransactionInstrumentation$beginExecuteOperation$1.accept(GraphqlTransactionApplication.kt:129) ~[classes/:na]
    at graphql.execution.instrumentation.SimpleInstrumentationContext.onCompleted(SimpleInstrumentationContext.java:48) ~[graphql-java-13.0.jar:na]
    at graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationContext.lambda$onCompleted$1(ChainedInstrumentation.java:254) ~[graphql-java-13.0.jar:na]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) ~[na:na]
    at graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationContext.onCompleted(ChainedInstrumentation.java:254) ~[graphql-java-13.0.jar:na]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
    ... 59 common frames omitted

コミット処理の後始末で、スレッドローカル変数に格納されている筈のオブジェクトが何故かないようです。

そこで色々とログを埋め込んだ結果、以下のようなことが起きてました。

  1. スレッドAでトランザクション1を開始。
  2. スレッドBでトランザクション2を開始。
  3. スレッドAでトランザクション2を終了。
  4. スレッドAでトランザクション1を終了。
    • ここで上記の例外が発生。
    • DBを確認すると、おそらくスレッドBが使っていただろうコネクションが、トランザクション中のまま。

再現プログラム

コードをそのまま載せると膨大なので、ミニミニ再現プログラムを作ってみました。

application.kt
@SpringBootApplication
class GraphqlTransactionApplication : GraphQLQueryResolver {
  @Autowired
  lateinit var userRepo: UserRepository

  @Autowired
  lateinit var transactionTemplate: TransactionTemplate

  fun users(): List<UserModel> {
    return userRepo.findAll()
  }
}

fun main(args: Array<String>) {
  runApplication<GraphqlTransactionApplication>(*args)
}

@Component
class GraphqlTransactionInstrumentation(private val transactionManager: PlatformTransactionManager) : SimpleInstrumentation() {
  override fun beginExecuteOperation(parameters: InstrumentationExecuteOperationParameters): InstrumentationContext<ExecutionResult> {
    /* 上述と同じコードなので割愛 */
  }
}

@Component
class UserModelGraphQLResolver(): GraphQLResolver<UserModel> {
  fun groups(userModel: UserModel, env: DataFetchingEnvironment): CompletableFuture<List<GroupModel>> {
    val context = env.getContext<GraphQLContext>()
    val registry = context.dataLoaderRegistry.get()
    val dataLoader = registry.getDataLoader<Int, List<GroupModel>>(UserGroupDataLoader::class.simpleName)
    return dataLoader.load(userModel.userId)
  }
}

@Component
class UserGroupDataLoader(val userGroupRepo: UserGroupRepository, val groupRepo: GroupRepository) : MappedBatchLoader<Int, List<GroupModel>> {
  override fun load(keys: MutableSet<Int>): CompletionStage<MutableMap<Int, List<GroupModel>>> {
    val userGroups = userGroupRepo.findByKeyUserIdIn(keys)
    val groups = groupRepo.findAllById(userGroups.map { it.key.groupId }.toSet()).map { it.groupId to it }.toMap()
    val result = userGroups
      .groupBy { it.key.userId }
      .map { entry -> entry.key to entry.value.map { groups[it.key.groupId]!! } }
      .toMap().toMutableMap()
    return CompletableFuture.completedFuture(result)
  }
}

@Component
class GraphQLContextBuilder(val mappedBatchLoaders: List<MappedBatchLoader<*, *>>): DefaultGraphQLServletContextBuilder() {
  val dataLoaderRegistry: DataLoaderRegistry = dataLoaderRegistry()

  override fun build(request: HttpServletRequest, response: HttpServletResponse): GraphQLContext {
    return DefaultGraphQLServletContext.createServletContext().with(request).with(response)
      .with(dataLoaderRegistry).build()
  }

  override fun build(session: Session, handshakeRequest: HandshakeRequest): GraphQLContext {
    return DefaultGraphQLWebSocketContext.createWebSocketContext().with(session).with(handshakeRequest)
      .with(dataLoaderRegistry).build()
  }

  final fun dataLoaderRegistry(): DataLoaderRegistry {
    val registry = DataLoaderRegistry()
    mappedBatchLoaders
      .forEach {
        registry.register(it::class.simpleName, DataLoader.newMappedDataLoader(it, DataLoaderOptions.newOptions().setCachingEnabled(false)))
      }
    return registry
  }
}
repositories.kt
@Entity
data class UserModel(
  @Id
  @GeneratedValue
  var userId: Int = -1,
  var userName: String = StringUtils.EMPTY
)

@Entity
data class GroupModel(
  @Id
  @GeneratedValue
  var groupId: Int = -1,
  var groupName: String = StringUtils.EMPTY
)

@Embeddable
data class UserGroupKey(
  var userId: Int = -1,
  var groupId: Int = -1
): Serializable

@Entity
data class UserGroupModel(
  @EmbeddedId
  var key: UserGroupKey = UserGroupKey()
)

interface UserRepository: JpaRepository<UserModel, Int>
interface GroupRepository: JpaRepository<GroupModel, Int>
interface UserGroupRepository: JpaRepository<UserGroupModel, UserGroupKey> {
  fun findByKeyUserIdIn(userId: Collection<Int>): List<UserGroupModel>
}
application.schema.graphqls
type User {
  userId: ID!
  userName: String!
  groups: [Group!]
}

type Group {
  groupId: ID!
  groupName: String!
}

type Query {
  users: [User!]
}

使用したOSSは以下のバージョンです。

  • Spring Boot 2.2.4.RELEASE
  • Graphql Spring Boot Starter 6.0.1

大まかな処理としては、クライアントから

query {
  users {
    userName
    groups {
      groupName
    }
  }
}

で問い合わせると、、、

  • GraphqlTransactionApplication.usersメソッドが呼出され、まずUserModelオブジェクトを返す。
  • UserModelオブジェクトにはgroupsプロパティが無いので、UserModelGraphQLResolver.groupsが呼出される。ここではMappedBatchLoaderを実装したUserGroupDataLoaderを返す。
  • UserGroupDataLoader.loadメソッドが呼出され、GraphqlTransactionApplication.usersメソッドで返したユーザのIDがリストで渡されるので、該当するGroupModelオブジェクトを返す。
    • 同じスレッドでDBアクセス処理をさせるため、CompletableFuture.completedFutureで結果を返してます。

のように動きます。

これをクライアントからマルチスレッドでアクセスすると上記の現象が発生します。
そしてgroupsプロパティを指定せず、UserGroupDataLoaderを使わないと現象は発生しません。

デバッガで追ってみると通常時は

graphql.execution.Execution
    private CompletableFuture<ExecutionResult> executeOperation(ExecutionContext executionContext, InstrumentationExecutionParameters instrumentationExecutionParameters, Object root, OperationDefinition operationDefinition) {

        InstrumentationExecuteOperationParameters instrumentationParams = new InstrumentationExecuteOperationParameters(executionContext);
        // ①
        InstrumentationContext<ExecutionResult> executeOperationCtx = instrumentation.beginExecuteOperation(instrumentationParams);

        OperationDefinition.Operation operation = operationDefinition.getOperation();

        〜〜〜割愛〜〜〜

        CompletableFuture<ExecutionResult> result;
        try {
            ExecutionStrategy executionStrategy;
            if (operation == OperationDefinition.Operation.MUTATION) {
                executionStrategy = mutationStrategy;
            } else if (operation == SUBSCRIPTION) {
                executionStrategy = subscriptionStrategy;
            } else {
                executionStrategy = queryStrategy;
            }
            log.debug("Executing '{}' query operation: '{}' using '{}' execution strategy", executionContext.getExecutionId(), operation, executionStrategy.getClass().getName());
            // ②
            result = executionStrategy.execute(executionContext, parameters);
        } catch (NonNullableFieldWasNullException e) {
            result = completedFuture(new ExecutionResultImpl(null, executionContext.getErrors()));
        }
        executeOperationCtx.onDispatched(result);
        // ③
        result = result.whenComplete(executeOperationCtx::onCompleted);
        return deferSupport(executionContext, result);
    }

①でinstrumentationのbeginExecuteOperationが呼び出されてトランザクションが開始し、②でGraphqlTransactionApplication.usersなどの処理が行われ、③でinstrumentationが返却した終了処理メソッドでコミットまたはロールバックが行われます。
同じメソッド内でトランザクションの開始と終了が行われるので、別スレッドのexecuteOperationCtxオブジェクトのonCompletedを呼び出す理由がありません。

しかし、マルチスレッドでMappedBatchLoaderを使った処理をしていると、instrumentationのonCompletedが③から直接ではなく、他のinstrumentationのonCompletedからの連鎖で呼び出されることが判りました。

accept:102, GraphqlTransactionInstrumentation$beginExecuteOperation$1 (quo.vadis.megasys)
accept:78, GraphqlTransactionInstrumentation$beginExecuteOperation$1 (quo.vadis.megasys)
onCompleted:48, SimpleInstrumentationContext (graphql.execution.instrumentation)
lambda$onCompleted$1:254, ChainedInstrumentation$ChainedInstrumentationContext (graphql.execution.instrumentation)
accept:-1, 1449497430 (graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationContext$$Lambda$1476)
forEach:1540, ArrayList (java.util)
forEach:1085, Collections$UnmodifiableCollection (java.util)
onCompleted:254, ChainedInstrumentation$ChainedInstrumentationContext (graphql.execution.instrumentation)
accept:-1, 1123005170 (graphql.execution.Execution$$Lambda$1571)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$handleResults$0:35, AbstractAsyncExecutionStrategy (graphql.execution)
accept:-1, 476641882 (graphql.execution.AbstractAsyncExecutionStrategy$$Lambda$1563)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$each$0:40, Async (graphql.execution)
accept:-1, 1627764878 (graphql.execution.Async$$Lambda$1552)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$completeValueForList$8:553, ExecutionStrategy (graphql.execution)
accept:-1, 1358196039 (graphql.execution.ExecutionStrategy$$Lambda$1560)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$each$0:40, Async (graphql.execution)
accept:-1, 1627764878 (graphql.execution.Async$$Lambda$1552)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$handleResults$0:35, AbstractAsyncExecutionStrategy (graphql.execution)
accept:-1, 476641882 (graphql.execution.AbstractAsyncExecutionStrategy$$Lambda$1563)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:883, CompletableFuture (java.util.concurrent)
whenComplete:2251, CompletableFuture (java.util.concurrent)
lambda$execute$1:89, AsyncExecutionStrategy (graphql.execution)
accept:-1, 403822411 (graphql.execution.AsyncExecutionStrategy$$Lambda$1554)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$each$0:40, Async (graphql.execution)
accept:-1, 1627764878 (graphql.execution.Async$$Lambda$1552)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
tryFire:837, CompletableFuture$UniWhenComplete (java.util.concurrent)
postComplete:506, CompletableFuture (java.util.concurrent)
complete:2073, CompletableFuture (java.util.concurrent)
lambda$dispatchQueueBatch$2:200, DataLoaderHelper (org.dataloader)
apply:-1, 649559205 (org.dataloader.DataLoaderHelper$$Lambda$1580)
uniApplyNow:680, CompletableFuture (java.util.concurrent)
uniApplyStage:658, CompletableFuture (java.util.concurrent)
thenApply:2094, CompletableFuture (java.util.concurrent)
dispatchQueueBatch:177, DataLoaderHelper (org.dataloader)
dispatch:141, DataLoaderHelper (org.dataloader)
dispatch:461, DataLoader (org.dataloader)
accept:-1, 1574118341 (org.dataloader.DataLoaderRegistry$$Lambda$1532)
forEach:1540, ArrayList (java.util)
dispatchAll:94, DataLoaderRegistry (org.dataloader)
dispatch:290, FieldLevelTrackingApproach (graphql.execution.instrumentation.dataloader)
onFieldValuesInfo:159, FieldLevelTrackingApproach$1 (graphql.execution.instrumentation.dataloader)
lambda$onFieldValuesInfo$2:278, ChainedInstrumentation$ChainedExecutionStrategyInstrumentationContext (graphql.execution.instrumentation)
accept:-1, 968785148 (graphql.execution.instrumentation.ChainedInstrumentation$ChainedExecutionStrategyInstrumentationContext$$Lambda$1567)
forEach:1540, ArrayList (java.util)
forEach:1085, Collections$UnmodifiableCollection (java.util)
onFieldValuesInfo:278, ChainedInstrumentation$ChainedExecutionStrategyInstrumentationContext (graphql.execution.instrumentation)
lambda$execute$1:88, AsyncExecutionStrategy (graphql.execution)
accept:-1, 403822411 (graphql.execution.AsyncExecutionStrategy$$Lambda$1554)
uniWhenComplete:859, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:883, CompletableFuture (java.util.concurrent)
whenComplete:2251, CompletableFuture (java.util.concurrent)
execute:81, AsyncExecutionStrategy (graphql.execution)
executeOperation:161, Execution (graphql.execution)

何故、このようなことになるのか?GraphQL Javaは非同期用に作成されているからしょうがないのか?詳細は終えていませんが、これにより別スレッドのトランザクションが終了してしまう現象が発生しているようです。

そこで、同じスレッドで確実に呼び出されるよう、以下のように対処してみました。

    @Override
    public InstrumentationContext<ExecutionResult> beginExecuteOperation(
            InstrumentationExecuteOperationParameters parameters) {
        var tx = new TransactionTemplate(this.transactionManager);
        var op = parameters.getExecutionContext().getOperationDefinition().getOperation();
        if (!Operation.MUTATION.equals(operation)) {
            tx.setReadOnly(true);
        }
        var status = this.transactionManager.getTransaction(tx);
        return SimpleInstrumentationContext.whenDispatched { codeToRun ->
          codeToRun.join()

          if (codeToRun.isCompletedExceptionally || codeToRun.get().errors.isNotEmpty()) {
            this.transactionManager.rollback(status)
          } else {
            this.transactionManager.commit(status)
          }
        }
    }

result.whenCompleteを挟んで呼び出されるonCompletedと異なり、onDispatchedの呼び出しは確実に同じスレッドで呼び出される筈です。そしてMappedBatchLoaderの処理は同期式で行っているので、whenDispatchedが呼び出された時には既に処理が終了している筈です。ただ、念のためjoin()で処理の終了を確実に待ち合わせるようにしています。

(2020/02/25追記)
処理中に例外が発生しても、GraphQL側でExecutionResultのエラー情報に翻訳されるので、codeToRun.isCompletedExceptionallyがtrueにならないことが判りました。
なので、エラー情報が存在した場合もロールバックするようにしました。

考察

上記に載せたコードは現時点で動いていますが、弊害を精査できている訳ではないのでご注意下さい。

SpringでもReactiveでトランザクションができるようになりつつあり、もっと自分の理解が進めば、それを使ったトランザクションを実装できるようになるかもしれませんが、今時点ではここまでが精一杯です。
isCompletedExceptionally

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした