Java
平行処理
JavaDay 6

JavaのCyclicBarrierを使って平行処理を行う

More than 1 year has passed since last update.


概要

Javaの平行処理ライブラリjava.util.concurrentにCyclicBarrierというクラスがあります。

CyclicBarrierは複数のスレッドに平行処理をさせるとき、同期を取るのに役立ちます。

このCyclicBarrierクラスを使って、実際に平行処理をやってみます。


CyclicBarrierの仕組み

CyclicBarrierは、公式ドキュメントにもある通り、複数のスレッドが特定のポイントまで到達するのを待機できるようにする同期化支援機能です。

複数スレッドが待ち合わせる待機ポイントのことを、バリアーと呼びます。

バリアーに全てのスレッドが到達すると、バリアーアクションと呼ばれる処理を実行できます。

1.JPG

この仕組みは、一つのCyclicBarrierインスタンスによって複数回実行できます。

2.JPG

繰り返し実行できることから、Cyclic(循環式)という名前が付いています。


CyclicBarrierの使い方

1. CyclicBarrierクラスのインスタンスを生成します。このとき、以下をコンストラクタ引数で指定します。


  • 同期を取るスレッドの数

  • バリアーアクションを実装したクラスのインスタンス

CyclicBarrier barrier = new CyclicBarrier(N, new BarrierAction());

2. ワーカースレッドを実行します。ワーカースレッドにはCyclicBarrierインスタンスの参照を持たせます。

ExecutorService service = Executors.newCachedThreadPool();

service.submit(new WorkerThread(barrier));

3. ワーカースレッドは任意のタイミングでCyclicBarrierクラスのawaitメソッドを呼び出し、処理を中断します。


WorkerThreadクラス

    @Override

public void run(){
// before process...
barrier.await();
// after process...
}

4. ワーカースレッドの中断が、コンストラクタで設定したスレッド数に達すると、想定しているワーカースレッドが全て中断している事になります。このとき、CyclicBarrierクラスはバリアーアクションを実行します。


BarrierActionクラス

    @Override

public void run(){
// execute command...
}

5. バリアーアクションの実行が終わると、中断していた全てのワーカースレッドが処理を再開します。

6. 3~5の処理を繰り返します。


サンプルプログラム


概要

ファイルバックアップ機能を作成します。


■ バックアップ対象


  • 売上帳票

  • 在庫帳票

  • クーポン帳票


■ 機能要件


  • バックアップ対象期間を指定できるようにします。

  • 帳票は日付単位で、ひとつのアーカイブファイルに圧縮して保存します。


設計

対象期間の帳票に対して、以下の処理を実行します。

1. 同じ日付の帳票ファイルを、workフォルダにコピーします。

2. workフォルダに集めた同じ日付の帳票ファイルを圧縮し、アーカイブフォルダに保存します。

image.png


■ CyclicBarrierの役割

この機能におけるCyclicBarrierの役割は以下です。

機能   
CyclicBarrierの役割 

ファイルをWORKフォルダにコピー
スレッドの同期

WORKフォルダのファイルの圧縮
共通処理の実行

3.JPG


■ フォルダ構成

フォルダの構成は以下です。

C:

└─test
├─archive(圧縮した帳票の格納先フォルダ)

├─config
│ config.properties

├─dailyReport
│ ├─coupon ← 帳票「クーポン」フォルダ
│ │ coupon_20171001.csv
│ │ coupon_20171002.csv
│ │ coupon_20171003.csv
│ │
│ ├─sales ← 帳票「売上」フォルダ
│ │ sales_20171001.csv
│ │ sales_20171002.csv
│ │ sales_20171003.csv
│ │
│ └─stock ← 帳票「在庫」フォルダ
│ stock_20171001.csv
│ stock_20171002.csv
│ stock_20171003.csv

└─work (帳票ファイルの一時格納先)


実装


■ 管理クラス

サイクリックバリアのインスタンスを生成し、処理をキックします。


管理クラス

package sample;

import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Controller {

public static void main(String[] args){

LocalDate startDate = LocalDate.parse(args[0]);
LocalDate endDate = LocalDate.parse(args[1]);

if(startDate.isAfter(endDate)){
throw new IllegalArgumentException(MessageFormat.format("開始日付が終了日付より後になっています。 開始日付:{0} 終了日付:{1}",startDate,endDate));
}

// バックアップ対象期間を保持するQUEUE
ConcurrentLinkedQueue<LocalDate> dateQueue = Util.createDateQueue(startDate,endDate);

// 処理対象帳票SET
ConcurrentSkipListSet<Path> reportPathSetInWorkDir = new ConcurrentSkipListSet<Path>();

/*
* サイクリックバリアのインスタンスを生成する。
* ・待ち合わせるワーカースレッドの数には帳票の種類の数を指定する。
* ・共通処理を実行するため、バリアーアクションを引数に指定する。
*/

CyclicBarrier barrier = new CyclicBarrier(Report.values().length, new BarrierAction(dateQueue,reportPathSetInWorkDir));

// Executorを使ってワーカースレッドをキックする。
ExecutorService service = Executors.newCachedThreadPool();

for(Report report: Report.values()){
service.submit(new CopyFile(report, dateQueue, reportPathSetInWorkDir, barrier));
}
service.shutdown();
}
}



■ ワーカースレッド

ワーカースレッドはファイルコピーを行います。


ワーカースレッド

package sample;

import static java.lang.System.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

/**
* ファイルコピーを行うワーカースレッド
* @author nakam
*
*/

public class CopyFile implements Runnable {

/** 担当する帳票 */
private Report report;

/** 対象期間 */
private ConcurrentLinkedQueue<LocalDate> dateQueue;

/** WORKフォルダ内の帳票ファイル */
private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

/** CyclicBarrierで同期を取る */
private CyclicBarrier barrier;

public CopyFile(Report report, ConcurrentLinkedQueue<LocalDate> dateQueue,ConcurrentSkipListSet<Path> reportPathSetInWorkDir, CyclicBarrier barrier) {
this.report = report;
this.dateQueue = dateQueue;
this.reportPathSetInWorkDir = reportPathSetInWorkDir;
this.barrier = barrier;
}

@Override
public void run(){

FilePath filePath = new FilePath();

while(!dateQueue.isEmpty()){

try {
Path src = filePath.getReportPath(report.getConfigKey(), dateQueue.peek());
Path dst = filePath.getWorkDirPath().resolve(src.getFileName());

// WORKフォルダに帳票ファイルをコピーする
Files.copy(src, dst);
out.println(MessageFormat.format("ファイルをコピーしました。コピー元:{0} コピー先:{1}",src,dst));
reportPathSetInWorkDir.add(dst);

// サイクリックバリアを使って待機する
barrier.await();
} catch (IOException | InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}



■ バリアーアクション

バリアーアクションは、ファイルの圧縮と保存を行います。


バリアーアクション

package sample;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;

/**
* バリアーアクションとして圧縮処理を行う。
* @author nakam
*
*/

public class BarrierAction implements Runnable {

/** 対象期間 */
private ConcurrentLinkedQueue<LocalDate> dateQueue;

/** WORKフォルダ内の帳票ファイル */
private ConcurrentSkipListSet<Path> reportPathSetInWorkDir;

public BarrierAction(ConcurrentLinkedQueue<LocalDate> dateQueue, ConcurrentSkipListSet<Path> reportPathSetInWorkDir) {
this.dateQueue = dateQueue;
this.reportPathSetInWorkDir = reportPathSetInWorkDir;
}

@Override
public void run() {

if(!dateQueue.isEmpty()){

// WORKフォルダにある帳票ファイルを圧縮し、アーカイブフォルダに保存する
Compress.execute(new ArrayList<Path>(reportPathSetInWorkDir), new FilePath().getArchivePath(dateQueue.poll()));
reportPathSetInWorkDir.clear();
}
}
}



■ その他のクラス

その他、以下のクラスを使っていますが、CyclicBarrierと関係がないので割愛します。

興味がある方は、Githubにソースコードを置いていますので、参照してください。

クラス名
処理内容

Compress
ファイルの圧縮処理

ConfigKey
設定ファイルのキーを保持する

ConfigUtil
設定ファイルをロードする

ExternalCommand
Javaから外部コマンドを実行する

FilePath
ファイルパスを生成する

Report
帳票の種類を保持する

Util
ユーティリティ


テスト


実行

以下の引数を与えて実行します。バックアップ対象期間を(2017/10/01 - 2017/10/03)に指定しています。


Java引数

2017-10-01 2017-10-03



実行結果


コンソール出力

ファイルをコピーしました。コピー元:C:\test\dailyReport\sales\sales_20171001.csv コピー先:C:\test\work\sales_20171001.csv

ファイルをコピーしました。コピー元:C:\test\dailyReport\coupon\coupon_20171001.csv コピー先:C:\test\work\coupon_20171001.csv
ファイルをコピーしました。コピー元:C:\test\dailyReport\stock\stock_20171001.csv コピー先:C:\test\work\stock_20171001.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171001.7z

Items to compress: 3

Files read from disk: 3
Archive size: 238 bytes (1 KiB)

Everything is Ok
外部コマンド「C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171001 C:\test\work\coupon_20171001.csv C:\test\work\sales_20171001.csv C:\test\work\stock_20171001.csv」を実行しました。終了コード:0
ファイルをコピーしました。コピー元:C:\test\dailyReport\sales\sales_20171002.csv コピー先:C:\test\work\sales_20171002.csv
ファイルをコピーしました。コピー元:C:\test\dailyReport\stock\stock_20171002.csv コピー先:C:\test\work\stock_20171002.csv
ファイルをコピーしました。コピー元:C:\test\dailyReport\coupon\coupon_20171002.csv コピー先:C:\test\work\coupon_20171002.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171002.7z

Items to compress: 3

Files read from disk: 3
Archive size: 240 bytes (1 KiB)

Everything is Ok
外部コマンド「C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171002 C:\test\work\coupon_20171002.csv C:\test\work\sales_20171002.csv C:\test\work\stock_20171002.csv」を実行しました。終了コード:0
ファイルをコピーしました。コピー元:C:\test\dailyReport\sales\sales_20171003.csv コピー先:C:\test\work\sales_20171003.csv
ファイルをコピーしました。コピー元:C:\test\dailyReport\stock\stock_20171003.csv コピー先:C:\test\work\stock_20171003.csv
ファイルをコピーしました。コピー元:C:\test\dailyReport\coupon\coupon_20171003.csv コピー先:C:\test\work\coupon_20171003.csv

7-Zip [64] 16.04 : Copyright (c) 1999-2016 Igor Pavlov : 2016-10-04

Scanning the drive:
3 files, 43 bytes (1 KiB)

Creating archive: C:\test\archive\archive_20171003.7z

Items to compress: 3

Files read from disk: 3
Archive size: 239 bytes (1 KiB)

Everything is Ok
外部コマンド「C:\Program Files\7-Zip\7z.exe a -sdel C:\test\archive\archive_20171003 C:\test\work\coupon_20171003.csv C:\test\work\sales_20171003.csv C:\test\work\stock_20171003.csv」を実行しました。終了コード:0


ワーカースレッドがファイルをコピーした後、バリアーアクションが実行されているのが分かります。

処理は3回実行されますが、ファイルコピーだけは平行で実行されているため、コピーが終わる順番が違っているところがあります。

その他の処理は、CyclicBarrierが同期を取っており、逐次的に実行されています。


テスト実行後のフォルダ

C:

└─test
├─archive
│ archive_20171001.7z
│ archive_20171002.7z
│ archive_20171003.7z

├─config
│ config.properties

├─dailyReport
│ ├─coupon
│ │ coupon_20171001.csv
│ │ coupon_20171002.csv
│ │ coupon_20171003.csv
│ │
│ ├─sales
│ │ sales_20171001.csv
│ │ sales_20171002.csv
│ │ sales_20171003.csv
│ │
│ └─stock
│ stock_20171001.csv
│ stock_20171002.csv
│ stock_20171003.csv

└─work


考察

CyclicBarrierは飽くまでも同期化支援機能であるため、CyclicBarrierを使わなくても同じ処理を実装できます。今回実装したレベルのプログラムでは、Thread.joinを使ってワーカースレッドの同期を取っても、問題なく動作するでしょう。しかしその場合は、管理クラスが中央集権的に管理を担い、毎回ひとつのファイルをコピーするだけのワーカースレッドを生成する事になります。

CyclicBarrierを使えば、スレッドを毎回生成することなく、同じスレッドに処理を任せる事ができます。スレッド間の同期はCyclicBarrierが担当してくれるので、ワーカースレッドの生成後、管理クラスは処理を終了して構いません。バリアーアクションは、スレッドが待ち合わせたタイミングで共有リソースを更新するか、共通処理を実行する役割を担います。

■ サンプルプログラムでのバリアーアクション

役割
処理内容

共有リソースの更新
処理対象日付を進める

共有処理の実行
帳票ファイルの圧縮保存


感想

管理クラスがいなくても、CyclicBrrierを仲立ちにして、ワーカースレッドとバリアーアクションが同期を取りながら処理を進めていくのが面白いと思います。


サンプルプログラム格納先

Github


参考

Java SE 8 & JDK 9 クラスCyclicBarrier

テクニカルブログ 第51回 シンクロナイザ編 CyclicBarrier


実行環境


  • java version 9.0.1

  • 7-Zip 16.04