LoginSignup
4
5

Java 非同期処理

Last updated at Posted at 2024-01-13

Javaの非同期処理

Javaにおいてマルチスレッドとは、一つのプロセス内で同時に実行される複数のスレッドを指す。

プロセスとは、実行中のプログラムのインスタンスを指す。プロセスはOSによって管理されるプログラムの実行単位である。

スレッドもまた、プログラムの実行単位を指す。一つのプロセス内には複数のスレッドが存在することができるため、プロセスが親、スレッドは子という関係にある。

Javaは、一つのプロセス内で動作していて、そのプロセス内では複数のスレッドが活用されることで、より効率的に処理が実行されるようになっている。

コンピュータ上で複数のプロセスが起動されている場合、各プロセスは独立したそれぞれのメモリ領域を持ち、通常プロセス同士はそれぞれ隔離された環境を持っている。プロセス内での資源の割り当てやスケジューリングは、OSによって管理されていて、異なるプロセス間ではメモリ領域が共有されない

「資源の割り当て」とは具体的に、ヒープ、スタックなどのメモリ領域の割り当て(メモリ上のどの番地からどの番地を使うか)や、CPUの使用権利の割り当て、ファイルや入出力デバイスの割り当てなどを指す。異なるプロセス間で情報をやり取りするためには、IPC(Inter Process Communication)やネットワーク通信(ソケット、パイプ)などの特殊な技術を使用する必要がある。

Javaは一つのプロセス内で動作していて、プロセスに割り当てられたメモリ領域を、プロセス内の各スレッドが共有するため、複数のスレッドが同時に同じメモリ領域にアクセスする競合を起こすことがある。

競合を避けるためには、開発者がスレッドセーフなプログラミングを行う必要がある。

スレッド

スレッドとは、プログラムの実行単位であり、一つのプロセス内で動作する独立した処理フローのことを指す。同じプロセス内で、複数存在することができる。各スレッドは個別の実行コンテキストを持ち、プロセス内で共有されるメモリ領域へアクセスを行う。

プロセスがOSによって管理されるのと同様に、スレッドもまた、OSによって管理される。具体的には、各スレッドに対してCPUの使用権の割り当て、つまり実行順の管理がOSによって行われる。

Javaでは、スレッドが起動されると、各スレッドに「スタック」と呼ばれるメモリ領域が割り当てられる。スタックとは、スレッドがメソッドを実行した時に、そのメソッド内のローカル変数、ステータス情報などのコンテキストが保持される領域のことを指す。メソッドの実行が終了すると、スタックに積まれたコンテキストは取り除かれる。

プログラムの実行中は、メソッドの実行によってコンテキストが積まれ、またメソッド内から別のメソッドが実行されることで、その上にさらに新たなコンテキストが積まれる。反対に、メソッドが終了するとスタックの一番上に積まれたコンテキストは除去され、これらが繰り返されることで、スレッド(Thread、糸)が形成される。

つまりスレッドとは、物理的な何かを指すものではなく、プログラムの処理の流れを抽象化したものと言うことができる。

(言語は異なりますが、スレッドの概念は下記の記事でもまとめています。異なるプログラミング言語においては、実行モデルが異なることがあるため、ご注意ください。)

Main.java
public class Main {
    public static void main(String[] args) {
        fn();
    }

    public static void fn() {
        System.out.println("hello world");
    }
}

Thread_2.gif

マルチスレッドと並列処理

プログラムには並行処理並列処理という概念がある。(詳細は下記の記事にまとめています)

簡単に区別すると、実行中の処理を高速に切り替えることで複数の処理が同時に実行されているように見せるのが並行処理で、実際に複数の処理を物理的に同時実行しているのが並列処理。

物理的な並列処理は、マルチコアプロセッサなどの複数のコアを持つハードウェア上でプログラムを実行することによって実現される。複数のコアに演算処理を分散させ、同時に実行させることで、全体としてプログラムの処理が高速に実行される。

プログラムが、シングルコアのプロセッサ上で動作している場合や、スレッドが競合状態(複数のスレッドが同時にデータにアクセスしようとする状態)に陥っている場合などは、物理的な同時性は実現しない。

マルチコアプロセッサとは、複数のコアを持つCPUのことを指す。コアは、算術論理演算(加算、減算、乗算、除算など)や論理演算(AND、OR、NOTなど)などの基本的な演算を実行する能力を持っている。シングルコアプロセッサは、コアが一つしかないCPU。

マルチスレッド環境で実行されるプログラムは、実際には、必ずしも複数のスレッドが物理的に同時に動いているとは限らない。人間から見ると同時に実行されているように見えても、実際には複数のスレッドが高速に一つのCPUを譲り合って並行処理を進めているだけかもしれない。

並行処理におけるCPUの割り当てはスレッドスケジューラによって行われ、処理中のスレッドはアクティブスレッドとも呼ばれる。スレッドスケジューラとは、実行可能なスレッドの優先度や状態に基づいて、どのスレッドを次に実行するかを決定している。

スレッドスケジューラは、複数のスレッドを物理的に並列処理するためのものではなく、複数のスレッドを効果的に管理し、複数のスレッドの並行処理を効率的に実行するための仕組みである

つまり、物理的な同時性はハードウェアが提供するものであり、マルチスレッドによるプログラミングが提供できるものではない。

スレッドを生成する

Javaのプログラムは通常、JVMによって自動的に起動されたメインスレッド上で実行される。そのため、開発者は「メインスレッドを起動するためのプログラム」を書かない。

一方で、メインスレッド以外のスレッドは、明示的に生成を行わない限り存在しない(ライブラリやフレームワークが提供する非同期処理や、ガベージコレクションなどのJVMによって自動的に生成されるスレッドは存在する)。

スレッドを明示的に生成しない場合のプログラムの処理は、すべてメインスレッド上で実行される。

スレッドを生成する方法には以下の方法がある。

  • Threadクラスを継承する(古い)
  • Runnableインターフェースを実装する(新しい)

実際には後述するCompletableFutureを使用した方が書きやすく便利。ただ、スレッドの原理や仕組みを理解するためにはThreadRunnableの理解も重要。

Threadクラスを継承する

Threadクラスは、Javaの標準ライブラリ(java.langパッケージ)に含まれている。Threadクラスを利用する場合、Threadクラスを継承して、非同期的に実行したい処理をオーバーライドしたrun()メソッド内に記述する必要がある。

MyThread.java
public class MyThread extends Thread {
    @Override
    public void run(){
        System.out.println("非同期的に処理を実行します。");
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        // スレッドを生成
        var thread = new MyThread();

        // 生成したスレッド上で、処理を実行
        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

この時、スレッドは以下のような様子になっている。

Thread_new.png

JavaScriptの非同期処理との違い

JavaScriptはJavaのマルチスレッドと異なり、単一スレッド(シングルスレッド)上で実行され、非同期処理はイベントループと呼ばれる実行モデルによって実現される。

イベントループにおける非同期処理は、コールバック関数としてコールバックキューに格納され、「スタックが空になった」という条件時にスタックに積まれ、実行される。

Thread_javascript.png

一方のJavaは、マルチスレッドのプログラミングが可能となっていて、JVMのコンポーネントであるスレッドスケジューラが異なるスレッド間で実行中のスレッド(アクティブスレッド)を切り替える制御を行っている。

Thread_java.png

そのため、異なるスレッドで実行される処理は、どのような順序で実行されるかがJVMの実装に依存していて、予測することはできない。(開発者がこの制御に影響を与えることはできるが、強制的に特定のスレッドをアクティブスレッドとすることはできない。)マルチスレッドのプログラミングで排他制御が重要な理由はこういった理由によるものである。

スレッドスケジューラ

Javaのマルチスレッド環境における実行中のスレッド(アクティブスレッド)は、JVMのコンポーネントであるスレッドスケジューラが制御している。

正確には、OSのスレッドを、JVMがJavaのスレッドに関連づけることで、JVMによる間接的な制御が行われている。スケジュール機能はOSに依存するが、JVMがOSごとの差異を吸収することで、プラットフォームに依存しない仕組みとなっている。

スレッドには「実行中」、「実行可能」、「ブロック中」の3つの状態が存在し、スレッドの生成直後の状態は実行可能状態であり、生成してすぐに実行中になるわけではない

実行可能状態のスレッドはスレッドスケジューラによる判断や決定によって、実行中へと昇格する。スレッドが実行中になると、そのスレッドのスタック(コンテキスト)が有効になる。

Threadクラスのsleep()wait()が実行されると、スレッドはブロック状態になる。また、synchronizedキーワードによって、アクセスしたオブジェクトがロックされていた場合にも、スレッドはブロック状態になる。つまりブロック状態とは、スレッドが実行を一時的に停止した状態を指す。これらの制御がスレッドスケジューラによって行われている。

またコンピュータのコアが1つであれば、同時に二つのスレッドが実行中になることはない。

スレッドスケジューラはJVMのコンポーネントであり、JVMの実装に依存しているため、開発者が直接制御を行うことはできない。そのため、同じマシン上で同じプログラムを動かしたとしても、スレッドスケジューラが全く同じ挙動をすることは保証されない。

マルチスレッド環境においてはこのことを前提として開発を行う必要がある。

Runnableインターフェースを実装する

Runnableインターフェースも、Javaの標準ライブラリであるjava.langパッケージに含まれている。Runnableでは、run()メソッドが定義されている。

Threadクラスを継承した時と形がよく似ているが、今回はThreadを直接インスタンス化する。

MyRunner.java
public class MyRunner implements Runnable{
    @Override
    public void run() {
        System.out.println("非同期的に処理を実行します。");
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        // スレッドを生成
        var thread = new Thread(new MyRunner());

        // 生成したスレッド上で、処理を実行
        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

匿名クラスを使って記述する

スレッド生成のコードは、匿名クラスを使うことで簡潔に記述できるようになる。

匿名クラスとは

匿名クラスとは、名前のないクラスのインスタンスを生成するための特殊な構文のこと。

普通、Javaのクラスは名前とともに宣言され、宣言された名前とnewキーワードを使ってインスタンス化が行われる。

new クラス名()

匿名クラスは宣言と同時にインスタンスが生成される。

new インターフェース名() {
    ここがクラス宣言部分
    インターフェースの実装をここで行う
}

通常インターフェースや抽象クラスを実装する場合、実装用のクラスを宣言する必要がある。ただ、1度しかインスタンス化しないクラスを、わざわざ宣言したくない時などに匿名クラスが有効な手段になる。

MyInterface.java
public interface MyInterface {
    void method();
}
本来はインターフェースを実装するためのクラスを宣言する必要がある(クリックで展開する)
MyClass.java
// 匿名クラスを使用することでこのクラス宣言が不要になる
public class MyClass implements MyInterface {
    @Override
    public void method() {
        System.out.println("処理を実行します。");
    }
}
Main.java
public class Main {
    public static void main(String[] args){
        // インターフェースの名前を利用して、
        // インターフェースを実装した、匿名クラスのインスタンスを生成
        var instance = new MyInterface(){
            @Override
            public void method(){
                System.out.println("処理を実行します。");
            }
        };

        // 実装した処理を呼び出す
        instance.method();
        // >> 処理を実行します。
    }
}

匿名クラスにはコードが簡潔になるというメリットがある一方で、コンストラクタを持つことができない、継承関係になれないなどの制約がある。

匿名クラスを使って記述する

匿名クラスを利用すると、先ほどのコードは以下のようになる。宣言する必要のあるクラスがそれぞれ一つ減っていることがわかる。

匿名クラスはインターフェースに使用した場合にはインターフェースを「実装(implements)」したクラスになり、通常のクラスに使用した場合にはそのクラスをスーパークラスとして「継承(extends)」したサブクラスになる。

Threadクラスに対して使用したときは継承(extends)した匿名クラスになり、Runnnableインターフェースに対して使用したときは実装(implements)した匿名クラスになる。

匿名クラスを使ってThreadクラスを継承する
public class Main {
    public static void main(String[] args) {
        // 匿名クラスを使ってスレッドを生成
        var thread = new Thread(){
            @Override
            public void run() {
                System.out.println("非同期的に処理を実行します。");
            }
        };

        thread.start();
        // >> 非同期的に処理を実行します。
    }
}
匿名クラスを使ってRunnableインターフェースを実装する
public class Main {
    public static void main(String[] args) {
        // 匿名クラスを使ってスレッドを生成
        var thread = new Thread(new Runnable(){
            @Override
            public void run() {
                System.out.println("非同期的に処理を実行します。");
            }
        });

        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

ラムダ式を使って記述する

Runnableインターフェースは関数型インターフェースでもあるため、ラムダ式を利用することでさらに簡潔にできる。

ラムダ式とは

Java8から導入された、メソッドを引数として別のメソッドに渡すための構文。

ラムダ式を理解するには、関数型インターフェースについて理解する必要がある。

関数型インターフェースとは

関数型インターフェースは、抽象メソッドを一つだけ持つという条件を満たすインターフェースのこと。SAM(Single Abstract Method)と呼ばれることもある。

関数型インターフェースは、メソッドを格納するための型として機能する

メソッドの引数、戻り値の有無、戻り値の型の組み合わせがメソッドの型としての表現になる。

標準ライブラリのjava.util.functionパッケージには、引数、戻り値のそれぞれ異なるFunctionPredicateConsumerといった関数型インターフェースがあらかじめ用意されているため、自分で関数型インターフェースを宣言して作成する機会はあまりないかも。

java.util.functionパッケージの関数型インターフェース
関数型インターフェース 定義
Cunsumer<T> void accept(T)
Supplier<T> T get()
Predicate<T> boolean test(T)
Function<T, R> R apply(T)

また通常、インターフェースにはdefaultメソッドや、Objectクラスのメソッド(toString()equals()など)をオーバーライドしたメソッドも宣言できるため、これらが含まれるインターフェースでは、関数型インターフェースかどうかがわかりづらいことがある。そのような場合には、@FunctioalInterfaceアノテーションを使用すると、仮に関数型インターフェースの条件を満たさない宣言をしてしまった場合でも、コンパイルエラーによって知らせてくれるようになる。

関数型インターフェース
// 関数型インターフェース
@FunctionalInterface
public interface MyInterface {
    void method();
}	

// これも関数型インターフェース
@FunctionalInterface
public interface MyInterface2 {
    void method();

    default void defaultMethod() {
        System.out.println("デフォルトの処理です。");
    }
    
    String toString();
}

// 抽象メソッドが2つあるため、コンパイルエラーが起きる
@FunctionalInterface
public interface MyInterface {
    void method1();

    void method2();
}

基本構文

ラムダ式は、匿名の関数を定義するための構文。メソッドの定義を「式」によって表現することができる。メソッド実行時の実引数として利用することができる。メソッドを定義しながら同時に、引数として別のメソッドに渡すことができる。

() -> {
  ---処理内容---
}

ラムダ式は、関数型インターフェースの実装として利用される

関数型インターフェースは実装すべき抽象メソッドを一つしか持たないため、ラムダ式を使用して実装する際には、実装するメソッド名を記述する必要がない。

(個人的に、「ラムダ式は、関数型インターフェースの実装として利用される」の意味が、最初全く理解できなかった。コールバック関数を代入した変数をなぜ()で実行しないのかがわからなかったが、ここでのラムダ式は、コールバック関数としてのラムダ式ではないと理解したことで納得できた。「=」が何かを代入しているように見えるのでわかりづらいが、ラムダ式はコールバック関数としても利用できるし、関数型インターフェースの実装としても利用できる。「=」を「代入」と捉えると頭が混乱してしまうが、匿名メソッドを「定義」している、と捉えることで納得できた。「コールバック関数を代入している」のではなく、あくまでも、「インターフェースでオーバーライドすべきメソッドを実装している」だけ。つまり、ラムダ式は、関数型インターフェースの実装として利用される。)

MyInterface.java
// 関数型インターフェース
@FunctionalInterface
public interface MyInterface {
    void method();
}
Main.java
public class Main {
    public static void main(String[] args) {
        // ラムダ式が関数型インターフェースを実装する
        MyInterface instance = () -> {
            System.out.println("hello world");
        };

        instance.method();
        // >> hello world
    }
}

{}return は省略できる

ラムダ式の処理が1行で表現できる場合、{}ブロックとreturnキーワードは省略することができる。

MyInterface instance = () -> System.out.println("hello world");

引数ありの構文

(引数の型 仮引数) -> {
  ---処理内容---
}
MyInterface.java
@FunctionalInterface
public interface MyInterface {
    void method(int i);
}
Main.java
public class Main {
    public static void main(String[] args) {
        // 引数ありのラムダ式
        MyInterface instance = (int index) -> System.out.println(index + " hello world");

        instance.method(100);
        // >> 100 hello world
    }
}

引数の型は省略できる

ラムダ式の引数の型は、暗黙的に推論されるため、省略できる。

MyInterface instance = (index) -> System.out.println(index + " hello world");

引数が一つの場合、()が省略できる。

MyInterface instance = index -> System.out.println(index + " hello world");

ただし、引数がない場合の()は省略できない。

メソッド参照

メソッド参照とはラムダ式を簡潔にするための手法、構文のことを指す。

メソッドへの参照を表現することができ、既存のメソッドを変数に代入したり、別のメソッドへの引数として渡すことができる。

主に関数型インターフェースの実装として利用される。3種類の構文がある。

クラス名::静的メソッド名
インスタンス変数::メソッド名
クラス名::new
MyInterface.java
@FunctionalInterface
public interface MyInterface {
    void method();
}
MyClass.java
public class MyClass {
    // 静的メソッド
    public static void staticMethod() {
        System.out.println("静的メソッドです。");
    }

    // インスタンスメソッド
    public void instanceMethod() {
        System.out.println("インスタンスメソッドです。");
    }

    // コンストラクタ
    public MyClass() {
        System.out.println("コンストラクタです。");
    }
}

下の3つは全てインターフェースMyInterfaceを、ラムダ式を簡略化したメソッド参照によって実装している。

Main.java
public class Main {
    public static void main(String[] args) {

        // クラス名::静的メソッド名
        // ラムダ式によってここで実装を行うのではなく、既存のメソッドをインターフェースの実装として利用する
        MyInterface staticMethodReference = MyClass::staticMethod;
        staticMethodReference.method();
        // >> 静的メソッドです。

        // インスタンス変数::メソッド名
        var myClass = new MyClass();
        MyInterface instanceMethodReference = myClass::instanceMethod;
        instanceMethodReference.method();
        // >> コンストラクタです。
        // >> インスタンスメソッドです。

        // クラス名::new
        MyInterface constructorReference = MyClass::new;
        constructorReference.method();
        // >> コンストラクタです。
    }
}

defaultメソッドとは

Java8で導入された修飾子。インターフェース内でdefault修飾子をつけて宣言したメソッドは、実装するクラスでオーバーライドしなくても良い。

MyInterface.java
public interface MyInterface {
    // defaultメソッド
    default void method(){
        System.out.println("デフォルトの処理です。");
    }
}
MyClass.java
public class MyClass implements MyInterface {
    // defaultメソッドはオーバーライドしなくてもOK(もちろん、オーバーライドしてもOK)
}
Main.java
public class Main {
    public static void main(String[] args) {
        var instance = new MyClass();
        instance.method();
        // >> デフォルトの処理です。
    }
}

さらに、defaultメソッドはsuperキーワードを使用することで明示的に呼び出すこともできる。

MyInterface,java
public interface MyInterface {
    // defaultメソッド
    default void method(){
        System.out.println("デフォルトの処理です。");
    }
}
MyClass.java
public class MyClass implements MyInterface {
    @Override
    public void method(){
        // defaultメソッドを明示的に呼び出すこともできる
        MyInterface.super.method();
        
        System.out.println("実装クラスの処理です。");
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        var instance = new MyClass();
        instance.method();
        // >> デフォルトの処理です。
        // >> 実装クラスの処理です。
    }
}

ラムダ式を使ってRunnableを実装する

Runnableインターフェースは関数型インターフェースでもあるため、ラムダ式を利用することができる。

ラムダ式を使ってRunnableインターフェースを実装する
public class Main {
    public static void main(String[] args) {
        // ラムダ式を使ってスレッドを生成
        var thread = new Thread(() -> System.out.println("非同期的に処理を実行します。"));

        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

排他制御

スレッドは一つのプロセス内で複数存在し、プロセスに割り当てられたメモリ領域を共有するため、複数のスレッドから同時にアクセスがされて競合が起きないように排他制御を行う必要がある。

具体的に「競合」とは、複数のスレッドが同じフィールドにアクセスし、それぞれのスレッド内で値を同時に更新しようとするような状況を指す。

デッドロック

デッドロックとは、以下の状態に陥ることを指す。

  • A、Bという2つのリソースが存在する
  • スレッドXは、A → Bという順番でリソースに対するロックを獲得する
  • スレッドYは、B → Aという順番でリソースに対するロックを獲得する

この時スレッドXがAのロックを獲得した直後に、スレッドYがBに対するロックを獲得したとする。

この状況に陥ると、スレッドXとスレッドYがお互いのロック解除を永遠に待つことになる。この状態をデッドロックという。

synchronized

synchronizedキーワードを使用することによって、目的のフィールドに対するアクセスを特定のスレッドのみにすることができる。

複数のスレッドが同時にアクセスしてはいけない共有データに対する処理範囲をクリティカルセクションという。

synchronizedブロック
synchronized(ロック対象のオブジェクト) {
    クリティカルセクションロックされた領域での処理
}

Javaでは慣例的に、ロック対象のオブジェクトの変数名をlockにすることが多い。

MyClass lock = new MyClass();
synchronized(lock) {
    // クリティカルセクション
}

synchronizedはメソッドに対しても、修飾子として使用することができる。

synchronized修飾子
synchronized void メソッド名() {
    クリティカルセクションロックされた領域での処理
}

ただし、メソッドの修飾子として使用した場合、異なるインスタンスのメソッド同士の排他制御は行われないため注意(インスタンスAとインスタンスBのメソッド同士は排他制御されない)。

synchronizedはインスタンスの内部での競合を防ぐ

排他制御は競合を防ぐ一方で、クリティカルセクションをむやみに広げるとパフォーマンスに悪影響を与える。そのため、安全性とパフォーマンスのバランスを考えながら使用する必要がある。

またsynchronizedと似たものにvolatile修飾子というのもある。

volatile

volatileは「揮発性」を意味し、値が変化しやすいことをコンパイラに通知する修飾子。

volatileで宣言されたフィールドは、コンパイラによって最適化されず、常にメインメモリからアクセスされることが保証される。

メモリの種類

メモリには以下の種類があり、それぞれアクセス速度と容量が異なる。

  • メインメモリ : 補助記憶装置からプログラムが展開される主要なメモリ
  • キャッシュメモリ : メインメモリよりも高速でCPUに近い位置にある
  • レジスタ : 最も高速なメモリでCPU内部にある

アクセス速度順に並べると、

遅い < メインメモリ < キャッシュメモリ < レジスタ < 速い

容量はその逆。

小さい < レジスタ < キャッシュメモリ < メインメモリ < 大きい

という関係になっている。

コンパイル

Javaのプログラムは、実行されるまでに2回のコンパイルを経ることがある。

事前コンパイル

ソースコード(Main.java)から中間バイトコード(Main.class)への静的コンパイルを指す。

「実行前に」 javac によって行われる。

プラットフォーム(OS)に依存しない中間コードが生成される。

実行時コンパイル

JVM によって「実行時に」行われる動的コンパイル。

事前コンパイルされら中間コードの一部が機械度に変換される。

パフォーマンス向上を目的として行われる。

JVM

JVMのJava実行エンジンには インタプリタモードと、 JITコンパイラモード (JIT = Just-In-Time)がある。

インタプリタモード

中間コードを1行づつ解釈して実行する。

JITコンパイラモード

中間コードの一部を、CPUが直接解釈できる機械語に変換することで実行速度が速くなる。

最適化

JITコンパイラは、頻繁に使用されるコードをキャッシュメモリを利用して最適化 を行う。

通常、フィールドはメインメモリ上に展開されて、プログラム内で更新されることがある一方で、プログラム内で頻繁に利用される一部のフィールドなどは、最適化によってキャッシュメモリ上に展開される。

キャッシュされたフィールドは、メインメモリ上のフィールドと同期されないため、最新の値が反映されず、複数スレッドが同一のフィールドを更新したときに矛盾が生じることがある。

volatileの効果

複数のスレどから更新される可能性のあるフィールドに対してvolatile修飾子を使用して宣言しておくと、そのフィールドが他のスレッドによって変更される可能性があることをコンパイラに対して示すことになるため、コンパイラがそのフィールドへのアクセスを最適化しないようにすることができる。

volatileを付与したフィールドは、常にメインメモリから読み込まれ、変更が即座にメインメモリ上に反映されることが保証される

volatileで宣言したフィールドは、キャッシュされずに常にメインメモリから読み込まれる

複数のスレッド間で最新の値にアクセスできることから、「変数の可視性が確保される」とも表現される。

ただし、複数のスレッド間からの更新処理の競合を防ぐものではないので、「原子性は確保されない」と表現される。

volatileは競合自体を防ぐものではない

volatileの例
class MyClass {
    // volatileを使ったフィールド
    volatile boolean flag = false;
}

MyClass myClass = new MyClass();

// スレッドAからのアクセス
myClass.flag = true;  // メインメモリ上で更新が即反映される

// スレッドBからのアクセス
if (myClass.flag) {
    // volatileを使用しない場合、キャッシュされていたflag(falseが格納)にアクセスされることがある
    // volatileを使用することで、必ずメインメモリから読み出されるため、矛盾が発生しない
}

synchronzedvolatile

synchronzedvolatileは、どちらもマルチスレッド環境でデータの一貫性を確保するための手段であるが、それぞれ違いがある。

synchronized

排他制御を実現するためにメソッドやブロックに対して使用される。

あるスレッドがsynchronizedブロックに入ると、他のスレッドはロックしたインスタンスやメソッドへアクセスすることができなくなる。

メソッドやブロックに対して使用することができるため、トランザクションのようなある程度まとまった単位で処理の原子性を確保することができる。デッドロックの危険がある。

volatile

複数のスレッドからの可視性を確保するために、変数に対して使用される。volatileを使用した変数はキャッシュメモリではなく、必ずメインメモリを介してアクセスされる。単一な操作に対しては効果があるものの、synchronizedと比べて、局所的な制御という感じで、複数の処理を排他制御するなどはできない。volatileを使用しても、複数のスレッドが同時に同じフィールドを変更しようとする競合は防ぐことができない。

volatile
class MyClass {
    // キャッシュされないようにできる
    volatile boolean flag = false;
}
synchronized
class MyClass {
    private int i1 = 0;
    private int i2 = 0;

    public synchronized void incrementCounters() {
        // 複数のスレッドから呼び出されたとしても、競合しない
        i1++;
        i2++;
    }
}
volatile と synchronized
// 組み合わせて使用することもできる
class MyClass {
    private volatile int i1 = 0;
    private volatile int i2 = 0;

    public void incrementCounters() {
        synchronized (this) {
            i1++;
            i2++;
        }
    }
}

複数の文を排他制御したり、複雑な処理を同期処理したい場合にはsynchronizedを使用し、単に一つの変数の可視性を確保したい場合にはvolatileを使用する。

ReentrantLock

java.util.concurrent.locksパッケージには、synchronizedよりもより柔軟で細かい排他制御が可能なReentrantLockクラスがある。

synchronizedを使用したときに意識されなかった実際のロック操作を、lock()メソッドによって明示的に行うことができる。また、同じスレッドが同じロックを再帰的に取得することもできる。(reentrant=再入可能。synchronizedでも再帰的なロックは可能)取得したロックは必ずunlock()によって解放する必要がある。ロック解放にはtry-finallyを利用する。

特に複雑な非同期処理の制御を行う際に使用する。基本的な使い方は以下のようになっている。

明示的なロック
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyClass {
    private final Lock lock = new ReentrantLock();

    public void method() {
        // ロックの取得
        lock.lock();
        try {
            // クリティカルセクション:ロックされた領域での処理
        } finally {
            // ロックの解放
            lock.unlock();
        }
    }
}

また、ロックを獲得できなかった場合の処理、指定時間内にロックを獲得できなかった場合の処理なども制御できる。

ロックを獲得できなかった場合の処理
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyClass {
    private final Lock lock = new ReentrantLock();

    public void method() {
        if (lock.tryLock()) {
            try {
                // ロックが取得できた場合の処理
            } finally {
                lock.unlock();
            }
        } else {
            // ロックが取得できなかった場合の処理
        }
    }
}
指定時間内にロックを獲得できなかった場合の処理
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyClass {
    private final Lock lock = new ReentrantLock();

    public void method() throws InterruptedException {
        if (lock.tryLock(500, TimeUnit.MILLISECONDS)) {
            try {
                // 指定時間内にロックが取得できた場合の処理
            } finally {
                lock.unlock();
            }
        } else {
            // 指定時間内にロックが取得できなかった場合の処理
        }
    }
}

tryLock()がスローする可能性があるInterruptedExceptionは、スレッドが中断されたときに発生する。具体的には、スレッドがsleep()wait()join()などによってブロック状態にあるときに、外部からそのスレッドに対してinterrupt()が呼ばれると、スレッドは中断状態になり、InterruptedException例外が発生する。

java.util.concurrent.atomicパッケージ

スレッドセーフなプログラミング手法には他にも、java.util.concurrent.atomicパッケージに含まれるAtomicBooleanAtomicIntegerなどのクラス群を利用する方法もある。

これらのクラス群を使用した型は、値の取得、演算、再代入までの操作を、他のスレッドに割り込まれることなく実行できる(スレッド同士が競合しない)。これはアトミックな操作と表現とされる(Atomic=原子の、不可分の)。

元々、Javaのlongdouble型以外の基本型(intshortbytecharfloat)はアトミックに操作できるため、AtomicLongAtomicDoubleが主に利用される。

Javaのlongdoubleのデータ型は、64bitのメモリ領域を必要とする。64bitのデータ型は、ハードウェア上では2回にわたる32bitの操作として扱われることがあり、この2回の操作の間に他のスレッドに割り込まれる可能性があるため、アトミックな操作が保証されない。

AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger atomicInt = new AtomicInteger(10);

// 値の取得
int currentValue = atomicInt.get();

// 値の設定(インクリメント)
int newValue = atomicInt.incrementAndGet();

// アトミックな更新
// 10と等しい場合、20を代入する。10と等しい場合、メソッドはtrueを返却する
boolean isUpdated = atomicInt.compareAndSet(10, 20);

スレッドプール

スレッドはプロセスよりも軽量であるものの、数が増えればパフォーマンスに影響を与えてしまうことがある。これを防ぐためにスレッドプールという仕組みがある。

スレッドプールでは、アプリで使用するスレッドがあらかじめ複数用意され、必要になったときに取り出して使用したり、使用後はプールに戻すことで、再利用したりする。

このように、スレッドの新規生成による負荷を極力抑えて、一度生成したスレッドを繰り返し再利用することで、オーバーヘッドを減らし、パフォーマンスを向上させることができる。

スレッドプールを生成する

スレッドプールを生成するには、java.util.concurrentパッケージに含まれるExecutorsクラスを利用する。

Executorクラスのインスタンスは、ファクトリメソッドと呼ばれるインスタンス生成のためのnewFixedThreadPool()などの静的メソッドを通じて行う。

生成したインスタンスは、submit(Callable<T> task)を使用して実行結果を取得することができる。submit()Future<T>型を返す。

また、submit()と似たものでexecute(Runnable task)というメソッドもあり、execute()は非同期処理が戻り値を持たない場合に使用する。

Futureは非同期処理のハンドリングをしやすくするためのクラスであり、処理の成否や処理結果を取得するために利用される。

execute()を使用した場合、生成したスレッドの処理で発生した例外はスレッドプール内で処理されてしまうため、キャッチすることができないのに対して、submit()を使用した場合はFutureを介してキャッチすることができる。

非同期処理が戻り値を返さないとき

Task.java
public class Task implements Runnable {
    private int id;

    public Task(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        System.out.println("タスクを実行します" + " id: " + id);
    }
}

Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        // 10個のスレッドから構成されるスレッドプールを生成
        ExecutorService executor = Executors.newFixedThreadPool(10);

        // タスクをスレッドプールに送信
        for (int i = 0; i < 10; i++) {
            Runnable task = new Task(i);
            // 戻り値がない場合、executor.execute()としてもOK
            executor.submit(task);
        }

        // >> タスクを実行します id: 5
        // >> タスクを実行します id: 8
        // >> タスクを実行します id: 3
        // >> タスクを実行します id: 9
        // >> タスクを実行します id: 0
        // >> タスクを実行します id: 2
        // >> タスクを実行します id: 6
        // >> タスクを実行します id: 4
        // >> タスクを実行します id: 7
        // >> タスクを実行します id: 1

        // スレッドプールをシャットダウン(新しいタスクの送信を受け入れ停止し、既存のタスク実行が終了したらスレッドプールを破棄する)
        executor.shutdown();
    }
}

非同期処理が戻り値を返すとき

戻り値がある場合、Runnable ではなく Callble<T>インターフェース を使用する。(Tは戻り値の型)

ExecutorServiceクラスのsubmit()は、Future<T>オブジェクトを返却する。

Future<T>オブジェクトにはget()join()メソッドがあり、これらのメソッドによって、非同期処理の結果を取得することができる。

get()join()メソッドはどちらも非同期処理の完了を待つメソッドだが、get()InterruptedExceptionExecutionExceptionをスローする可能性があるのに対して、join()はそのようなチェック例外(try-catchが必要な例外)をスローしない。

join()を使う際には、非同期処理が正常に完了することが保証された場合に使用し、非同期処理が例外をスローしたり、キャンセルされる場合などの例外処理が必要な場合はget()を使う。

Task.java
import java.util.concurrent.Callable;

// 戻り値がある場合、Runnable ではなく Callble<T> を使用する(Tは戻り値の型)
public class Task implements Callable<Integer> {
    private int id;

    public Task(int id) {
        this.id = id;
    }

    // Callble では call() をオーバーライドする
    @Override
    public Integer call() throws Exception {
        return id;
    }
}
Main.java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) {
        // 10個のスレッドから構成されるスレッドプールを生成
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            Callable task = new Task(i);
            // タスクをスレッドプールに送信
            Future<Integer> future = executor.submit(task);
            try {
                // 結果を取得する
                Integer taskId = future.get();
                System.out.println("タスクを実行します" + " id: " + taskId);
            } catch (Exception e) {
                // 例外処理
            }
        }

        // >> タスクを実行します id: 0
        // >> タスクを実行します id: 1
        // >> タスクを実行します id: 2
        // >> タスクを実行します id: 3
        // >> タスクを実行します id: 4
        // >> タスクを実行します id: 5
        // >> タスクを実行します id: 6
        // >> タスクを実行します id: 7
        // >> タスクを実行します id: 8
        // >> タスクを実行します id: 9

        // スレッドプールをシャットダウン(新しいタスクの送信を受け入れない)
        executor.shutdown();
    }
}

非同期処理が戻り値を返すとき(ラムダ式)

ラムダ式を使用してCallable<T>を実装した場合、戻り値はsubmit()の第2引数に渡すことで、Future<T>オグジェクトから取得する事ができる。

Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            // ラムダ式を使ってCallabel<T>を実装する
            Future<Integer> future = executor.submit(() -> {
                // 処理
            }, i); // submit()の第2引数 i が戻り値になる

            try {
                // ここで i を取得している
                Integer taskId = future.get();
                System.out.println("タスクを実行します" + " id: " + taskId);
            } catch (Exception e) {
                // 例外処理
            }
        }

        // >> タスクを実行します id: 0
        // >> タスクを実行します id: 1
        // >> タスクを実行します id: 2
        // >> タスクを実行します id: 3
        // >> タスクを実行します id: 4
        // >> タスクを実行します id: 5
        // >> タスクを実行します id: 6
        // >> タスクを実行します id: 7
        // >> タスクを実行します id: 8
        // >> タスクを実行します id: 9

        // スレッドプールをシャットダウン(新しいタスクの送信を受け入れない)
        executor.shutdown();
    }
}

複数の非同期処理のハンドリング

CompletableFuture

非同期処理を扱うときに、

ある非同期処理(A)が成功したら、その実行結果(A)を利用して、また別の処理(B)を実行したい

という場合がある。

複数の同期処理、非同期処理の実行順を制御したい場合には、CompletableFutureを利用する。

CompletableFutureはJava8以降のjava.util.concurrentパッケージに含まれる。

まず、最初に実行する非同期処理(コールバック関数)をCompletableFutureクラスの静的メソッドCompletableFuture.supplyAsync()に渡す必要がある。

ここでのポイントは、RunnableCallableを使用したときのように、開発者が明示的にスレッドを生成したりしなくても非同期処理が実現できること。

CompletableFutureを使うと、スレッドの生成処理を記述しなくて良い

CompletableFuture.supplyAsync()に渡したコールバック関数は、JavaのForkJoinPoolと呼ばれるデフォルトのスレッドプールが管理してくれる。

非同期処理の管理、スレッドの再利用、効率的なスケジューリングなどをForkJoinPoolが行ってくれることにより、開発者は手動でスレッドの生成や管理をする必要がなくなり、より直感的で簡単な非同期プログラミングが可能になる

直列実行

CompletableFuture.supplyAsync()CompletableFuture<T>型のオブジェクトを返却する。

返却されたオブジェクトには、thenAccept()thenAcceptAsync()といったメソッドが用意されているため、これらのメソッドにさらにコールバック関数を渡すことで、後続の処理を登録していくことができる。

thenAccept()は同期処理をコールバック関数として登録する際に使用するのに対して、thenAcceptAsync()は非同期処理をコールバック関数をして登録する際に使用する。

thenAccept()に登録した処理は、CompletableFuture.supplyAsync()に登録した非同期処理と同じスレッド上で実行されるのに対して、thenAcceptAsync()では、ForkJoinPoolによって、プラットフォームや環境によっては新しいスレッドがスレッドプールから割り当てられる可能性がある。そのため、非同期処理の後続に、さらに別の非同期処理が続く場合にはthenAcceptAsyncを使用する。

thenAccept()thenAcceptAsync()が戻り値を持たないのに対して、処理をさらにつなげたい場合には、CompletableFuture<T>型を戻り値に持つthenApply()thenApplyAsync()を使用する。イメージとしては下の図のような形。

Async.png

Main.java
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task1を実行します。");
            return "[Task1]";
        }).thenApplyAsync((result) -> {
            System.out.println("Task2を実行します。");
            return result + "[Task2]";
        }).thenApplyAsync((result) -> {
            System.out.println("Task3を実行します。");
            return result + "[Task3]";
        }).thenAcceptAsync((result) -> {
            System.out.println("完了したタスク:" + result);
        });

        // メインスレッドが終了しないように、非同期処理の完了を待つ
        future.join();

        // >> Task1を実行します。
        // >> Task2を実行します。
        // >> Task3を実行します。
        // >> 完了したタスク:[Task1][Task2][Task3]
    }
}

並列実行

CompletableFutureには、複数の非同期処理を並列で実行するためのメソッドallOf()がある。allOf()を使用することで、複数の非同期処理が全て完了したときの処理を登録することができる。

並列実行
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task1を実行します。");
            return "[Task1]";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task2を実行します。");
            return "[Task2]";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task3を実行します。");
            return "[Task3]";
        });

        CompletableFuture.allOf(future1, future2, future3);

        // 全ての非同期処理の完了を待つ
        futures.join();

        String result1 = future1.join();
        String result2 = future2.join();
        String result3 = future3.join();

        System.out.println("完了したタスク:" + result1 + result2 + result3);

        // ⭐️並列実行なので、タスクの実行順は毎回変わる
        // >> Task1を実行します。
        // >> Task3を実行します。
        // >> Task2を実行します。
        // >> 完了したタスク:[Task1][Task2][Task3]
    }
}

allOf()CompletableFuture<T>を返すので、さらにその後に直列処理をつなげることもできる。下の図のようなイメージ。

Async_2.png

並列処理と直列処理
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task1を実行します。");
            return "[Task1]";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task2を実行します。");
            return "[Task2]";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task3を実行します。");
            return "[Task3]";
        });

        CompletableFuture.allOf(future1, future2, future3)
                .whenCompleteAsync((voidResult, ex) -> {
                    if (ex == null) {
                        System.out.println("Task1、Task2、Tsk3が完了しました。");
                    } else {
                        // 例外処理
                    }
                }).thenApplyAsync((result) -> {
                    System.out.println("後続処理を実行します。");
                    return "[後続処理]";
                }).thenAcceptAsync((result) -> {
                    System.out.println("最後の処理を実行します。");
                });


        // ⭐️並列実行なので、タスクの実行順は毎回変わる
        // >> Task2を実行します。
        // >> Task3を実行します。
        // >> Task1を実行します。
        // >> Task1、Task2、Tsk3が完了しました。
        // >> 後続処理を実行します。
        // >> 最後の処理を実行します。
    }
}

同期プリミティブ

複数のスレッドで実行される処理同士の実行タイミングを一致させることを同期化という。

CompletableFutureを使った簡潔な記述で非同期処理を制御することもできるが、Javaにはバリアーというさらに低級な概念が存在する。

CompletableFutureが高級な非同期処理のためのフレームワークであるのに対して、バリアーを使った記述は低級でプリミティブな処理と言える。

参考

Head First Java 第2版 ―頭とからだで覚えるJavaの基本
独習Java 新版
徹底攻略Java SE 11 Gold問題集[1Z0-816]対応

4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5