Java
java8
並列処理
JavaDay 12

さあ、並列プログラミングをはじめよう

More than 1 year has passed since last update.

この記事はJava Advent Calendar 2016の12日目です。

前日は、leak4mk0さんの「Railsアプリケーションでより良いリソース設計を実装するためのテクニック」 でした。

次は、tkxlabさんのJavaSEでもオブジェクト指向データベース(JPA)をとなります。


はじめに

「フリーランチの終焉」と言われマルチコアの時代に突入してはや数年。

Webアプリを作る事が多かったのも理由ですが、ほとんどシングルスレッド(サーブレッドとかでマルチスレッドを意識するケースはあるけど)で開発してた私にも、今年は並列プログラミングの波が押し寄せてきました。

そんなわけで今年のアドベントカレンダーは並列プログラミング入門に関してまとめてみます。


Java 8標準の並列API

Javaは当初よりマルチスレッドプログラミングを喧伝してただけあって、それなりに並列プログラミングがやりやすい言語です。

バージョンアップと共に着々と拡張されていて、現在は下記のAPIがあります。


  • Thread

  • Executor framework

  • Fork/Join

  • CompletableFuture

  • ParallelStream

それでは、一つ一つ使い方を見ていきましょう。


Threadクラス

「ボクのこと忘れてください」

と言うのは半分冗談ですが、ほとんどのケースで素のThreadクラスを直接使うべきでは無いでしょう。

私も最初は昔の知識をベースに素のThreadを使うところから始めましたし、マルチスレッドでググると未だに上位にヒットしますが、

FutureやParallelStreamを使ったほうがずっと簡単ですし、Threadで出来る事はExecutorで大抵よりスマートに出来ます。


Executor framework

スレッドプールをベースとしたシンプルな並列処理APIです。

処理を「Task」と言われる粒度に分解して考え、スレッドプールに割り当てて実行します。

TaskはRunnableインタフェースを継承したクラスまたはラムダ式で表現するため、実質的には「スレッドプールをサポートしたThreadの上位互換」として利用することが出来るでしょう。

また通常はFutureを取り扱えるExecutorServiceというExecutor(名前がややこしい!)を使います。

public static void main(String[] args) throws Exception {

ExecutorService es = Executors.newFixedThreadPool(2);
try {
es.execute(() -> System.out.println("executor:1, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:2, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:3, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:4, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:5, thread-id:" + Thread.currentThread().getId()));
} finally {
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
}
}

実行結果は下記のようになります。

executor:1, thread-id:11

executor:2, thread-id:11
executor:3, thread-id:12
executor:4, thread-id:12
executor:5, thread-id:12

executeを5回実行しているのにスレッドはnewFixedThreadPoolで指定した2個が使い回されている事が分かります。

ExecutorsはExecutorServiceを作るファクトリで以下のような戦略のスレッドプールを作るメソッドを備えています。

メソッド名
概要

newCachedThreadPool
必要に応じ、新規スレッドを作成するが利用可能な場合には以前に構築されたスレッドを再利用するスレッド・プール

newFixedThreadPool
指定された固定数のスレッドを再利用するスレッド・プール

newWorkStealingPool
CPUのコア数の最大値または指定された並列数を保つスレッドプール。各スレッドにタスクのキューが割り当てられ、キューに空きが出来ると他のスレッドからタスクを横取り(Work Stealing)して処理する

newWorkStealingPoolはJava8から登場した新しいメソッドです。スレッドの割り当てを効率的に実施してくれるので、特に問題なければこれを採用する形になるでしょう。


Fork/Join

Unixのプロセス管理のようなForkとJoinを使って並列プログラミングを行います。

再帰による分割統治のようなCPUヘビーな処理を実行することを念頭に作られているAPIです。

Work Stealingアルゴリズムを採用したForkJoinPoolを用いるため、Executorよりも粒度が小さいタスクを効率的に実行することを念頭に置かれています。

ただ、登場したJava7当時とは異なり、Java8ではExecutorにもnewWorkStealingPoolがあるので、実行効率としてはあまり変わりないんじゃないかという気もします。(※ 未確認)

再帰の高速化が得意ということなので、お馴染みのフィボナッチ数列を求めるサンプルを作ってみます。

まずは比較のために通常の再帰で作ったフィボナッチ数列。

public static int fib(int n) {

if (n == 0) {
return 0;
} else if (n == 1) {
return 1;
} else {
return fib(n - 1) + fib(n - 2);
}
}

public static void main(String[] args) {
System.out.println(fib(45));
}

これをForkJoinで実装すると下記のようになります。

static class FibonacciTask extends RecursiveTask<Integer> {

private final int n;

FibonacciTask(int n) {
this.n = n;
}

@Override
protected Integer compute() {
if (n == 0) {
return 0;
} else if (n == 1) {
return 1;
} else {
FibonacciTask f1 = new FibonacciTask(n - 1);
FibonacciTask f2 = new FibonacciTask(n - 2);

f2.fork();
return f1.compute() + f2.join();
}
}
}

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
System.out.println(pool.invoke(new FibonacciTask(45)));
}

RecursiveTaskを継承したFibonacciTaskを作って処理を書いています。

元々、両方とも再帰を使って求めていたf1:fib(n-1)とf2:fib(f-2)の値の取得をf1は再帰、f2はfork/joinを使って実現しているのがポイントです。

forkをするたびに非同期タスクが新規に作られるため並列度が上がっていきます。f1は再帰による計算ですから、f2の結果を待つことなくどんどん処理が実行されていくので、

結果として大量の非同期タスクが出来ることになり並列計算が進む形になります。

invokeメソッドは「処理が終わるまで同期的に待ち戻り値を返す」メソッドです。Task.fork.joinのラッパーメソッドともいえます。

ただ、私の何か書き方ミスってるのかオーバーヘッドの問題なのか、私の今回書いたコードだとシングルスレッドで回してる単純なコードの方がfork/joinより倍近く速いという。。。

なので、実際に使うときは高速化されるかをちゃんと測ったほうが良いですね。ユースケースによって変わってくるでしょうし。


CompletableFuture

CompletableFutureはJava8から入った並列プログラミングのデザインパターンの一つであるFuture/promiseを実現するためのAPIです。

自分の理解としては非同期プログラミングを同期プログラミング的に扱うためのデザインパターンで、Java8の場合は非同期性をスレッドにより担保しているという所でしょうか。

こんな感じで書きます。

public static void main(String[] args) throws InterruptedException, ExecutionException {

ExecutorService es = Executors.newWorkStealingPool();

// 単純な非同期実行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
System.out.println(future.get());

//合成も可能
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
System.out.println(f2.get());

// Listに詰めたり、一括処理したり
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int n = i;
futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
}

このサンプルではExecutorServiceを引数として渡しています。

こうすることで、JavaEEなどミドルでスレッドを制御している場合もそれに対応したExecutorServiceがあれば、競合することなく活用することが出来ます。

デフォルトだとForkJoinPool.commonPool() を使っているようです。

まず、単純な非同期実行ですがsupplyAsyncやrunAsyncなどでラムダ式を引数にとって実行する形です。getを実行することでFutureの終了を待ち戻り値を受け取ります。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);

System.out.println(future.get());

CompletableFutureの便利なところはFutureという形で非同期処理自体を変数として取り扱えるので、ある非同期処理の結果を引数にとる処理を簡潔に書けることです。

たとえば、Webページをとってきてそれを加工してDBに登録という処理があった場合、通常はコールバック関数を引数に渡すことになります。これは簡単にコールバック地獄を招きます。

CompletableFutureの場合はthenApplyなどを使って、そういった非同期処理の後処理を下記のようにコールバックを用いずに表現することが出来ます。

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);

CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
System.out.println(f2.get());

こういった処理は「合成」という言い方をされるので、慣れ無いと「合成って何さ?」って感じになると思いますが、そんなに大したことじゃないのが分かってもらえるかと思います。

また、CompletableFutureは単なる値なのでListに詰め込むこともできますし、allOfを使って配列(メソッド的には可変長引数)を使って同時実行することも可能です。

List<CompletableFuture<Void>> futures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
final int n = i;
futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));

CompletableFutureの機能は他にも盛沢山なのですが、ちょっと書ききれないので、この辺で割愛。


ParallelStream

Java8で入った並列性といえば、こちらをイメージする人が多いのではないでしょうか?

並列コレクション操作のParallelStreamです。

並列コレクションは関数型言語のような宣言型で副作用のない言語では昔から利用されていましたが、Java8でついにJavaにもついに導入されました。

これは今までとはずいぶん趣が違って、並行性や非同期性を直接取り扱うのではなく、システム側で並列性を自動でマネージする方式です。

明示的な並列操作とは違い実現できることに制約が付きますが、コードが非常に簡潔になります。また、以外と多くの並列化による高速化要件のユースケースをカバーできたりもします。

まずはコードを見てみましょう。

public static void main(String[] args){

IntStream.range(1, 10).boxed()
.parallel()
.map(x -> "task" + x)
.forEach(System.out::println);
}

ほとんど普通のStreamと区別が付きませんね? 違いは.parallelが付くところになります。

これだけで並列処理が組めますし、Streamなので後続の処理で前の処理の戻り値を使うのは当たり前なので「Futureの合成」みたいな話もシンプルです。

非常に強力かつ簡潔なAPIですが注意点もあります。それは並列プログラミングだということを忘れないことです。

変数でも外部のリソースでも排他などの並列性をサポートした環境以外のものを使うとあっさり壊れるので注意が必要です。


非標準ライブラリ

Javaの標準ライブラリ以外にも並列性をサポートするライブラリはあります。

私はまだ試せてないのがほとんどですが、記載だけしておきます。


Apache Spark

http://spark.apache.org/

大規模バッチを実行するための分散処理基盤です。

プログラミングモデルについてはParallelStreamと同様に並列コレクションを採用しています。

これは他のライブラリと違い使ったことがあるので、下記に書いた記事とかを参考いただければと思います。


Akka

http://doc.akka.io/docs/akka/snapshot/intro/getting-started.html

アクターモデルを実現するためのライブラリ/ミドルウェアです。Scala由来のものですがJavaでも使えます。

スレッドより軽量なアクターを扱うための並列プログラミングのライブラリではありますが、それ以上に分散システム基盤としてよく使われている認識です。


JCSP

https://www.cs.kent.ac.uk/projects/ofa/jcsp/

Javaでプロセス代数をやるためのライブラリのようです。

プロセス代数は並列計算を数学的に取り扱う手法なので、今度試してみたいです。


まとめ

さて、簡単ではありますがJavaでの並列プログラミングに関してまとめてみました。

多くの方がやはり素のマルチスレッドのイメージが強いと思いますがJavaもずいぶんと変わってきています。

基本的にはParallelStream、それで難しいケースならCompletableFutureが今後の主流になりそうです。

いずれもスレッド間の値渡しのために共通変数を必ずしも使わないので、安全なコードが書きやすくなっていると思います。

今回は紹介しませんでしたが、どうしても共通変数が必要な場合もノンブロッキング系のコレクションを使うこともできるケースがありますので、必要に応じてそれらも活用していくことになるかと思います。

また、最近は単一マシンでの並列プログラミングを超えて分散システムによる大規模並列もクラウドのおかげで身近になってきました。

このあたりもしっかり勉強して押さえていきたいですね。

それでは来年もHappy Hacking!


参考