LoginSignup
13
11

More than 5 years have passed since last update.

Java 単純な分散処理(複数のJVM間のSocket通信+ExecutorService)

Last updated at Posted at 2016-02-10

はじめに

1台のサーバでさばききれない業務を、
複数のサーバで起動したアプリケーションを連携させてこなす例です。
非常に単純な構造&単純なSocket通信+ExecutorService管理によるマルチスレッドです。

こんな構造になっています。

  • 仕事割振担当
    • →仕事受付担当(1)
      • →いくつかの子スレッドで実務処理
    • →仕事受付担当(2)
      • →いくつかの子スレッドで実務処理

仕事割振担当と仕事受付担当の間でSocket通信し、
仕事割振担当が仕事受付担当それぞれに10個ずつの仕事を割り当てます。
仕事受付担当は自身で仕事をこなすのではなく、子スレッドを生成して実務にあたらせます(たらいまわしの例ではありません・・・)。

実装例

仕事割振担当クラス(GridTest)定義

物理的なサーバを複数台用意することが難しかったため、
すべて同一のサーバ上で実施することになりましたが、
仕事受付担当それぞれで異なるポートで待機していますので、「受付場所」は異なります。

スレッドの管理にはExecutors#newCachedThreadPoolを使っていますので、
10個ずつx2=計20個の仕事の割振(Socket通信)は一気に処理されていきます。
結果の戻り方については、後述する仕事受付担当と、仕事受付担当子スレッドの実装を見てみましょう。

GridTest.java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * @author tool-taro.com
 */
public class GridTest implements Callable<String> {

    public static void main(String[] args) throws IOException {

        //仕事をさせる対象のGrid(サーバ+ポート)のリスト
        InetSocketAddress[] gridList = new InetSocketAddress[]{
            new InetSocketAddress("127.0.0.1", 5100),
            new InetSocketAddress("127.0.0.1", 5101)
        };

        ExecutorService service = Executors.newCachedThreadPool();
        GridTest gridTest;

        //すべてのGridに10回ずつ仕事を投げる
        for (int i = 0; i < 10; i++) {
            for (InetSocketAddress gridList1 : gridList) {
                gridTest = new GridTest(gridList1);
                service.submit(gridTest);
            }
        }
    }

    private InetSocketAddress address = null;

    public GridTest(InetSocketAddress address) {
        this.address = address;
    }

    //GridへのSocketをオープンし、request(=仕事)を投げ、response(=結果)を受け取る
    @Override
    public String call() throws Exception {
        Socket socket = null;
        BufferedWriter writer = null;
        BufferedReader reader = null;
        String request = "Hello, who are you?";

        try {
            socket = new Socket();
            socket.connect(this.address, 300000);
            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));

            writer.write(request + "\n");
            writer.flush();
            System.out.println(System.currentTimeMillis() + " " + "request to: " + this.address.getHostName() + ":" + this.address.getPort() + "[" + request + "]");
            String response = reader.readLine();
            System.out.println(System.currentTimeMillis() + " " + "response from: " + this.address.getHostName() + ":" + this.address.getPort() + " [" + response + "]");
        }
        finally {
            if (reader != null) {
                try {
                    reader.close();
                }
                catch (IOException e) {
                }
            }
            if (writer != null) {
                try {
                    writer.close();
                }
                catch (IOException e) {
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                }
                catch (IOException e) {
                }
            }
        }

        //戻り値で呼び出し元の挙動に変化を及ぼすこともできる(今回は使っていない)
        return "ok";
    }
}

仕事受付担当クラス(Grid)定義

仕事受付担当は、起動時に指定されたTCPポートに対するSocket接続を待ち続けます。
Socket接続が到達すると、子スレッド(GridProcess)を生成し、
スレッド実行を管理するExecutorsに委ねますが、
このクラスではExecutors#newFixedThreadPoolを使っており、
その名前から想定できる通り、最大並列処理数に制限があります。
今回は最大並列処理数を2としていますので、少しずつ処理されていく状況が予想されます。

Grid.java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * @author tool-taro.com
 */
public class Grid {

    public static void main(String[] args) throws IOException {
        /*
            引数1: Grid名(任意の名前でよい)
            引数2: 使用するTCPポート番号
         */
        Grid grid = new Grid(args[0], Integer.parseInt(args[1]));
        grid.start();
    }

    private String name = null;
    private int port = -1;

    public Grid(String name, int port) {
        this.name = name;
        this.port = port;
    }

    //メインスレッドでSocketのコネクションが到達するまで待機する
    public void start() throws IOException {
        System.out.println(System.currentTimeMillis() + " " + this.name + ": grid started");

        ServerSocket server = null;
        Socket socket;

        //GridProcess(実際に仕事をこなす子スレッド)の最大同時起動数は2とする
        ExecutorService service = Executors.newFixedThreadPool(2);
        GridProcess process;
        int counter = 0;

        try {
            server = new ServerSocket(this.port);
            while (true) {
                while (true) {
                    //Socketを受け付けるとループを抜け、GridProcessに引き渡される
                    try {
                        socket = server.accept();
                        socket.setSoTimeout(30000);
                        break;
                    }
                    catch (IOException e) {
                        try {
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e2) {
                        }
                    }
                }

                //最大同時起動数に達するまで子スレッドが作られ、並行して仕事を実行する
                process = new GridProcess(this.name + "_" + ++counter, socket);
                service.submit(process);
            }
        }
        finally {
            if (server != null) {
                try {
                    server.close();
                }
                catch (IOException e) {
                }
            }
        }
    }
}

子スレッドクラス(GridProcess)定義

今回の「仕事」は、名前を尋ねられて答えるだけの簡単なお仕事なのですが、
処理の流れをわかりやすくするため、3秒かけて応答することにしました。
仕事受付担当(Grid)からSocketインスタンスを受け取っており、
直接的に仕事割振担当に結果を返す仕様となっています。

GridProcess.java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.concurrent.Callable;

/**
 *
 * @author tool-taro.com
 */
public class GridProcess implements Callable<String> {

    private String name = null;
    private Socket socket = null;

    public GridProcess(String name, Socket socket) {
        this.name = name;
        this.socket = socket;
    }

    @Override
    public String call() throws Exception {
        System.out.println(System.currentTimeMillis() + " " + this.name + ": grid process started");

        BufferedReader reader = null;
        BufferedWriter writer = null;
        String request;
        String response;

        try {
            reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), "UTF-8"));
            writer = new BufferedWriter(new OutputStreamWriter(this.socket.getOutputStream(), "UTF-8"));

            //仕事を受け取る
            request = reader.readLine();
            System.out.println(System.currentTimeMillis() + " " + this.name + ": grid process request received [" + request + "]");

            //何らかの処理を実行する
            this.doSomething();

            //結果を返す
            response = "Hello, my name is " + this.name;
            writer.write(response + "\n");
            writer.flush();
            System.out.println(System.currentTimeMillis() + " " + this.name + ": grid process response returned [" + response + "]");
        }
        finally {
            if (writer != null) {
                try {
                    writer.close();
                }
                catch (IOException e) {
                }
            }
            if (reader != null) {
                try {
                    reader.close();
                }
                catch (IOException e) {
                }
            }
        }

        //戻り値で呼び出し元の挙動に変化を及ぼすこともできる(今回は使っていない)
        return "ok";
    }

    private void doSomething() {
        //時間がかかる処理であると仮定する
        try {
            Thread.sleep(3000);
        }
        catch (InterruptedException e) {
        }
    }
}

動作確認

まず、仕事受付担当(1)と(2)を起動して待機させておきます。
出力されている行の先頭にある数値はUNIX時間(ミリ秒)です。

仕事受付担当(1)と子スレッドたち
$ java Grid serv1 5100
1455093045926 serv1: grid started
仕事受付担当(2)と子スレッドたち
$ java Grid serv2 5101
1455093050757 serv2: grid started

では、仕事割振担当を起動して、連携プレーを見てみましょう。

仕事割振担当
$ java GridTest
(仕事受付担当へのリクエストは瞬時に送信し終えている 10個x2担当=計20回)
1455093061889 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061896 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061933 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061926 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061926 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061925 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061925 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061925 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061924 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061924 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061924 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061923 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061923 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061922 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061922 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061921 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061903 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061901 request to: 127.0.0.1:5100 [Hello, who are you?]
1455093061901 request to: 127.0.0.1:5101 [Hello, who are you?]
1455093061899 request to: 127.0.0.1:5100 [Hello, who are you?]

(仕事受付担当子スレッドからのレスポンスは少しずつ受信している 10個x2担当=計20回)
1455093064948 response from: 127.0.0.1:5100 [Hello, my name is serv1_1]
1455093064959 response from: 127.0.0.1:5101 [Hello, my name is serv2_1]
1455093064998 response from: 127.0.0.1:5101 [Hello, my name is serv2_2]
1455093064999 response from: 127.0.0.1:5100 [Hello, my name is serv1_2]
1455093067956 response from: 127.0.0.1:5100 [Hello, my name is serv1_3]
1455093067966 response from: 127.0.0.1:5101 [Hello, my name is serv2_3]
1455093068002 response from: 127.0.0.1:5100 [Hello, my name is serv1_4]
1455093068006 response from: 127.0.0.1:5101 [Hello, my name is serv2_4]
1455093070960 response from: 127.0.0.1:5100 [Hello, my name is serv1_5]
1455093070970 response from: 127.0.0.1:5101 [Hello, my name is serv2_5]
1455093071007 response from: 127.0.0.1:5100 [Hello, my name is serv1_6]
1455093071010 response from: 127.0.0.1:5101 [Hello, my name is serv2_6]
1455093073965 response from: 127.0.0.1:5100 [Hello, my name is serv1_7]
1455093073984 response from: 127.0.0.1:5101 [Hello, my name is serv2_7]
1455093074014 response from: 127.0.0.1:5101 [Hello, my name is serv2_8]
1455093074015 response from: 127.0.0.1:5100 [Hello, my name is serv1_8]
1455093076967 response from: 127.0.0.1:5100 [Hello, my name is serv1_9]
1455093076986 response from: 127.0.0.1:5101 [Hello, my name is serv2_9]
1455093077020 response from: 127.0.0.1:5101 [Hello, my name is serv2_10]
1455093077021 response from: 127.0.0.1:5100 [Hello, my name is serv1_10]
(すべてのレスポンスを受信し、アプリケーション終了)

仕事割振担当側のログを見る限り、きちんと連携できたようです。

仕事受付担当側のログも念のため見ておきましょう。

仕事受付担当(1)と子スレッドたち
$ java Grid serv1 5100
1455093045926 serv1: grid started

(ここでSocket接続があるまで待機している)

(仕事割振担当からのリクエスト20件がまとめて到達しているが、
 同時に処理できる子スレッド数は最大2のため、少しずつ進行する
 このとき、スレッドの生成の微妙なタイミングによって、必ずしも通番順とはならない)
1455093061936 serv1_1: grid process started
1455093061946 serv1_1: grid process request received [Hello, who are you?]
1455093061995 serv1_2: grid process started
1455093061998 serv1_2: grid process request received [Hello, who are you?]
1455093064948 serv1_1: grid process response returned [Hello, my name is serv1_1]
1455093064952 serv1_3: grid process started
1455093064954 serv1_3: grid process request received [Hello, who are you?]
1455093064998 serv1_2: grid process response returned [Hello, my name is serv1_2]
1455093064999 serv1_4: grid process started
1455093065001 serv1_4: grid process request received [Hello, who are you?]
1455093067955 serv1_3: grid process response returned [Hello, my name is serv1_3]
1455093067957 serv1_5: grid process started
1455093067957 serv1_5: grid process request received [Hello, who are you?]
1455093068001 serv1_4: grid process response returned [Hello, my name is serv1_4]
1455093068002 serv1_6: grid process started
1455093068004 serv1_6: grid process request received [Hello, who are you?]
1455093070958 serv1_5: grid process response returned [Hello, my name is serv1_5]
1455093070963 serv1_7: grid process started
1455093070964 serv1_7: grid process request received [Hello, who are you?]
1455093071007 serv1_6: grid process response returned [Hello, my name is serv1_6]
1455093071009 serv1_8: grid process started
1455093071014 serv1_8: grid process request received [Hello, who are you?]
1455093073964 serv1_7: grid process response returned [Hello, my name is serv1_7]
1455093073966 serv1_9: grid process started
1455093073966 serv1_9: grid process request received [Hello, who are you?]
1455093074015 serv1_8: grid process response returned [Hello, my name is serv1_8]
1455093074018 serv1_10: grid process started
1455093074019 serv1_10: grid process request received [Hello, who are you?]
1455093076967 serv1_9: grid process response returned [Hello, my name is serv1_9]
1455093077020 serv1_10: grid process response returned [Hello, my name is serv1_10]

^C ←Grid側はSocket接続を待ち続ける実装なので強制終了
仕事受付担当(2)と子スレッドたち
$ java Grid serv2 5101
1455093050757 serv2: grid started

(ここでSocket接続があるまで待機している)

(仕事割振担当からのリクエスト20件がまとめて到達しているが、
 同時に処理できる子スレッド数は最大2のため、少しずつ進行する
 このとき、スレッドの生成の微妙なタイミングによって、必ずしも通番順とはならない)
1455093061953 serv2_1: grid process started
1455093061956 serv2_1: grid process request received [Hello, who are you?]
1455093061961 serv2_2: grid process started
1455093061997 serv2_2: grid process request received [Hello, who are you?]
1455093064958 serv2_1: grid process response returned [Hello, my name is serv2_1]
1455093064959 serv2_3: grid process started
1455093064965 serv2_3: grid process request received [Hello, who are you?]
1455093064998 serv2_2: grid process response returned [Hello, my name is serv2_2]
1455093065001 serv2_4: grid process started
1455093065004 serv2_4: grid process request received [Hello, who are you?]
1455093067966 serv2_3: grid process response returned [Hello, my name is serv2_3]
1455093067967 serv2_5: grid process started
1455093067968 serv2_5: grid process request received [Hello, who are you?]
1455093068005 serv2_4: grid process response returned [Hello, my name is serv2_4]
1455093068007 serv2_6: grid process started
1455093068009 serv2_6: grid process request received [Hello, who are you?]
1455093070969 serv2_5: grid process response returned [Hello, my name is serv2_5]
1455093070980 serv2_7: grid process started
1455093070983 serv2_7: grid process request received [Hello, who are you?]
1455093071010 serv2_6: grid process response returned [Hello, my name is serv2_6]
1455093071013 serv2_8: grid process started
1455093071013 serv2_8: grid process request received [Hello, who are you?]
1455093073983 serv2_7: grid process response returned [Hello, my name is serv2_7]
1455093073984 serv2_9: grid process started
1455093073985 serv2_9: grid process request received [Hello, who are you?]
1455093074014 serv2_8: grid process response returned [Hello, my name is serv2_8]
1455093074017 serv2_10: grid process started
1455093074018 serv2_10: grid process request received [Hello, who are you?]
1455093076986 serv2_9: grid process response returned [Hello, my name is serv2_9]
1455093077019 serv2_10: grid process response returned [Hello, my name is serv2_10]

^C ←Grid側はSocket接続を待ち続ける実装なので強制終了

少し込み入ったログとなってしまいましたが、

  • Socket通信を通じて「仕事と結果(今回はただの文字列)」のやり取りが簡単にできること
  • 複数のJVMそれぞれにおいて子スレッドを生成し、処理能力を高められる可能性

を確認できたので終了とします。

環境

  • 開発

    • Windows 10 Pro
    • JDK 1.8.0_112
    • NetBeans IDE 8.2
  • 動作検証

    • CentOS Linux release 7.2
    • JDK 1.8.0_112

Webツールも公開しています。
Web便利ツール@ツールタロウ

13
11
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
11