0
1

More than 3 years have passed since last update.

Java で gRPC サーバーを書くことによって Java 知識をアップデートする (2)

Last updated at Posted at 2020-08-19

gRPC サーバーを書いて理解するだけなのだが、なかなか知らないことが多くて先に進まない。ただし、自分なりにゆっくりやってみる。
二回目のトピックは Future と Callable インターフェイスについてだ。これを理解しないことには、gRPC は理解できない。この辺りは、C# の Task や async/await の仕組みに近そうな香りを感じるのでお題として

FanOut/FanIn を Java で実装してみようと思う。

Callable インターフェイス

単純にT型の戻り値を戻すようなインターフェイスになっている。

@FunctionalInterface
public interface Callable<V> {
/**
 * Computes a result, or throws an exception if unable to do so.
 *
 * @return computed result
 * @throws Exception if unable to compute a result
 */

V call() throws Exception;
}

こちらを実装した、実装クラスを作ってみる。こちらが並列で動作するパートになる。複数のスレッドで並列で動かすことを想定しているので、CurrentThread の id を表示するようにしている。ここで一つ気になるのが、Thread.sleep() を使ってよかったのだろうか?ということだ。C# の async/await だと、これはご法度で、理由は、async コールが、常に Thread を占有するわけではないので、Thread を sleep してしまうと都合が悪いからだ。Java Delay – 4 Ways to Add Delay in Javaを読んでも特にコメントが出てこないので、今のうちは使っておこう。

Activity

public class Activity implements Callable<String> {
    public String call() throws Exception {
        Thread currentThread = Thread.currentThread();
        for (int i = 0; i < 100; i++) {
            Thread.sleep(100);
            System.out.println("[" + currentThread.getId() + "] Activity: " + i);
        }
        System.out.println("[" + currentThread.getId() +"] Done");
        return "Thread [" + currentThread.getId() + "] has done.";
    }
}

さて、これを実行する側のクラスを作成する。並列でActivityを処理させて、全部終了すると結果を表示して終わらせたい。
Java の場合は、ExecutorService というものを使うようだ。Executors には複数メソッドが定義されており、今使っているのは、固定数のスレッドプールを作るもの、シングルのスレッドプールを作るもの、キャッシュするものなど複数のものが定義されていた。この場合は5つのスレッドプールを持っている状態になる。ということは、C#と違って、スレッドを使う時々は占有しているので、Thread.sleep() でもよいのだろうか?

Future interface

Future は、C# で言う Task に相当するもののイメージだ。このオブジェクトをスレッド実行時には戻り値としてもどして、並行作業を開始する。get メソッドを発行すると、スレッドの処理が終わるまでブロックする。

public interface Future<V> 
{
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

このFuture を作成するのが、executor.submit メソッドだ。ここに、先ほどの Activity を渡してあげると、並列で処理が実施される。Future のオブジェクトをリストに格納する。そして、executor.awaitTermination で待ち受ける。最終的な結果は、get() メソッドで取得して表示する。ところが、ここで問題がある。

public class FanOutFanIn {
    public void execute() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Future<String>> results = new ArrayList<>();

        for (int i = 0; i < 10 ; i++) {
            Future<String> result = executor.submit(new Activity());
            results.add(result);
        }

        executor.awaitTermination(30, TimeUnit.SECONDS);

        for (int i = 0; i < results.size(); i++) {
            System.out.println(results.get(i).get());
        }

    }
}

私の本来のイメージはすべての処理が完了、もしくはタイムアウトで実行結果を表示という感じだったが、実はそうならない。awaitTermination は、すべての処理が終わっても、ブロックし続けて、このコードだと、30秒経過しないと次のブロックに進んでくれない、、、ううむダサい。

CompletableFuture

そこで、CompletabledFuture というものがあるらしい。実際のインターフェイスの実装は長いので省略するが、FutureCompletionStage インターフェイスを実装したクラスだ。Future のうち、明確な「完了」を持っている。これは様々な用途につかえるようだが、FanOut/FanIn シナリオにも向いているらしい。上記のやつで自分の望んだ動作をするものを書いてみる。

CompletableFuture は、Callable が使えないので、Runnable で実装している。ちょっとダサいところがあって、多分もっといい書き方ができると思うけど、Runnable は戻り値を返せない。さっきのコードと同じふるまい(String を返す)をしようと思ったら、苦肉の策だけど、getResult というメソッドを実装してみた。うん。絶対もっといい書き方あるはず。是非コメントを。

public class RunnableActivity implements Runnable {
    private String result;
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("[" + currentThread.getId() + "] Activity: " + i);
        }
        System.out.println("[" + currentThread.getId() +"] Done");
        this.result =  "Thread [" + currentThread.getId() + "] has done.";
    }

    public String getResult() {
        return this.result;
    }
}

自分で、runAsync を書いているあたり、だっさださなんだが、Runnable の戻り値を無理やり返すために自分で書いている。うん。絶対もっといい方法あるはずだw 先ほどと似ているけど、CompletableFuture を使っているため、Runnable を使っているところ、そして、CompletableFuture.allOf メソッドで、今回は気持ちよく、全部の終了を待ち受けしてくれる。これが欲しかったやつですわ。

終わったら、前回と同じようなループでもいいけど、しっかり全部のスレッドがちゃんと終わってたら、Linq 的な書き方で、ラムダとして値をとってきて出力している。

public class CompletableFanOutFanIn {

        public void execute() throws InterruptedException, ExecutionException {
           ExecutorService executor = Executors.newFixedThreadPool(5);
           // ExecutorService executor = Executors.newSingleThreadExecutor();
            List<CompletableFuture<String>> results = new ArrayList<>();

            for (int i = 0; i < 10; i++) {
                CompletableFuture<String> result = runAsync(new RunnableActivity(), executor);
                results.add(result);
            }
            CompletableFuture<Void> cf = CompletableFuture.allOf(results.toArray(new CompletableFuture[10]));

            cf.whenComplete((ret, ex) -> {
               if(ex == null) {
                   String msg = results.stream().map(future -> {
                       try {
                           return future.get();
                       } catch (Exception iex) {
                           return iex.toString();
                       }
                   }).collect(Collectors.joining("\n"));
                   System.out.println("result = " + msg);
               } else {
                   System.err.println(ex);
               }
            });
        }

        private CompletableFuture<String> runAsync(RunnableActivity runnable, ExecutorService executor) {
            CompletableFuture<String> cf = new CompletableFuture<>();
            executor.execute(() -> {
                runnable.run();
                cf.complete(runnable.getResult());
            });
            return cf;
        }
}

CompletableFuture 改

上記のがダサいので、こんな風に変えてみた。.NET のプログラミングモデルと、Java のプログラミングモデルが違うので、考え方を変えたほうがよさそうだ。Node に近いノリを感じる。多分、Java が得意な人はもっとかっこいい書き方をしそう。あと、Streammap とかflatmap があったので、関数型のコードはそれでいけそうやな。この場合、ArrayListってコンカレントな処理に耐えられるんだろうか?

       public void execute() throws InterruptedException, ExecutionException {
           ExecutorService executor = Executors.newFixedThreadPool(5);
           // ExecutorService executor = Executors.newSingleThreadExecutor();
            List<CompletableFuture<Void>> tasks = new ArrayList<>();
            List<String> results = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                CompletableFuture<Void> task= CompletableFuture.supplyAsync(this::runTask, executor).thenAcceptAsync((ret) -> {
                     results.add(ret);
                 });
                tasks.add(task);
            }


            CompletableFuture<Void> cf = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[10]));

            cf.whenComplete((ret, ex) -> {
               if(ex == null) {
                   results.stream().forEach(System.out::println);
               } else {
                   System.err.println(ex);
               }
            });
        }

        private String runTask() {
            Thread currentThread = Thread.currentThread();
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("[" + currentThread.getId() + "] Activity: " + i);
            }
            System.out.println("[" + currentThread.getId() +"] Done");
            return  "Thread [" + currentThread.getId() + "] has done.";
        }

Result

ちなみに、C#のTaskだと、こうならず、Thread.id が共有されたりするが、Javaは明確に1スレッドのようだ。

[23] Activity: 0
[22] Activity: 0
[21] Activity: 0
[20] Activity: 0
[24] Activity: 0
[20] Activity: 1
[23] Activity: 1
[22] Activity: 1
[24] Activity: 1
[21] Activity: 1
[20] Activity: 2
     :
[20] Activity: 97
[24] Activity: 97
[22] Activity: 97
[20] Activity: 98
[23] Activity: 98
[21] Activity: 98
[24] Activity: 98
[22] Activity: 98
[22] Activity: 99
[22] Done
[24] Activity: 99
[24] Done
[23] Activity: 99
[23] Done
[21] Activity: 99
[21] Done
[20] Activity: 99
[20] Done
result = Thread [20] has done.
Thread [21] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Thread [21] has done.
Thread [20] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.

まとめ

正直まだまだ Java8 以降のコンカレントプログラムには自信がない。じっくり時間をとって、がっつり学んでみよう。Pluralsight あたりで。焦らず、一歩一歩。まずは当初に思っていたことを最初の一歩だけど実装できたので今日は良しとする。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1