はじめに
Spring GraphQL で実装した GraphQL API を WebMVC と組み合わせて動かしていると、
すべての Resolver が直列で動いているため、 全 Resolver の実行時間が累積して応答が遅くなっていることに気付きました。
今回は、 Resolver の実行を並列化するために行った内容を記載します。
環境
- java 11
- spring-graphql 1.1.5
- spring-webmvc 5.3.29
遅くなる例
例えば、以下のようなデータ構造をホストする GraphQL API があるとします。
これに対して、こんなクエリを投げたとします。
query FindAllPRs {
pullRequests {
title
tags {
name
}
reviewers {
id
}
}
}
この実行結果は当然 Resolver の実装次第ですが、親要素、子要素を素直に分けて実装していた場合、以下のような問い合わせが直列で動くことになります。
select * from pull_request
select * from tags where pull_request_id in (...)
select * from reviewers where pull_request_id in (...)
少なくとも、子要素の情報を取得する 2. と 3. のクエリはお互いに依存関係が無いので、並列で実行しても良いはずです。
WebMVC で Resolver を並列実行する方法
公式ドキュメントに書いてありました。
Schema mapping handler methods can return:
- java.util.concurrent.Callable to have the value(s) produced asynchronously. For this to work, AnnotatedControllerConfigurer must be configured with an Executor.
つまり、以下の 2 点で対応できそうです。
- Resolver の戻り値型を
Callable<T>
にする - AnnotatedControllerConfigurer に Executor を設定する
アプリケーションが ThreadLocal で状態管理している場合の注意
Callable (Resolver の本体) を実行するスレッドは HTTP ワーカスレッドでは無くなるので、 ThreadLocal で管理しているような情報は そのままでは Resolver の実行スレッドに引き継がれません。
代表的なものではログの MDC などです。
こういった状態は ThreadLocalAccessor を実装 & Bean 登録してスレッド間で連携する必要があります。
トランザクションの扱い
スレッドごとに DB トランザクションが分かれるので注意が必要です。
採用事例はほとんど聞いたことありませんが、例えば Open-session-in-view で実装しているアプリケーションは、この対応を行う前にトランザクションスコープの見直しが必要でしょう。
また、Reader と Writer のエンドポイントを分けている場合、レプリカラグが原因で、ある Resolver が commit したデータが別の Resolver でまだ見えない、というようなことも起き得ます。
現在のリクエストを Reader, Writer どちらで処理すべきかというコンテキスト情報を ThreadLocalAccessor で引き渡す必要があるかもしれません。
実装
比較のため、非同期(並列)で実行する Resolver と、同期(直列)で実行する Resolver の両方を実装してみます。
type Query {
# 非同期(並列) API
pullRequests: [PullRequest!]!
# 同期(直列) API
syncPullRequests: [SyncPullRequest!]!
}
type PullRequest {
id: Int!
title: String!
tags: [Tag!]!
reviewers: [User!]!
}
type Tag {
id: Int!
name: String!
}
type User {
id: Int!
name: String!
}
# 同期(直列)処理の確認用
type SyncPullRequest {
id: Int!
title: String!
tags: [Tag!]!
reviewers: [User!]!
}
@Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor
public class Config {
@Bean
Executor resolverExecutor() {
return Executors.newFixedThreadPool(200);
}
}
@Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor
public class ApplicationPostInitializer {
private final AnnotatedControllerConfigurer configurer;
private final Executor resolverExecutor;
@PostConstruct
void init() {
configurer.setExecutor(resolverExecutor);
}
}
@Controller
@Slf4j
public class PullRequestController {
@QueryMapping
public Callable<List<PullRequest>> pullRequests() {
return () -> {
log.info("(async)pullRequests");
// 並列で動作していることを実感するために sleep を入れてみる
Thread.sleep(1000);
return List.of(PullRequest.builder().id(1).title("async PR").build());
};
}
@BatchMapping(typeName = "PullRequest", field = "tags")
public Callable<List<List<Tag>>> pullRequestTags(List<PullRequest> pullRequests) {
return () -> {
log.info("(async)pullRequestTags");
Thread.sleep(1000);
return pullRequests.stream()
.map(id -> List.of(Tag.builder().id(1).name("タグX").build()))
.collect(Collectors.toList());
};
}
@BatchMapping(typeName = "PullRequest", field = "reviewers")
public Callable<List<List<User>>> pullRequestReviewers(List<PullRequest> pullRequests) {
return () -> {
log.info("(async)pullRequestReviewers");
Thread.sleep(1000);
return pullRequests.stream()
.map(id -> List.of(User.builder().id(1).name("ユーザX").build()))
.collect(Collectors.toList());
};
}
@QueryMapping
public List<PullRequest> syncPullRequests() throws InterruptedException {
log.info("syncPullRequests");
Thread.sleep(1000);
return List.of(PullRequest.builder().id(1).title("sync PR").build());
}
@BatchMapping(typeName = "SyncPullRequest", field = "tags")
public List<List<Tag>> syncPullRequestTags(List<PullRequest> pullRequests) throws InterruptedException {
log.info("syncPullRequestTags");
Thread.sleep(1000);
return pullRequests.stream()
.map(id -> List.of(Tag.builder().id(1).name("タグX").build()))
.collect(Collectors.toList());
}
@BatchMapping(typeName = "SyncPullRequest", field = "reviewers")
public List<List<User>> syncPullRequestReviewers(List<PullRequest> pullRequests) throws InterruptedException {
log.info("syncPullRequestReviewers");
Thread.sleep(1000);
return pullRequests.stream()
.map(id -> List.of(User.builder().id(1).name("ユーザX").build()))
.collect(Collectors.toList());
}
}
実行結果
非同期 (並列) API の場合
実行ログから、各 resolver が AnnotatedControllerConfigurer に渡した Executors のスレッドプールで実行されていることがわかります。
また、子要素 (tags, reviewers) の resolver は並列で実行されており、全体として 1 (親) + 1 (子) = 約 2 秒で応答していることがわかります。
リクエスト
query AsyncFindAllPRs {
pullRequests {
title
tags {
name
}
reviewers {
id
}
}
}
実行ログ
2023-07-31 19:21:55.643 INFO 29493 --- [pool-1-thread-1] com.example.demo.PullRequestController : (async)pullRequests
2023-07-31 19:21:56.674 INFO 29493 --- [pool-1-thread-2] com.example.demo.PullRequestController : (async)pullRequestReviewers
2023-07-31 19:21:56.683 INFO 29493 --- [pool-1-thread-3] com.example.demo.PullRequestController : (async)pullRequestTags
HTTP リクエストのトレース (Chrome Dev Tool)
同期 (直列) API の場合
一方、同期の場合は HTTP のワーカスレッド上で直列に実行されています。
各 resolver で 1 秒 sleep しているので、 1 * 3 = 約 3 秒かかっています。
リクエスト
query SyncFindAllPRs {
syncPullRequests {
title
tags {
name
}
reviewers {
id
}
}
}
実行ログ
2023-07-31 19:22:34.612 INFO 29493 --- [nio-8080-exec-3] com.example.demo.PullRequestController : syncPullRequests
2023-07-31 19:22:35.617 INFO 29493 --- [nio-8080-exec-3] com.example.demo.PullRequestController : syncPullRequestReviewers
2023-07-31 19:22:36.640 INFO 29493 --- [nio-8080-exec-3] com.example.demo.PullRequestController : syncPullRequestTags
HTTP リクエストのトレース (Chrome Dev Tool)
おわりに
WebMVC で GraphQL の Resolver を並列で実行し、応答時間を短縮する方法を紹介しました。
(WebFlux ではなく) WebMVC を採用する背景としては、開発メンバーがマルチスレッドプログラミングに慣れていないからという事情があることもあります。
こういうチームでは、スレッドセーフではないコードを知らず知らずのうちに実装してしまったり、そもそもどこを非同期で実装するのが効果的か、適切に判断できなかったりすることもあります。
最初のうちは丁寧なレビューが必要かもしれません。