CompletableFuture を使ったリトライ処理にはいろいろな書き方があるが、以前共通で処理時間を計測しようとしてうまく行かなかったり、意図せずデッドロックが発生したりしたので、そこで得た知見をまとめる。
exceptionallyで再帰実行する
以前別のコードを見て、この書き方をしていた。
private CompletableFuture<String> foo(final int cnt) {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("hello");
if (1 == 1) {
throw new RuntimeException();
}
return "hello";
}, Executors.newSingleThreadExecutor())
.exceptionally(t -> {
if (cnt > 0) {
System.err.println("retry: " + cnt);
foo(cnt - 1);
}
return null;
});
}
しかし、以下のような感じで、非同期処理の処理時間を共通的に計測しようとして都合が悪いことに気づいた。
long startTime = System.currentTimeMillis();
foo(3)
.exceptionally(t -> {
System.out.println("finish error");
return null;
})
.thenAccept(s -> System.out.println("finished: " + (System.currentTimeMillis() - startTime)));
上記のコードを実行した結果は以下の通り。
hello
retry: 3
finished: 1031
hello
retry: 2
hello
retry: 1
hello
このようにこの書き方では、return される CompletableFuture とリトライ時で処理されるCompletableFuture が別になる。
そのため、全体の処理時間を計測するにはそれぞれの機能で集計する必要があった。
retry処理を待つ
前述の問題を解消するため、以下のように join
で待つことにした。
private CompletableFuture<String> foo(final int cnt) {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("hello");
if (1 == 1) {
throw new RuntimeException();
}
return "hello";
}, Executors.newSingleThreadExecutor())
.exceptionally(t -> {
if (cnt > 0) {
System.err.println("retry: " + cnt);
foo(cnt - 1).join();
}
return null;
});
}
当然ながら、ブロックする分スレッドは無駄になるが、リトライ処理は基本発生しない上、試した機能ではそこまでスレッドプールを使わないので妥協した。
hello
retry: 3
hello
retry: 2
hello
retry: 1
hello
finished: 4071
一応このやり方であれば上記のように意図した通り実行時間を計測できる。
が、この書き方だとエラーによってデッドロックすることがあるようだ。
exceptionally で同期実行し、それを繋ぐ
回避方法の1つとしてはこんな感じで実際のロジックと非同期・再実行制御を分けること。
private String bar() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("hello");
if (1 == 1) {
throw new RuntimeException();
}
return "hello";
}
private CompletableFuture<String> foo(final int cnt) {
var cf = CompletableFuture.supplyAsync(() -> bar(),
Executors.newSingleThreadExecutor());
for (int i = 0; i < cnt; i++) {
cf = cf.exceptionally(t -> {
System.err.println("retry");
bar();
return null;
});
}
return cf;
}
上記のように exceptionally をつなげ、その中の処理を同期実行することで、return した CompletableFuture で処理時間を計測できる。
hello
retry
hello
retry
hello
retry
hello
finish error
finished: 4042
基本的にこのやり方が使えればそれがシンプルだと思うのだが、exceptionally で非同期メソッドを実行する場合、そこでブロックする必要がある。
( thenCompose の exceptionally 版のようなメソッドがあれば良いのだが )
CompletableFuture#complete で処理完了を通知する
もう1つの回避方法は return する CompletableFuture と、処理に使う非同期処理を別にするやり方だ。
private CompletableFuture<String> foo(final int cnt, CompletableFuture<String> cf) {
final var completableFuture = cf == null
? new CompletableFuture<String>() : cf;
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("hello");
if (1 == 1) {
throw new RuntimeException();
}
completableFuture.complete("finished");
return "hello";
}, Executors.newSingleThreadExecutor()).exceptionally(t -> {
if (cnt > 0) {
System.err.println("retry: " + cnt);
foo(cnt - 1, completableFuture);
} else {
completableFuture.completeExceptionally(t);
}
return null;
});
return completableFuture;
}
こんな感じで complete や completeExceptionally を使うと、処理の終了タイミングを制御しやすくなる。
この方法を使うと、最初に書いた exceptionally で再帰実行して場合に処理が途切れる問題を回避できる。
long startTime = System.currentTimeMillis();
foo(3, null)
.exceptionally(t -> {
System.out.println("finish error");
return null;
})
.thenAccept(s -> System.out.println("finished: " + (System.currentTimeMillis() - startTime)));
hello
retry: 3
hello
retry: 2
hello
retry: 1
hello
finish error
finished: 4054
ただし、呼び出し先のメソッドに CompletableFuture を渡す必要が発生する。
この方法では他の方法よりも余分に CompletableFuture を生成するが、他の非同期処理や並列処理などと組み合わせて書ける。
ただ、シンプルさで言えば exceptionally を繋いだ書き方の方がシンプルなので、使い分けるのが良さそうだ。