LoginSignup
6
6

More than 3 years have passed since last update.

JavaのFutureの取り回しがダルすぎるので色々工夫してみる。

Posted at

まえがき

何時もの如く、ブログからの転載です。
https://munchkins-diary.hatenablog.com/entry/2019/11/09/221708

街の名はホーチミン1区。
一夜にして崩落・再構成されパフォーマンス向上の租界となったこの都市は、マルチスレッドを臨む境界点で、極度の緊張地帯となる。
ここで世界の均衡を守るため暗躍するJava標準クラスFuture。この物語はこのFutureに挑む開発者の戦いと日常の記録である。
(血界戦線風)

ということで、久々の記事はjavaの非同期処理の返り値ラッパーである、Futureの取扱いについて、僕がよくやる実装Tipsについて書いていきたいと思います。

今回のテーマとしては、

  • マルチスレッド、非同期処理でも上手いことエラーハンドリングしたい
  • Futureの返り値展開をスッキリさせたい

の2つです。

1. 標準的な非同期処理とFutureの取り回し

Javaでマルチスレッド処理を書く際、多くの人は以下のような処理を書いていると思います。

  1. Executorsクラスでスレッドプールを作成する
  2. Callable/Runnableを実装したサブクラスを作成する (もしくはLambda式を使う)
  3. ExecutorServiceのsubmitで別スレッドに処理を任せる
  4. 返り値のFutureをListなどに格納する
  5. 全部を別スレッドに投げ終えたところでlistの中のFutureクラスのgetメソッドで同期を取る
  6. チェック例外のInterruptedExceptionExecutionExceptionを処理する

コードにするとこんな感じ。(Exceptionでウケるなとか、parallelStream使うなとかは見逃してください。)

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> executor.submit(() -> waitAndSaySomething(num)))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.get();
          } catch (Exception e) {
            log.error("Error happened while extracting the thread result.", e);
            // Do something for recover. Here, just return null.
            return null;
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0)
      throw new RuntimeException("Error if odd");
   return num + ": Something!";
  }
}

わりとよくあるコードだけど、Futureのget部分が冗長で書くのがダルい。
しかも、どのinputがエラー起こしたのかがわからないのでエラー処理がとても難しい。

2. まずエラー処理がちゃんとできるようにしてみる

Javaでマルチスレッドを実装する時、処理にinputを渡す方法は大体次の2つのうちのどちらかになる。

  1. Callable/Runnableを実装したクラスにプロパティとして持たせて、newする際にコンストラクタの引数で渡す。
  2. Lambdaの外部に宣言したfinalの変数をLambda式の中から参照する。

ただし、このいずれの場合も、エラーハンドリングは難しい。

1の場合、スレッドインスタンスを保存しておくのはそもそも微妙だし、FutureとThreadインスタンスのマッピングを何処かで管理しなきゃいけない。
2の場合、Futureからは投げた元のthreadの引数を取得できないためどのinputに対して起きたエラーなのか判別がつかない。

そこで、この問題を解決するために、以下の方法を取ることにしました。

Tupleを使ってプロパティとFutureをまとめて管理する

Tupleは簡単に言うと、複数の値の組のことで、プログラミングでよく使われる概念です。

Javaには標準でTupleが提供されて無いのですが、同様の方法は色々あって、Common LangPairTripleクラス、reactorTupleクラスやJavaTuplePairクラスなどを使って実現できます。

(複雑なクラスじゃないので、自分で実装してもいいです。)

Tupleを使って、inputをLeftに、FutureをRightに保存するようにすることで、Error処理で入力元の値を利用したエラー処理ができるようになります。
え?propertyがたくさんある?inputが重すぎてOOM起こしそう??設計を見直さんかい。

さて、今回はほぼすべてのプロジェクトで使われてると思われる、Commons LangのPairを使ってみます。
Tupleを使うと上のmainクラスはこんな感じに書き直せます。


@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val futureList =
    IntStream.range(0,100)
        .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
        .collect(Collectors.toList());
    futureList.parallelStream()
        .map(future -> {
          try {
            return future.getRight().get();
          } catch (Exception e) {
            log.error("Input {} was not processed correctly.", future.getLeft(), e);
            // Do something for recover. Here, just return null.
            return String.format("Input %s Failed in process, damn shit!! ", future.getLeft());
          }
        }).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep( num%10 * 1000);
    } catch (Exception e){
      // do nothing since just sample
    }
    if (num%2 ==0) {
      throw new RuntimeException("Error if odd");
    }
   return num + ": Something!";
  }
}

これで、入力値を利用したエラー処理とLoggingができるようになりました。

ただ、Futureを展開する箇所があいかわらず冗長で、なんだかイライラします。

そこで、次はこの部分を共通化してみます。

3. Futureの展開部分を共通化する

やりたいことはfutureの展開だけなので、そこだけ切り取れば、共通化は非常に簡単です。
一方で、エラー処理はinputに対して相対的に行いたいので、その部分は柔軟にできるような設計にしたいです。

そこで、エラーハンドリングの部分をExceptionとinputを利用して、任意のFunctionで処理できるように以下のようなFutureの展開クラスを用意します。

@RequiredArgsConstructor
public class FutureFlattener<L, R> implements Function<Pair<L, Future<R>>, R> {
  /**
   * Callback function to recover when exception such as {@link InterruptedException} or {@link
   * java.util.concurrent.ExecutionException}.
   */
  private final BiFunction<L, Exception, R> recoveryCallback;

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (Exception e) {
      return recoveryCallback.apply(futurePair.getLeft(), e);
    }
  }
}

これを先程のMainクラスに組み込むと以下のようになります。

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    BiFunction<Integer,Exception,String> errorHandler =
        (in, e) -> {
          log.error("Input {} was not processed correctly.", in, e);
          return String.format("Input %s Failed in process, damn shit!! ", in);
        };
    val flattener = new FutureFlattener<Integer, String>(errorHandler);
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList
        .parallelStream()
        .map(flattener)
        .forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

しかし、せっかくFunctionインターフェイスを使ってるのに、内部で関数をプロパティに持つのは、正直めちゃくちゃダサいです。

ダサいのはいけません、クールに決めたい。
ということで、もう少しだけ拡張してみます。

4. JavaのFunctionインターフェイスを継承してエラーハンドリングを追加する

多くの他の言語がモナド型のクラスに、onCatchやthenCatchなど、Exceptionが投げられた時のためのメソッドを用意しています。
しかし、残念なことに、JavaのFunction Interfaceはcompose, apply, andThenの成功を前提としたメソッドチェーンしかできません。

そこで、JavaのFunctionインターフェイスを継承して、onCatchを実装してみます。

public interface CatchableFunction<T, R> extends Function<T, R> {

  /**
   * by calling this method in advance of calling {@link Function#apply}, any exception thrown in
   * the apply method will be handled as defined in the argument onCatch.
   *
   * @param onCatch callback method to handle the exception. First Type T is the input of the base
   *     function.
   * @return fail-safe function with a callback. This method will generate a new Function instance
   *     instead of modifying the existing function instance.
   */
  default Function<T, R> thenCatch(BiFunction<T, Exception, R> onCatch) {
    return t -> {
      try {
        return apply(t);
      } catch (Exception e) {
        return onCatch.apply(t, e);
      }
    };
  }
}

Javaの使用上、Type parameterをcatchすることはできないため、Exceptionで受けなくてはいけないのがもどかしいですが、これでかなりfunctionalに書けるようになりました。

このクラスを先程のFutureFlattenerクラスに実装すると以下のようになります。



@RequiredArgsConstructor
public class FutureFlattener<L, R> implements CatchableFunction<Pair<L, Future<R>>, R> {

  @Override
  public R apply(Pair<L, Future<R>> futurePair) {
    try {
      return futurePair.getRight().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new FutureExpandException(e);
    }
  }

  // To be caught in the then catch method.
  private static class FutureExtractException extends RuntimeException {
    FutureExpandException(Throwable cause) {
      super(cause);
    }
  }

チェック例外はLamdba式の中で処理しなくては行けないため、FutureExtractExceptionでラップしてあります。
これでMainクラスもスッキリします。

@Slf4j
public class Main {
  public static void main(String[] args) {
    // If wanna make it parallel with 10 threads.
    val executor = Executors.newFixedThreadPool(100);
    val flattener =
        new FutureFlattener<Integer, String>()
            .thenCatch(
                (in, e) -> {
                  log.error("Input {} was not processed correctly.", in, e);
                  return String.format("Input %s Failed in process, damn shit!! ", in);
                });
    val futureList =
        IntStream.range(0, 100)
            .mapToObj(num -> Pair.of(num, executor.submit(() -> waitAndSaySomething(num))))
            .collect(Collectors.toList());
    futureList.parallelStream().map(flattener).forEach(System.out::println);
    executor.shutdown();
  }

  static String waitAndSaySomething(int num) {
    try {
      Thread.sleep(num % 10 * 1000);
    } catch (Exception e) {
      // do nothing since just sample
    }
    if (num % 2 == 0) {
      throw new RuntimeException("Error if odd");
    }
    return num + ": Something!";
  }
}

ネストが減って、関数の宣言もスッキリして、Futureの展開周りのソースもスッキリしました。

終わりに

さて、いかがだったでしょうか?
Functional Javaを使えばもっと楽に実装できたりする箇所はあるのですが、急いでいたため自前で実装してしまいました。

並列処理に関して言えば、最近はkafkaなどのメッセージキューを使って非同期かつ粗結合に作るのが基本ですが、だからといってマルチスレッドを使わないわけではありません。

一方で冗長なFuture展開はネストを増やし可読性を下げるだけでなく、最も気を使うべきエラーハンドリングに気が回らなくなります。

今回僕は上記のような解決方法を取りましたが、いかがだったでしょうか?
もっとええ方法あるで、という方がいらっしゃいましたら、コメント欄にお願いします。

それでは!

6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6