gRPC サーバーを書いて理解するだけなのだが、なかなか知らないことが多くて先に進まない。ただし、自分なりにゆっくりやってみる。
二回目のトピックは Future
と Callable
インターフェイスについてだ。これを理解しないことには、gRPC
は理解できない。この辺りは、C# の Task や async/await の仕組みに近そうな香りを感じるのでお題として
FanOut/FanIn を Java で実装してみようと思う。
Callable インターフェイス
単純にT型の戻り値を戻すようなインターフェイスになっている。
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
こちらを実装した、実装クラスを作ってみる。こちらが並列で動作するパートになる。複数のスレッドで並列で動かすことを想定しているので、CurrentThread の id を表示するようにしている。ここで一つ気になるのが、Thread.sleep()
を使ってよかったのだろうか?ということだ。C# の async/await だと、これはご法度で、理由は、async コールが、常に Thread を占有するわけではないので、Thread を sleep してしまうと都合が悪いからだ。Java Delay – 4 Ways to Add Delay in Javaを読んでも特にコメントが出てこないので、今のうちは使っておこう。
Activity
public class Activity implements Callable<String> {
public String call() throws Exception {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
return "Thread [" + currentThread.getId() + "] has done.";
}
}
さて、これを実行する側のクラスを作成する。並列でActivityを処理させて、全部終了すると結果を表示して終わらせたい。
Java の場合は、ExecutorService
というものを使うようだ。Executors
には複数メソッドが定義されており、今使っているのは、固定数のスレッドプールを作るもの、シングルのスレッドプールを作るもの、キャッシュするものなど複数のものが定義されていた。この場合は5つのスレッドプールを持っている状態になる。ということは、C#と違って、スレッドを使う時々は占有しているので、Thread.sleep()
でもよいのだろうか?
Future interface
Future は、C# で言う Task
に相当するもののイメージだ。このオブジェクトをスレッド実行時には戻り値としてもどして、並行作業を開始する。get
メソッドを発行すると、スレッドの処理が終わるまでブロックする。
public interface Future<V>
{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
このFuture
を作成するのが、executor.submit
メソッドだ。ここに、先ほどの Activity
を渡してあげると、並列で処理が実施される。Future
のオブジェクトをリストに格納する。そして、executor.awaitTermination
で待ち受ける。最終的な結果は、get()
メソッドで取得して表示する。ところが、ここで問題がある。
public class FanOutFanIn {
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 10 ; i++) {
Future<String> result = executor.submit(new Activity());
results.add(result);
}
executor.awaitTermination(30, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
System.out.println(results.get(i).get());
}
}
}
私の本来のイメージはすべての処理が完了、もしくはタイムアウトで実行結果を表示という感じだったが、実はそうならない。awaitTermination
は、すべての処理が終わっても、ブロックし続けて、このコードだと、30秒経過しないと次のブロックに進んでくれない、、、ううむダサい。
CompletableFuture
そこで、CompletabledFuture
というものがあるらしい。実際のインターフェイスの実装は長いので省略するが、Future
と CompletionStage
インターフェイスを実装したクラスだ。Future
のうち、明確な「完了」を持っている。これは様々な用途につかえるようだが、FanOut/FanIn
シナリオにも向いているらしい。上記のやつで自分の望んだ動作をするものを書いてみる。
CompletableFuture
は、Callable
が使えないので、Runnable
で実装している。ちょっとダサいところがあって、多分もっといい書き方ができると思うけど、Runnable
は戻り値を返せない。さっきのコードと同じふるまい(String を返す)をしようと思ったら、苦肉の策だけど、getResult
というメソッドを実装してみた。うん。絶対もっといい書き方あるはず。是非コメントを。
public class RunnableActivity implements Runnable {
private String result;
@Override
public void run() {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
this.result = "Thread [" + currentThread.getId() + "] has done.";
}
public String getResult() {
return this.result;
}
}
自分で、runAsync
を書いているあたり、だっさださなんだが、Runnable の戻り値を無理やり返すために自分で書いている。うん。絶対もっといい方法あるはずだw 先ほどと似ているけど、CompletableFuture
を使っているため、Runnable
を使っているところ、そして、CompletableFuture.allOf
メソッドで、今回は気持ちよく、全部の終了を待ち受けしてくれる。これが欲しかったやつですわ。
終わったら、前回と同じようなループでもいいけど、しっかり全部のスレッドがちゃんと終わってたら、Linq 的な書き方で、ラムダとして値をとってきて出力している。
public class CompletableFanOutFanIn {
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
// ExecutorService executor = Executors.newSingleThreadExecutor();
List<CompletableFuture<String>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CompletableFuture<String> result = runAsync(new RunnableActivity(), executor);
results.add(result);
}
CompletableFuture<Void> cf = CompletableFuture.allOf(results.toArray(new CompletableFuture[10]));
cf.whenComplete((ret, ex) -> {
if(ex == null) {
String msg = results.stream().map(future -> {
try {
return future.get();
} catch (Exception iex) {
return iex.toString();
}
}).collect(Collectors.joining("\n"));
System.out.println("result = " + msg);
} else {
System.err.println(ex);
}
});
}
private CompletableFuture<String> runAsync(RunnableActivity runnable, ExecutorService executor) {
CompletableFuture<String> cf = new CompletableFuture<>();
executor.execute(() -> {
runnable.run();
cf.complete(runnable.getResult());
});
return cf;
}
}
CompletableFuture 改
上記のがダサいので、こんな風に変えてみた。.NET のプログラミングモデルと、Java のプログラミングモデルが違うので、考え方を変えたほうがよさそうだ。Node に近いノリを感じる。多分、Java が得意な人はもっとかっこいい書き方をしそう。あと、Stream
にmap
とかflatmap
があったので、関数型のコードはそれでいけそうやな。この場合、ArrayListってコンカレントな処理に耐えられるんだろうか?
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
// ExecutorService executor = Executors.newSingleThreadExecutor();
List<CompletableFuture<Void>> tasks = new ArrayList<>();
List<String> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CompletableFuture<Void> task= CompletableFuture.supplyAsync(this::runTask, executor).thenAcceptAsync((ret) -> {
results.add(ret);
});
tasks.add(task);
}
CompletableFuture<Void> cf = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[10]));
cf.whenComplete((ret, ex) -> {
if(ex == null) {
results.stream().forEach(System.out::println);
} else {
System.err.println(ex);
}
});
}
private String runTask() {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
return "Thread [" + currentThread.getId() + "] has done.";
}
Result
ちなみに、C#のTaskだと、こうならず、Thread.id が共有されたりするが、Javaは明確に1スレッドのようだ。
[23] Activity: 0
[22] Activity: 0
[21] Activity: 0
[20] Activity: 0
[24] Activity: 0
[20] Activity: 1
[23] Activity: 1
[22] Activity: 1
[24] Activity: 1
[21] Activity: 1
[20] Activity: 2
:
[20] Activity: 97
[24] Activity: 97
[22] Activity: 97
[20] Activity: 98
[23] Activity: 98
[21] Activity: 98
[24] Activity: 98
[22] Activity: 98
[22] Activity: 99
[22] Done
[24] Activity: 99
[24] Done
[23] Activity: 99
[23] Done
[21] Activity: 99
[21] Done
[20] Activity: 99
[20] Done
result = Thread [20] has done.
Thread [21] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Thread [21] has done.
Thread [20] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
まとめ
正直まだまだ Java8 以降のコンカレントプログラムには自信がない。じっくり時間をとって、がっつり学んでみよう。Pluralsight あたりで。焦らず、一歩一歩。まずは当初に思っていたことを最初の一歩だけど実装できたので今日は良しとする。
- Callable Future Example
- JavaのCallableとFutureで、スレッドで実行した結果を受け取れるよ!
- Class Executors
- How To Get Thread ID In Java With Example
- Thread.stop、Thread.suspend、 Thread.resume、および Runtime.runFinalizersOnExit が推奨されない理由
- CompletableFutureの使い方の基本形とFutureTaskとの連携例
- CompletableFuture.anyOf - Help required on the language used