Javaで並列処理をおこなう際に使うものをまとめておきます.
#並列化の種類
一口に並列化と言っても色々あって,扱うデータの種類と走らせるプログラムの個数によって主に分類できます.
- SPSD (Single Program Single Data):これは並列化もなにもない,1つのプログラムで1つのデータを処理することです
- SPMD (Single Program Multi Data):1つのプログラムで複数のデータを処理することです
- MPSD (Multi Program Single Data):複数のプログラムで1つのデータを処理することです
- MPMD (Multi Program Multi Data):複数のプログラムで複数のデータを処理することです
これらのうちどのタイプの並列化をしたいかによって書くプログラムが変わってきます.
#SPMDの並列化
これはあるクラスのメソッドを同時に実行したいときなどに使われます.以下の例ではTestJobクラスのcallメソッドに書かれた処理を実行しています.このときにメソッドの処理をしてくれればいい場合と,結果を取ってきたい場合の2通りが考えられます.
##結果がいらない場合
以下のコードのように書きます.匿名クラスを使用してジョブリストにaddしています.また, return null
を忘れないでください.
public class TestParallel {
static final int threadNum = 4;
public static void main(String[] args) {
//jobs: 実行したい処理が書かれたクラス, threadpool: 実行するスレッド
Collection<Callable<Void>> jobs = new ArrayList<Callable<Void>>();
ExecutorService threadpool = Executors.newFixedThreadPool(threadNum);
for(int i = 0; i < threadNum; i++){
jobs.add(new Callable<Void>(){
@Override
public Void call() throws Exception {
//これはスレッドセーフな必要あり
Thread.sleep(100L);
return null;
}
});
}
try {
threadpool.invokeAll(jobs); //スレッドの実行
threadpool.shutdown();//新しいタスクを受け付けない
if(threadpool.awaitTermination(5L,TimeUnit.MINUTES)){
//5分待つ
System.out.println("Finish!");
}else{
//5分以内に終了しない,または異常終了する
System.out.println("Fail!");
}
} catch (InterruptedException e) {
e.printStackTrace();
threadpool.shutdown();
}
}
}
##結果が欲しい場合
以下のコードのように書きます.今回は匿名クラスを使用せずにジョブ用のクラスを作成しました.結果は Future<T>
という型であり,これはざっくり言うと返り値としてT型のインスタンスがそのうち返ってくるよって型です.こいつのget()メソッドを呼ぶと結果を取得するまで待機してくれます.
public class TestParallel {
static final int threadNum = 4;
public static void main(String[] args) {
//jobs: 実行したい処理が書かれたクラス, threadpool: 実行するスレッド
Collection<Callable<String>> jobs = new ArrayList<Callable<String>>();
ExecutorService threadpool = Executors.newFixedThreadPool(threadNum);
List<Future<String>> futures; //結果を格納する
List<String> result = new ArrayList<String>();
for(int i = 0; i < threadNum;i++){
TestJob t = new TestJob(i);
jobs.add(t);
}
try {
futures = threadpool.invokeAll(jobs); //スレッドの実行
threadpool.shutdown();//新しいタスクを受け付けない
for(int i = 0; i < threadNum; i++){
//結果の取得
result.add(futures.get(i).get());
}
for(String s: result){
//結果の表示
System.out.println(s);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
threadpool.shutdown();
}
}
}
class TestJob implements Callable<String>{
private int i;
public TestJob(int i){
this.i = i;
}
@Override
public String call() throws Exception {
//これはスレッドセーフな必要あり
Thread.sleep(100L);
return "thread " + Integer.toString(i);
}
}
##Java8のStreamを利用
ここまで紹介してきたExecutorService以外にもJava8からStreamを使用した並列処理が可能になりました.
public class TestParallel {
public static void main(String[] args) {
List<TestJob> jobs = new ArrayList<TestJob>();
for(int i = 0; i < 4; i++){
jobs.add(new TestJob(i));
}
jobs.parallelStream().forEach(x -> {
//この中間操作はスレッドセーフな必要あり
x.doJob();
});
}
}
class TestJob{
private int i;
public TestJob(int i){
this.i = i;
}
public void doJob() {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
結果を取ってきたい場合はmain内で宣言したリストかなんかに代入するんですかね...ただその場合,代入時の操作は排他的制御をする必要がある上,どのスレッドの結果なのかはわかりませんね.Futureのように取ってくる方法を知っている方はご教授いただけると幸いです...
#MPSDやMPMDの並列化
素直にスレッドを作りましょう.MPSDは結構難しいと思うので,ここではMPMDを例に取ってコードを書きます.クエリ1群に対しては"a"を末尾に追加し,クエリ2群に対しては"b"を末尾に追加する処理です.
public class TestParallel {
public static void main(String[] args) {
List<String> query1List = new ArrayList<String>();
List<String> res1 = new ArrayList<String>();
List<String> query2List = new ArrayList<String>();
List<String> res2 = new ArrayList<String>();
for(int i = 0; i < 10; i++){
query1List.add(Integer.toString(i));
query2List.add(Integer.toString(i+10));
}
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
query1List.stream().forEach(x -> {
res1.add(x + "a");
});
}
});
Thread t2 = new Thread(new Runnable(){
@Override
public void run() {
query2List.stream().forEach(x ->{
res2.add(x + "b");
});
}
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
スレッドを終了させたい場合は Thread#interrupt()
で例外を発生させて,それをスレッド内でcatchすることで終了させることができます. Thread#stop()
は非推奨なのであまり使わないほうがいいみたいです.
Threadを作る手法はGUIなどを作ったことがあると実装したことがあると思います(たぶん).さらに競合を起こす部分については適宜 synchronized
を使用して排他的制御をしてください.
#プロセス並列
これまではマルチスレッドで並列処理することを考えていましたが,外部プロセスを作成して並列で動かすこともできます.下のコードではhoge.tar.gzを解凍しています.ここではwatiForメソッドにより終了を待機していますが,待機せずにそのまま処理をすすめることも可能です.
public class TestParallel {
ProcessBuilder pb = new ProcessBuilder(new String[] {"tar", "zxvf", "hoge.tar.gz"});
try {
Process p = pb.start();
p.waitFor();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
#まとめ
個人的にJava8の記法がとても簡潔で好きでした.基本的に並列化はデータ並列で競合が起こらないところを並列化すると楽だと思います(当たり前).また,他にもこんな手法あるよって方は教えていただけると幸いです.