まえがき
何時もの如く、ブログからの転載です。
https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708
街の名はホーチミン1区。
一夜にして崩落・再構成されパフォーマンス向上の租界となったこの都市は、マルチスレッドを臨む境界点で、極度の緊張地帯となる。
ここで世界の均衡を守るため暗躍するJava標準クラスFuture。この物語はこのFutureに挑む開発者の戦いと日常の記録である。
(血界戦線風)
ということで、久々の記事はjavaの非同期処理の返り値ラッパーである、Futureの取扱いについて、僕がよくやる実装Tipsについて書いていきたいと思います。
今回のテーマとしては、
- マルチスレッド、非同期処理でも上手いことエラーハンドリングしたい
- Futureの返り値展開をスッキリさせたい
の2つです。
1. 標準的な非同期処理とFutureの取り回し
Javaでマルチスレッド処理を書く際、多くの人は以下のような処理を書いていると思います。
- Executorsクラスでスレッドプールを作成する
- Callable/Runnableを実装したサブクラスを作成する (もしくはLambda式を使う)
- ExecutorServiceのsubmitで別スレッドに処理を任せる
- 返り値のFutureをListなどに格納する
- 全部を別スレッドに投げ終えたところでlistの中のFutureクラスのgetメソッドで同期を取る
- チェック例外の
InterruptedException
やExecutionException
を処理する
コードにするとこんな感じ。(Exceptionでウケるなとか、parallelStream使うなとかは見逃してください。)
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val futureList =
IntStream.range(0,100)
.mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
.collect(Collectors.toList());
futureList.parallelStream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
log.error("Error happened while extracting the thread result.", e);
// Do something for recover. Here, just return null.
return null;
}
}).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep( num%10 * 1000);
} catch (Exception e){
// do nothing since just sample
}
if (num%2 ==0)
throw new RuntimeException("Error if odd");
return num + ": Something!";
}
}
わりとよくあるコードだけど、Futureのget部分が冗長で書くのがダルい。
しかも、どのinputがエラー起こしたのかがわからないのでエラー処理がとても難しい。
2. まずエラー処理がちゃんとできるようにしてみる
Javaでマルチスレッドを実装する時、処理にinputを渡す方法は大体次の2つのうちのどちらかになる。
-
Callable/Runnable
を実装したクラスにプロパティとして持たせて、new
する際にコンストラクタの引数で渡す。 - Lambdaの外部に宣言したfinalの変数をLambda式の中から参照する。
ただし、このいずれの場合も、エラーハンドリングは難しい。
1の場合、スレッドインスタンスを保存しておくのはそもそも微妙だし、FutureとThreadインスタンスのマッピングを何処かで管理しなきゃいけない。
2の場合、Futureからは投げた元のthreadの引数を取得できないためどのinputに対して起きたエラーなのか判別がつかない。
そこで、この問題を解決するために、以下の方法を取ることにしました。
Tupleを使ってプロパティとFutureをまとめて管理する
Tuple
は簡単に言うと、複数の値の組のことで、プログラミングでよく使われる概念です。
Javaには標準でTupleが提供されて無いのですが、同様の方法は色々あって、Common Lang
のPairやTripleクラス、reactor
のTupleクラスやJavaTuple
のPairクラスなどを使って実現できます。
(複雑なクラスじゃないので、自分で実装してもいいです。)
Tupleを使って、inputをLeftに、FutureをRightに保存するようにすることで、Error処理で入力元の値を利用したエラー処理ができるようになります。
え?propertyがたくさんある?inputが重すぎてOOM起こしそう??設計を見直さんかい。
さて、今回はほぼすべてのプロジェクトで使われてると思われる、Commons LangのPairを使ってみます。
Tupleを使うと上のmainクラスはこんな感じに書き直せます。
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val futureList =
IntStream.range(0,100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList.parallelStream()
.map(future -> {
try {
return future.getRight().get();
} catch (Exception e) {
log.error("Input {} was not processed correctly.", future.getLeft(), e);
// Do something for recover. Here, just return null.
return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
}
}).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep( num%10 * 1000);
} catch (Exception e){
// do nothing since just sample
}
if (num%2 ==0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
これで、入力値を利用したエラー処理とLoggingができるようになりました。
ただ、Futureを展開する箇所があいかわらず冗長で、なんだかイライラします。
そこで、次はこの部分を共通化してみます。
3. Futureの展開部分を共通化する
やりたいことはfutureの展開だけなので、そこだけ切り取れば、共通化は非常に簡単です。
一方で、エラー処理はinputに対して相対的に行いたいので、その部分は柔軟にできるような設計にしたいです。
そこで、エラーハンドリングの部分をExceptionとinputを利用して、任意のFunctionで処理できるように以下のようなFutureの展開クラスを用意します。
@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
/**
* Callback function to recover when exception such as {@link InterruptedException} or {@link
* java.util.concurrent.ExecutionException}.
*/
private final BiFunction<L, Exception, R> recoveryCallback;
@Override
public R apply(Pair<L, Future<R>> futurePair) {
try {
return futurePair.getRight().get();
} catch (Exception e) {
return recoveryCallback.apply(futurePair.getLeft(), e);
}
}
}
これを先程のMainクラスに組み込むと以下のようになります。
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
BiFunction<Integer,Exception,String> errorHandler =
(in, e) -> {
log.error("Input {} was not processed correctly.", in, e);
return String.format("Input %s Failed in process, damn shit!! ", in);
};
val flattener = new FutureFlattener<Integer, String>(errorHandler);
val futureList =
IntStream.range(0, 100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList
.parallelStream()
.map(flattener)
.forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep(num % 10 * 1000);
} catch (Exception e) {
// do nothing since just sample
}
if (num % 2 == 0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
しかし、せっかくFunctionインターフェイスを使ってるのに、内部で関数をプロパティに持つのは、正直めちゃくちゃダサいです。
ダサいのはいけません、クールに決めたい。
ということで、もう少しだけ拡張してみます。
4. JavaのFunctionインターフェイスを継承してエラーハンドリングを追加する
多くの他の言語がモナド型のクラスに、onCatchやthenCatchなど、Exceptionが投げられた時のためのメソッドを用意しています。
しかし、残念なことに、JavaのFunction Interfaceはcompose
, apply
, andThen
の成功を前提としたメソッドチェーンしかできません。
そこで、JavaのFunctionインターフェイスを継承して、onCatchを実装してみます。
public interface CatchableFunction<T, R> extends Function<T, R> {
/**
* by calling this method in advance of calling {@link Function#apply}, any exception thrown in
* the apply method will be handled as defined in the argument onCatch.
*
* @param onCatch callback method to handle the exception. First Type T is the input of the base
* function.
* @return fail-safe function with a callback. This method will generate a new Function instance
* instead of modifying the existing function instance.
*/
default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
return t -> {
try {
return apply(t);
} catch (Exception e) {
return onCatch.apply(t, e);
}
};
}
}
Javaの使用上、Type parameterをcatchすることはできないため、Exceptionで受けなくてはいけないのがもどかしいですが、これでかなりfunctionalに書けるようになりました。
このクラスを先程のFutureFlattenerクラスに実装すると以下のようになります。
@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {
@Override
public R apply(Pair<L, Future<R>> futurePair) {
try {
return futurePair.getRight().get();
} catch (InterruptedException | ExecutionException e) {
throw new FutureExpandException(e);
}
}
// To be caught in the then catch method.
private static class FutureExtractException extends RuntimeException {
FutureExpandException(Throwable cause) {
super(cause);
}
}
チェック例外はLamdba式の中で処理しなくては行けないため、FutureExtractException
でラップしてあります。
これでMain
クラスもスッキリします。
@Slf4j
public class Main {
public static void main(String[] args) {
// If wanna make it parallel with 10 threads.
val executor = Executors.newFixedThreadPool(100);
val flattener =
new FutureFlattener<Integer, String>()
.thenCatch(
(in, e) -> {
log.error("Input {} was not processed correctly.", in, e);
return String.format("Input %s Failed in process, damn shit!! ", in);
});
val futureList =
IntStream.range(0, 100)
.mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
.collect(Collectors.toList());
futureList.parallelStream().map(flattener).forEach(System.out::println);
executor.shutdown();
}
static String waitAndSaySomething(int num) {
try {
Thread.sleep(num % 10 * 1000);
} catch (Exception e) {
// do nothing since just sample
}
if (num % 2 == 0) {
throw new RuntimeException("Error if odd");
}
return num + ": Something!";
}
}
ネストが減って、関数の宣言もスッキリして、Futureの展開周りのソースもスッキリしました。
終わりに
さて、いかがだったでしょうか?
Functional Javaを使えばもっと楽に実装できたりする箇所はあるのですが、急いでいたため自前で実装してしまいました。
並列処理に関して言えば、最近はkafkaなどのメッセージキューを使って非同期かつ粗結合に作るのが基本ですが、だからといってマルチスレッドを使わないわけではありません。
一方で冗長なFuture展開はネストを増やし可読性を下げるだけでなく、最も気を使うべきエラーハンドリングに気が回らなくなります。
今回僕は上記のような解決方法を取りましたが、いかがだったでしょうか?
もっとええ方法あるで、という方がいらっしゃいましたら、コメント欄にお願いします。
それでは!