1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【JavaGold】並列処理

Last updated at Posted at 2024-12-28

並列処理

スレッド

プログラムを実行した場合の処理の最小単位のこと。

  • javaでは指定したクラスのmain()メソッドを実行する際、新しくスレッドを生成している。
  • プログラムの実行単位を複数のスレッドに分割して実行することをマルチスレッドと呼ぶ。
  • スレッドの制御にはThreadクラスとRunnableインターフェースを用いる。
    java.lang.Thread
    java.lang.Runnable
  • スレッドの実行は実行可能状態の時にメソッドの呼び出しで行う。Runnable.run()メソッド
    Thread.start()メソッド
  • start()メソッドの中ではオーバーライドしたrun()メソッドを呼び出している
  • Threadクラスを継承したクラスを作り、runメソッドをオーバーライドする
  • Runnableインタフェースを実現したクラスを作り、runメソッドを実装する
実装例
public class Sample {
    public static void main(String[] args) {
        Thread t = new Thread(new Runnable() {
            @Override 
            public void run() {
                System.out.println("A");
            }
        });
        t.start();
        System.out.println("B");
    }
}
実装例
public class Sample {
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            System.out.println("A");
        });
        t.start();
        System.out.println("B");
    }
}
実行結果
A
B

image.png

状態 説明
実行可能 スレッドが生成され、実行の機械が 与えられるのを待っている状態(アイドリング状態)
実行中 ディスクの入力操作や、スレッドの排他制御や同期処理などにより、 スレッドが実行され、処理を行っている状態。
実行不可能 スレッドの動作が一時的に休止している状態。
終了 run()メソッドの処理が終わり、スレッドが終了した状態。

スレッドの優先度

・各スレッドには優先度が設定されている
・実行可能状態のスレッドが競合した場合、高い優先度をもつスレッドから
 実行される
・スレッドの実行はOSに依存する為、必ず優先度順に実行されることが
 保証されていない。

優先度を取得・設定するメソッド
メソッド名 説明
static Thread currentThread() 現在実行中のスレッドオブジェクトを取得。
final String getName() スレッドの名前を返す。
final int getPriority() スレッドの優先度を返す。
final void setPriority(int newPriority) スレッドの優先度を変更する。 1~10まで引数で指定できる。デフォルトは5
優先度を表す定数
定数名 説明
MAX_PRIORITY 最大値。10を表す
NORMPRIORITY デフォルト値。5を表す
MIN_PRIORITY 最小値。1を表す
実装例
public class Main {
    public static void main(String[] args) {
        Thread t = new Thread() {
            @Override 
            public synchronized void start() {
                super.start();
                System.out.println("A");
            }
            @Override 
            public void run() {
                super.run();
                System.out.println("B");
            }
        };
        t.run();
    }
}
実行結果
B

新しいスレッドを作成するのは、Threadクラスのstartメソッド。
startメソッドを呼び出さずに、runメソッドを呼び出しても、新しいスレッドが開始されることはないため、「B」のみが表示される。

スレッドの制御

  • スレッドの制御は、Threadクラスのメソッドとして提供されている
  • sleep(long) 引数ミリ秒呼び出したスレッドが休止する
  • join() 実行中のスレッドが終了するまで待機
  • yield() 呼び出したスレッドを休止し、他のスレッドを実行する
  • interrupt() 休止中のスレッドに割り込みを入れる
  • 割り込まれたスレッドはInterruptedException例外を受け取り、処理を再開する。
実装例
public class Test05 {														
    public static void main(String[] args) {							
        Runnable timertask = ()->{								
            int cnt=0;										
            while(cnt<5) {										
                sleep(1000);    //指定された時間待機						
                System.out.println(++cnt+"s");								
            }															
        };														
        Runnable task1 = ()->{												
            System.out.println(Thread.currentThread().getName() + ":task1開始");
            Thread.yield(); // 他のスレッドを実行					
            sleep(1000);											
            System.out.println(Thread.currentThread().getName() + ":task1終了");
        };								
        Runnable task2 = ()->{							
            System.out.println(Thread.currentThread().getName() + ":task2開始");
            System.out.println(Thread.currentThread().getName() + ":task2終了");
        };																
        Thread thread_0 = new Thread(task1);			
        Thread thread_1 = new Thread(task2);				
        Thread timer = new Thread(timertask);				
        timer.start();				
        thread_0.start();											
        thread_1.start();											
        try {													
            timer.join();   //スレッドが終了するまで待機					
        } catch (InterruptedException e) {							
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "スレッド");
	}
    static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }    
}
実行結果
Thread-0:task1開始
Thread-1:task2開始
Thread-1:task2終了
1s
Thread-0:task1終了
2s	
3s	
4s	
5s
mainスレッド	

マルチスレッド

  • Threadクラスを継承したクラスを作り、runメソッドをオーバーライドする。
  • Runnableインタフェースを実現したクラスを作り、runメソッドを実装する。

排他制御と同期制御

複数のスレッドを運用する際、同じオブジェクトを操作することがあります。
その際の値の取り扱いを制御することです。

排他制御

  • メソッド内の処理が行われている間は、他スレッドの参照を禁止する
  • 競合が発生しないよう排他制御行う場合は、synchronizedキーワードを使用する。
    synchronized void method()
  • method(){ synchronized (this){・・・}
  • 複数のメソッドにまたがる排他制御を実現するには、ReentrantLockクラスを使用。
実装例
import java.util.concurrent.locks.ReentrantLock;

public class Sample {
    private final ReentrantLock lock = new ReentrantLock();
    public void test() throws Exception {
        try {
            lock.lock();
        } finally {
            lock.unlock();
        }
    }
}

同期制御

・実行順序を制御する
・set()が実行されてからprint()を実行したい
・Objectクラスには制御用のメソッドがある
・wait() 現在のスレッドを待機
・notify 待機中のスレッドを1つ再開させる
・notifyAll 待機中のスレッドを全て再開させる

デッドロック

全てのスレッドが待機状態になり、ロックを開放するnotify()メソッドを呼ぶスレッドがいない状態。
排他制御された複数のインスタンスを共有していて、それぞれのインスタンス同士が連携することで発生する待ち状態を意味する。

ライブロック

ライブロックに関係するプロセスは、相手の状態によって、自身の状態も定期的に変化していき、その結果としてお互いに進展しない状態を表している。

並行処理ユーティリティ

並行処理プログラミングにおける様々なユーティリティ機能の実装を目的としたライブラリ群です。

  • 主なパッケージ
    java.util.concurrent パッケージ
    java.util.conccurent.atomic パッケージ
    java.util.concurrent.locks パッケージ
  • 許可されていない操作に対する例外としてConcurrentModificationExceptionをスローします。

並行処理ユーティリティが提供している機能

スレッド・プール

  • スレッドを貯蔵(Pool)する仕組み
  • スレッド生成のシステム負荷(オーバーヘッド)を軽減する

並行コレクション

  • マルチスレッド環境下で安全に使用できるコレクションクラスを提供
  • Collectionsクラスで提供されている、スレッドセーフなオブジェクトを返staticメソッド

アトミック変数

  • ステートの読み込み → 値の変更 → ステートの書き戻しをアトミックに扱う。「完全に終了している」もしくは「未着手である」のいずれかでなければ、値を参照できない

カウンティング・セマフォ

  • 複数のスレッドを管理し、同期や割り込み制御を行う仕組みのこと

アトミック変数

  • java.util.concurrent.atomic にて宣言
  • AtomicBoolean
  • AtomicInteger
    AtomicIntegerのaddAndGetメソッドを使用すれば、synchronizedキーワードを使用せずにフィールドの競合が発生しないようにできる。
  • AtomicLong
  • AtomicReference
実装例
class Sample {
    public static void main(String[] args) {
        AtomicInteger atomicInteger =  new AtomicInteger(0);// 初期化
        int num = atomicInteger.get();// 取得
        AtomicReference<String> atomicReference = 
                                        new AtomicReference<String>("Hello");
        String str = atomicReference.get();
    }
}

並列コレクション

java.util パッケージで提供されるコレクション・クラスの多くは 、シングルスレッド・アプリケーションで使用することを前提に設計。

非スレッドセーフ

  • ArrayList
  • HashMap

スレッドセーフ
スレッドセーフとはアプリケーションをマルチスレッドで動作(複数のスレッドが同時並行的に実行)しても問題がないことを指す。

  • Vector
  • Hashtable
  • 利用状況によってはパフォーマンス劣化
実装例
public class Sample {
    private int num = 0;
    public static void main(String[] args) {
        Sample s = new Sample();
        ExecutorService service
                = Executors.newFixedThreadPool(10);
        for(int i = 0; i < 10; i++;) {
            service.submit(() -> {
                for(int j = 0; j < 10000; j++;) {
                    s.num++;                       // line
                }
            });
        }
        service.shutdown();
    }
}
解答
①
// lineの位置のコードを、次のコードに置き換える
synchronized (s) {
    s.num++;
}

②
numフィールドのデータ型をAtomicIntegerに置き換え、lineの位置コードを次のコードに置き換える
s.num.incrementAndGet();

変数numをスレッドセーフにするには、synchronizedを使って排他制御を行うか、AtomicIntegerなどのマルチスレッドに対応可能なプリミティブ型のラッパークラスを利用。

  • synchronizedはインスタンスをロックしてほかのスレッドからのアクセスを排除する。

java.util.concurrent パッケージが提供する並行コレクション

  • 同時並行正とパフォーマンスを最適化
  • BlockingQueue
  • ConcurrentHashMap
  • ConcurrentLinkedDeque
  • CopyOnWriteArrayList など

CopyOnWriteArrayList

  • add、set、remove などが呼び出されると内部で要素を保持している配列のコピーを作成
  • 変更処理中に他のスレッドが読み取りを行う場合、コピー前(イテレータ作成時点)のリストが返却される
  • 一方のスレッドがコレクションから要素を読み出している最中に、もう一方が変更を加えても例外が発生しないコレクション。
    java.util.ConcurrentModificationException がスローされることはない

java.util.concurrent.CyclicBarrier

  • 各スレッドの足並みを合わせるための機能を提供するクラス
  • 複数のスレッドの終了を待って処理をする同期化処理を実現。

CyclicBarrierは以下のように定義されている。

実装例
public CyclicBarrier(int parties, Runnable barrierAction)
引数について
  • parties:バリアーで待機するスレッドの数(到達する必要があるスレッドの数)を指定する。
  • barrierAction:全てのスレッドがバリアーに到達した時に実行されるタスク(Runnable)を指定する。この引数はオプションであり、必要な場合はnullを指定することもできる。
使い方
  1. CyclicBarrierオブジェクトを作成し、待機するスレッドの数を指定する。
  2. 各スレッドはawait()メソッドを呼び出して処理を中断し、バリアーで待機する。
  3. 指定された数のスレッドがawait()メソッドを呼び出すと、全てのスレッドが解放され、処理が続行される。
  4. バリアーに到達するたび、CyclicBarrierの初期化時に指定したタスク(オプション)が実行されることもある。
実装例
import java.util.concurrent.*;

public class CyclicBarrierExample {

    public static void main(String[] args) throws InterruptedException {
        final int THREADS = 3;
        CyclicBarrier barrier = new CyclicBarrier(THREADS, () -> {
            System.out.println("All threads have reached the barrier!");
        });

        Runnable task = () -> {
            try {
                System.out.println("Thread is waiting at the barrier");
                barrier.await();
                System.out.println("Thread has passed the barrier");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
        executorService.submit(task);
        executorService.submit(task);
        executorService.submit(task);

        executorService.shutdown();
    }
}
実行結果
Thread is waiting at the barrier
Thread is waiting at the barrier
Thread is waiting at the barrier
All threads have reached the barrier!
Thread has passed the barrier
Thread has passed the barrier
Thread has passed the barrier

上記の例では、3つのスレッドがCyclicBarrierのバリアーで待機する。CyclicBarrierのコンストラクタで待機するスレッドの数を指定し、Runnableを指定してバリアポイントに到達した時、実行されるタスクを定義している。

この例では、3つのスレッドがバリアーで待機し、全てのスレッドがバリアーに到達すると、指定したタスクが実行される。その後、各スレッドがバリアーを通過し、処理が続行される。

Excecutorフレームワーク

従来のThreadの問題点

  • Threadは1度しか利用できない為、2回目以降はリソースを生成するしかなかった。(2回目以降の実行はIllegalThreadStateException)
  • タスクの実行者とタスクの実行方法の両方が一括りに実装されていた。
  • 開発者がタスクの実行方法を詳細に制御できなかった。
    Excecutorフレームワークはこれらの問題点を解決した
  • タスクの実行者とタスクの実行方法を分離させる。
  • 2回目以降も同じリソースで実行することができる(submit() execute())

Excecutorフレームワークの主な機能

・スレッド・プールの実装を提供
・タスクの実行におけるライフサイクル管理や監視、統計収集などの機能も提供
・タスクのスケジューリング(遅延開始と周期的実行)を実現

Excecutorインターフェース

  • ExecutorService
  • タスクの終了管理
  • タスクの進行状況を管理
  • ScheduledExecutorService
  • ExecutorService を継承
  • 上記に加え、タスクのスケジュール管理

Executorオブジェクト

Executorオブジェクトを生成するにはExecutorsクラスのstaticメソッドから生成します。

主なメソッド

  • newSingleThreadExecutor()メソッド
  • newFixedThreadPool()メソッド
  • newCachedThreadPool()メソッド
  • newScheduledThreadPool()メソッド

newSingleThreadExecutorメソッド

単一のスレッドプールを作成する。

実装例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // タスクを実行
        executorService.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
        });

        executorService.execute(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
        });

        executorService.execute(() -> {
            System.out.println("Task 3 executed by thread: " + Thread.currentThread().getName());
        });

        // ExecutorServiceを終了する
        executorService.shutdown();
    }
}
実行結果
Task 1 executed by thread: pool-1-thread-1
Task 2 executed by thread: pool-1-thread-1
Task 3 executed by thread: pool-1-thread-1

上記の例では、newSingleThreadExecutor()メソッドを使用してExecutorServiceのインスタンスを作成している。このスレッドプールはシングルスレッドで動作し、タスクの実行を直列化する。

execute()メソッドを使用してタスクを送信し、非同期に実行する。各タスクはスレッドプール内の単一のスレッドによって順番に実行される。

thread-Xはスレッドの連番を示している。
タスクはスレッドプール内の単一のスレッドによって直列に実行されていることがわかる。

newFixedThreadPoolメソッド

指定された数のスレッドを持つ固定サイズのスレッドプールを作成するための静的メソッドである。

実装例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {

    public static void main(String[] args) {
        int nThreads = 3;
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

        // タスクを実行
        executorService.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
        });

        executorService.execute(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
        });

        executorService.execute(() -> {
            System.out.println("Task 3 executed by thread: " + Thread.currentThread().getName());
        });

        // ExecutorServiceを終了する
        executorService.shutdown();
    }
}
実行結果
Task 1 executed by thread: pool-1-thread-1
Task 2 executed by thread: pool-1-thread-2
Task 3 executed by thread: pool-1-thread-3

nThreadsパラメータは、スレッドプール内のスレッド数を示す。このスレッドプールは3つのスレッドを持ち、固定サイズのスレッドプールとなる。

スレッドプール内のスレッドは再利用され、必要に応じて新しいタスクを処理する。

pool-1はスレッドプールの識別子を示している。thread-Xを見ると、タスクはスレッドプール内のスレッドによって並列に実行されていることがわかる。

newCachedThreadPoolメソッド

可変サイズのスレッドプールを作成するための静的メソッドである。
このスレッドプールは可変であり、必要に応じてスレッドを動的に作成および再利用する。
スレッドプール内のスレッドは自動的に管理され、一定時間が経過すると自動的に破棄される。

実装例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        // タスクを実行
        executorService.execute(() -> {
            System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
        });

        executorService.execute(() -> {
            System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
        });

        executorService.execute(() -> {
            System.out.println("Task 3 executed by thread: " + Thread.currentThread().getName());
        });

        // ExecutorServiceを終了する
        executorService.shutdown();
    }
}
実行結果
Task 1 executed by thread: pool-1-thread-1
Task 2 executed by thread: pool-1-thread-2
Task 3 executed by thread: pool-1-thread-3

newScheduledThreadPoolメソッド

複数のスレッドをプールしながら効率よくマルチスレッドによる処理を実行。

実装例
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class newScheduledThreadPoolExample {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

        // タスクを実行
        executorService.scheduleWithFixedDelay(() -> {
            System.out.println("A");
        }, 1, 1, TimeUnit.SECONDS);

        executorService.scheduleWithFixedDelay(() -> {
            System.out.println("B");
        }, 1, 1, TimeUnit.SECONDS);

        Thread.sleep(10000);
        // ExecutorServiceを終了する
        executorService.shutdown();
    }
}
実行結果
ABABABABABABABABAB

java.util.concurrent.ExecutorServiceインタフェース

<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
実行用のタスクを送信して、そのタスクを表すFutureを返す。
上記の違いは戻り値の有無

java.util.concurrent.ScheduledExecutorServiceインタフェース

ExecutorServiceを拡張しており、メソッドを定期的に実行するためのインタフェースである。

主なメソッド

  • schedule(Runnable task, long delay, TimeUnit unit)
    指定された遅延時間の後にタスクを実行する。
実装例
exec.schedule(() -> {
    System.out.println("A");
}, 1, TimeUnit.SECONDS);
  • schedule(Callable<V> task, long delay, TimeUnit unit)
    指定された遅延時間の後にタスクを実行し、結果を返す。

  • scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)
    指定された初期遅延時間の後にタスクを実行し、一定の間隔で繰り返し実行する。

  • scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
    マルチスレッドで、指定された初期遅延時間の後にタスクを実行し、前のタスクの終了から指定された遅延時間が経過するごとに繰り返し実行する。

これらのメソッドを使用することで、タスクを遅延実行したり、定期的に繰り返し実行したりすることができる。TimeUnit列挙型を使用して遅延時間や間隔の単位を指定する。

その他のメソッド

  • newSingleThreadScheduledExecutor():単一のスレッドを持つスケジュール可能なスレッドプールを作成する

  • newScheduledThreadPool(int corePoolSize)
    指定された数のスレッドを持つスケジュール可能なスレッドプールを作成する。

TimeUnit

時間単位を表すための列挙型(Enum)である。
Javaのjava.util.concurrent.TimeUnitパッケージに含まれており、時間の経過を表すために使用される。

主な定数

  • NANOSECONDS:ナノ秒(1秒の10億分の1)
  • MICROSECONDS:マイクロ秒(1秒の100万分の1)
  • MILLISECONDS:ミリ秒(1秒の1000分の1)
  • SECONDS:秒
  • MINUTES:分
  • HOURS:時間
  • DAYS:日
    これらの定数は、ScheduledExecutorServiceなどのタイムアウトや遅延時間の指定など、時間に関連する処理で使用される。

例えば、schedule()メソッドやscheduleAtFixedRate()メソッドにおいて、遅延時間や間隔の単位を指定する際にTimeUnitを使用する。

実装例
import java.util.concurrent.TimeUnit;

public class TimeUnitExample {

    public static void main(String[] args) throws InterruptedException {
        long delay = 500; // 500ミリ秒

        // TimeUnitを使用して遅延時間を指定
        TimeUnit.MILLISECONDS.sleep(delay);

        System.out.println("Delay of " + delay + " milliseconds has passed.");
    }
}

上記の例では、TimeUnit.MILLISECONDSを使用して500ミリ秒の遅延時間を指定している。sleep()メソッドを呼び出すことで、指定した時間だけスレッドを一時停止させている。

java.util.concurrent.Future

  • get()メソッドはnullを受け取る。
  • null以外を指定した場合は、submit()メソッドの第2引数を指定する。
実装例
// 第2引数に0を指定している。
Future<Integer> future = exec.submit(() -> {
    try {
        // do something
    } catch(InterruptedException) {
        throw new RuntimeException(e);
    }
}, 0);
実行結果
0

java.util.concurrent.Callable

  • 処理を結果を戻したり、必要に応じて例外をスローしたりするマルチスレッドのタスクを定義するためのインタフェース。
  • Callableインタフェースには、callメソッドがあり、Booleanで返す。
  • 例外は、ExecutionExceptionを用意する。
  • 任意の戻り値を戻せる。

java.lang.Runnable

戻り値を戻せない

実装例
Runnable r = () -> {System.out.println("hello");};
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?