36
37

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Javaの並列化コードサンプル集

Last updated at Posted at 2017-03-08

Javaで並列処理をおこなう際に使うものをまとめておきます.

#並列化の種類
一口に並列化と言っても色々あって,扱うデータの種類と走らせるプログラムの個数によって主に分類できます.

  • SPSD (Single Program Single Data):これは並列化もなにもない,1つのプログラムで1つのデータを処理することです
  • SPMD (Single Program Multi Data):1つのプログラムで複数のデータを処理することです
  • MPSD (Multi Program Single Data):複数のプログラムで1つのデータを処理することです
  • MPMD (Multi Program Multi Data):複数のプログラムで複数のデータを処理することです

これらのうちどのタイプの並列化をしたいかによって書くプログラムが変わってきます.

#SPMDの並列化
これはあるクラスのメソッドを同時に実行したいときなどに使われます.以下の例ではTestJobクラスのcallメソッドに書かれた処理を実行しています.このときにメソッドの処理をしてくれればいい場合と,結果を取ってきたい場合の2通りが考えられます.

##結果がいらない場合
以下のコードのように書きます.匿名クラスを使用してジョブリストにaddしています.また, return null を忘れないでください.

TestParallel.java
public class TestParallel {
	
	static final int threadNum = 4;

	public static void main(String[] args) {
		//jobs: 実行したい処理が書かれたクラス, threadpool: 実行するスレッド
		Collection<Callable<Void>> jobs = new ArrayList<Callable<Void>>();
		ExecutorService threadpool = Executors.newFixedThreadPool(threadNum);
		for(int i = 0; i < threadNum; i++){
			jobs.add(new Callable<Void>(){
				@Override
				public Void call() throws Exception {
					//これはスレッドセーフな必要あり
					Thread.sleep(100L);
					return null;
				}
				
			});
		}
		try {
			threadpool.invokeAll(jobs); //スレッドの実行
			threadpool.shutdown();//新しいタスクを受け付けない
			if(threadpool.awaitTermination(5L,TimeUnit.MINUTES)){
				//5分待つ
				System.out.println("Finish!");
			}else{
				//5分以内に終了しない,または異常終了する
				System.out.println("Fail!");
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			threadpool.shutdown();
		}
	}
}

##結果が欲しい場合
以下のコードのように書きます.今回は匿名クラスを使用せずにジョブ用のクラスを作成しました.結果は Future<T> という型であり,これはざっくり言うと返り値としてT型のインスタンスがそのうち返ってくるよって型です.こいつのget()メソッドを呼ぶと結果を取得するまで待機してくれます.

TestParallel.java
public class TestParallel {
	
	static final int threadNum = 4;

	public static void main(String[] args) {
		//jobs: 実行したい処理が書かれたクラス, threadpool: 実行するスレッド
		Collection<Callable<String>> jobs = new ArrayList<Callable<String>>();
		ExecutorService threadpool = Executors.newFixedThreadPool(threadNum);
		List<Future<String>> futures; //結果を格納する
		List<String> result = new ArrayList<String>();
		for(int i = 0; i < threadNum;i++){
			TestJob t = new TestJob(i);
			jobs.add(t);
		}
		try {
			futures = threadpool.invokeAll(jobs); //スレッドの実行
			threadpool.shutdown();//新しいタスクを受け付けない
			for(int i = 0; i < threadNum; i++){
				//結果の取得
				result.add(futures.get(i).get());
			}
			for(String s: result){
				//結果の表示
				System.out.println(s);
			}
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
			threadpool.shutdown();
		}
	}
}

class TestJob implements Callable<String>{
	private int i;
	
	public TestJob(int i){
		this.i = i;
	}
	
	@Override
	public String call() throws Exception {
		//これはスレッドセーフな必要あり
		Thread.sleep(100L);
		return "thread " + Integer.toString(i);
	}
}

##Java8のStreamを利用
ここまで紹介してきたExecutorService以外にもJava8からStreamを使用した並列処理が可能になりました.

TestParallel.java
public class TestParallel {
	
	public static void main(String[] args) {
		List<TestJob> jobs = new ArrayList<TestJob>(); 
		for(int i = 0; i < 4; i++){
			jobs.add(new TestJob(i));
		}
		jobs.parallelStream().forEach(x -> {
		   //この中間操作はスレッドセーフな必要あり
			x.doJob();
		});
	}
}

class TestJob{
	private int i;
	
	public TestJob(int i){
		this.i = i;
	}
	
	public void doJob() {
		try {
			Thread.sleep(100L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

結果を取ってきたい場合はmain内で宣言したリストかなんかに代入するんですかね...ただその場合,代入時の操作は排他的制御をする必要がある上,どのスレッドの結果なのかはわかりませんね.Futureのように取ってくる方法を知っている方はご教授いただけると幸いです...

#MPSDやMPMDの並列化
素直にスレッドを作りましょう.MPSDは結構難しいと思うので,ここではMPMDを例に取ってコードを書きます.クエリ1群に対しては"a"を末尾に追加し,クエリ2群に対しては"b"を末尾に追加する処理です.

TestParallel.java
public class TestParallel {
	
	public static void main(String[] args) {
		List<String> query1List = new ArrayList<String>();
		List<String> res1 = new ArrayList<String>();
		List<String> query2List = new ArrayList<String>();
		List<String> res2 = new ArrayList<String>();
		for(int i = 0; i < 10; i++){
			query1List.add(Integer.toString(i));
			query2List.add(Integer.toString(i+10));
		}
		Thread t1 = new Thread(new Runnable(){
			@Override
			public void run() {
				query1List.stream().forEach(x -> {
					 res1.add(x + "a");
				});
			}
		});
		Thread t2 = new Thread(new Runnable(){
			@Override
			public void run() {
				query2List.stream().forEach(x ->{
					res2.add(x + "b");
				});
			}
		});
		t1.start();
		t2.start();
		try {
			t1.join();
			t2.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

スレッドを終了させたい場合は Thread#interrupt() で例外を発生させて,それをスレッド内でcatchすることで終了させることができます. Thread#stop() は非推奨なのであまり使わないほうがいいみたいです.
Threadを作る手法はGUIなどを作ったことがあると実装したことがあると思います(たぶん).さらに競合を起こす部分については適宜 synchronized を使用して排他的制御をしてください.

#プロセス並列
これまではマルチスレッドで並列処理することを考えていましたが,外部プロセスを作成して並列で動かすこともできます.下のコードではhoge.tar.gzを解凍しています.ここではwatiForメソッドにより終了を待機していますが,待機せずにそのまま処理をすすめることも可能です.

TestParallel.java
public class TestParallel {
		ProcessBuilder pb = new ProcessBuilder(new String[] {"tar", "zxvf", "hoge.tar.gz"});
		try {
			Process p = pb.start();
			p.waitFor();
		} catch (IOException | InterruptedException e) {
			e.printStackTrace();
		}
}

#まとめ
個人的にJava8の記法がとても簡潔で好きでした.基本的に並列化はデータ並列で競合が起こらないところを並列化すると楽だと思います(当たり前).また,他にもこんな手法あるよって方は教えていただけると幸いです.

36
37
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
37

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?