LoginSignup
2
1

More than 5 years have passed since last update.

Azure Service Fabric を使う(2)Stateless Service

Last updated at Posted at 2016-11-15

今までの Azure 関連の記事は以下。

Azure Container Serviceを使う(1) 基本のデプロイ編
Azure Container Serviceを使う(2) 自動化してみる編
Azure Service Facricを使う(1) 利用するための環境を揃える

StatelessとStateful

Service Fabric では、Reliable ServiceとReliable Actorというアーキテクチャの違いと、 StatelessStateful という区別があります。ですので、組み合わせ的には、以下の4つのサービスを作ることが出来ます。

  • Reliable Stateless Service
  • Reliable Stateful Service
  • Reliable Stateless Actor
  • Reliable Stateful Actor

今回は、この組み合わせのうち、 Reliable Stateless Service と Reliable Stateless Actor をJavaで作ってみようかと思ったんですが、予想外に長くなってしまったので、Reliable Stateless Serviceだけになりました。

Reliable Service

Reliable Service とは、Microsoft Azureにおいて以下のように説明されています。

使い慣れたプログラミング モデルに似た、独自のコードを実行するためのシンプルなモデル。コードには、適切に定義されたエントリ ポイントと管理が簡単なライフサイクルが含まれます。
https://azure.microsoft.com/ja-jp/documentation/articles/service-fabric-reliable-services-introduction/

Reliable = 信頼性のある、と謳っていることもあって、特徴として

  • 信頼性
  • 可用性
  • スケーラビリティ
  • 整合性(Statefulだけ)

が挙げられています。既存の作り方によく似た作り方で、可用性とか信頼性を簡単に得られることが利点ってなりますね。

(事前作業)Service Fabric SDKのコピー

Vagrantの中でEclipseを立ち上げて〜という場合は問題ないですが、大抵はそうではないと思います。テンプレートから作ると、当然Vagrant内の環境におけるパスが利用されます。そうなるとホスト側からSDKの解決が出来ないため、Eclipseで補完とか使えません。

ですので、すごい適当ではありますが、SDKをホストから見える場所にコピーしちゃいましょう。

$ cp -r /opt/microsoft/sdk/servicefabric/java/packages/lib /vagrant/servicefabricsdk

こうしておけば、ホスト側からもSDKが見えるようになります。

Reliable Stateless Serviceを作ってみる

では早速作っていきましょう。すでにVagrant上に環境がある前提で進みます。

Vagrant上の任意のディレクトリで、以下のコマンドを実行します。

$ yo azuresfjava

色々聞かれますが、 Reliable Stateless Service を選んでいれば、後はだいたい何でもいいかと思います。ここでは、 Application: sampleService: base64 という名前にしたという前提で進めます。

生成すると、こんな感じのディレクトリ構造ができます。

./sample
|--.classpath
|--.project
|--.settings
| |--org.eclipse.buildship.core.prefs
|--.yo-rc.json
|--base64
| |--.classpath
| |--.gitignore
| |--.project
| |--.settings
| | |--org.eclipse.buildship.core.prefs
| |--bin
| | |--statelessservice
| | | |--base64Service.class
| | | |--base64ServiceHost.class
| |--build.gradle
| |--src
| | |--statelessservice
| | | |--base64Service.java
| | | |--base64ServiceHost.java
|--build.gradle
|--install.sh
|--sample
| |--ApplicationManifest.xml
| |--base64Pkg
| | |--Code
| | | |--_readme.txt
| | | |--entryPoint.sh
| | |--Config
| | | |--_readme.txt
| | |--Data
| | | |--_readme.txt
| | |--ServiceManifest.xml
|--settings.gradle
|--uninstall.sh

この中の base64 ディレクトリが、実際のStateless Serviceです。Service Fabricのテンプレート的には、Eclipseの利用が推奨されているようです。プラグインとかもありますが、とりあえず生のEclipse for Javaで進めます。

さて、プロジェクトのインポート自体は、Import -> Gradle Project でインポートできますが、事前に一つやっておくことがあります。stateless/build.gradleの中に書いてあるSDKのパスを、以下のように書き換えます。

apply plugin: 'java'
apply plugin: 'eclipse'

sourceSets {
  main {
     java.srcDirs = ['src']
     output.classesDir = 'out/classes'
      resources {
       srcDirs = ['src']
     }
   }
}

// これを追加
def sfsdkdir = "../../servicefabricsdk"

// SDKのディレクトリをそれぞれ書き換える
dependencies {
    compile fileTree(dir: sfsdkdir, include: ['*.jar'])
}

clean.doFirst {
    delete "${rootDir}/out"
}

jar {
    manifest {
    attributes(
        'Main-Class': 'statelessservice.statelessServiceHost',
        "Class-Path": configurations.compile.collect { 'lib/' + it.getName() }.join(' '))
    baseName "stateless"
    destinationDir = file('../sample/statelessPkg/Code/')
    }
}

task copyDeps<< {
    copy {
        from(sfsdkdir)
        into("../sample/statelessPkg/Code/lib")
        include('*.jar')
    }
    copy {
        from(sfsdkdir)
        into("../sample/statelessPkg/Code/lib")
        include('*.so')
    }
    copy {
        from(sfsdkdir)
        into("../sample/statelessPkg/Code/lib")
        include('*.dll')
    }
}


defaultTasks 'clean', 'jar', 'copyDeps'

この書き換えをしてから、Importすると、コンパイルエラーが無くなるかと思います。

では実装を進めてみましょう。仕様としては、シンプルにパラメータをbase64化するサービスとします。

サービスのエントリポイント

Stateless Serviceでは、サービスのエントリポイントが2つあります。

  • runAsync
    • サービスのインスタンスが配置されてから最初に実行される
  • createServiceInstanceListeners
    • 外部からの通信に対するエントリポイントを作成する

C# 版との大きな違いとしては、runAsyncを停止する手段が、CanchcellationTokenではなく、CompletableFutureを返すだけ、という形になっています。

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    ...
}

Javaだと、同じメソッドがこんなシグネチャになっています。

@Override
protected CompletableFuture<?> runAsync() {
    return super.runAsync();
}

これは、C#ではasync/awaitという、非同期に対する標準的な構文が用意されているのに対して、Javaにはそういうものがなく、Futureの類を利用する必要があるためだと思われます。出来る事自体はそう変わらないとは思いますが。

runAsyncは、いわば Runnable::run みたいな感じです。runAsyncが終了しても、サービス自体は終了しません。他のサービスと通信しないサービスであれば、ここを実装するだけでOKです。(そんなサービスなんて滅多に無いと思いますが・・・)。

サービスのエンドポイント

サービス間の通信や、Service Fabric外からの通信などは、全て createServiceInstanceListeners から、 CommunicationListener インターフェースを実装したクラスを返す必要があります。

Service Fabricの中でだけ完結するような通信=サービス間通信については、Service FabricのSDKから提供されています。しかし、HTTP通信については、C#では OWIN という仕組みを利用するためのListenerが用意されているものの、Java版にはそういったものはありませんでした・・・。

無ければ作るしか無いのが掟ですが、こういった通信系の実装ってのは結構辛い・・・。今回は、動けばいいってレベルでとりあえず作ります。

まずは、 sample/sample/base64Pkg/ServiceManifest.xml に以下を追加します。

<Resources>
    <Endpoints>
        <Endpoint Name="<好きな名前>" Type="Input" Protocol="http" Port="8281" />
    </Endpoints>
</Resources>

サービスが複数のListenerを実装している場合、ここも複数になります。

そして、HTTPでアクセスできるように、CommunicationListenerを実装します。今回はJettyを使ってみるので、build.gradleに以下を追加します。

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.eclipse.jetty:jetty-server:9.3.14.v20161028'
    compile 'org.eclipse.jetty:jetty-webapp:9.3.14.v20161028'
    ...
}

さて、肝心の実装ですが、結構な長さになっちゃいました。最初はHTTPに対するCommunicationListenerの実装です。

package statelessservice;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

import org.eclipse.jetty.server.Server;

import microsoft.servicefabric.services.communication.runtime.CommunicationListener;
import microsoft.servicefabric.services.runtime.StatelessServiceContext;
import system.fabric.FabricRuntime;
import system.fabric.ServiceContext;
import system.fabric.description.EndPointProtocol;
import system.fabric.description.EndpointResourceDescription;

public class CustomHttpCommunicationListener implements CommunicationListener {

    private static Logger logger = Logger.getLogger("base64api");

    private ServiceContext serviceContext;
    private String endpointName;
    private String listeningAddress;
    private String appRoot;
    private String publishAddress;

    private Server server;

    public CustomHttpCommunicationListener(ServiceContext serviceContext, String endpointName, String appRoot) {
        this.serviceContext = serviceContext;
        this.endpointName = endpointName;
        this.appRoot = appRoot;
    }

    @Override
    public void abort() {
        try {
            this.server.stop();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public CompletableFuture<?> closeAsync() {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();

        try {
            this.server.stop();
            future.complete(0);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    @Override
    public CompletableFuture<String> openAsync() {
        EndpointResourceDescription serviceEndpoint = this.serviceContext.codePackageActivationContext()
                .getEndpoint(this.endpointName);
        EndPointProtocol protocol = serviceEndpoint.getProtocol();
        int port = serviceEndpoint.getPort();

        if (this.serviceContext instanceof StatelessServiceContext) {
            this.listeningAddress = String.format("%s://+:%s/%s", protocol, port,
                    this.appRoot == null || this.appRoot.equals("") ? "" : this.appRoot.replaceAll("/$", "") + "/");
            logger.info("Listening address: " + listeningAddress);
        }

        this.publishAddress = this.listeningAddress.replace("+", FabricRuntime.getNodeContext().getIpAddressOrFQDN());
        logger.info("Publish address: " + publishAddress);

        logger.info("Starting web server on " + this.listeningAddress);
        InetSocketAddress addr = new InetSocketAddress(FabricRuntime.getNodeContext().getIpAddressOrFQDN(), port);
        this.server = new Server(addr);
        server.setHandler(new SimpleHttpHandler());

        CompletableFuture<String> future = CompletableFuture.completedFuture(this.publishAddress);
        try {
            server.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return future;
    }

}

CustomHttpCommunicationListenerの実装におけるポイントは、openAsyncメソッドの中でJettyのサーバーを起動している点になります。この部分、C#とあんまり変わりがないのが助かりました・・・。

HTTPのハンドラは、物凄い簡単に実装しました。この辺は、JAX-RSを利用したりするともっとシンプルに出来るかと。

package statelessservice;

import java.io.IOException;
import java.util.Base64;
import java.util.stream.Collectors;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;

public class SimpleHttpHandler extends AbstractHandler {

    @Override
    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        if (!"POST".equals(request.getMethod())) {
            response.setStatus(405);
            baseRequest.setHandled(true);

            return;
        }

        String body = request.getReader().lines().collect(Collectors.joining("\n"));
        String encoded = Base64.getUrlEncoder().encodeToString(body.getBytes());
        response.setStatus(200);
        response.getWriter().write(encoded);
        response.setContentType(MimeTypes.Type.TEXT_PLAIN_UTF_8.getCharsetString());
        baseRequest.setHandled(true);
    }

}

実装したCommunicationListenerを、Serviceで利用します。

package statelessservice;

import java.util.ArrayList;
import java.util.List;

import microsoft.servicefabric.services.communication.runtime.ServiceInstanceListener;
import microsoft.servicefabric.services.runtime.StatelessService;

public class base64Service extends StatelessService {

    @Override
    protected List<ServiceInstanceListener> createServiceInstanceListeners() {
        List<ServiceInstanceListener> listeners = new ArrayList<>();
        listeners.add(new ServiceInstanceListener(
                context -> new CustomHttpCommunicationListener(context, "endpoint", ""), "endpoint"));
        return listeners;
    }
}

runAsyncでは何もしないのでOverrideしないようにします。 ServiceInstanceListener のコンストラクタにおける第二引数が、ServiceManifest.xmlで設定したEndpoint名と一致するように注意しないと、多分デプロイ後に落ちます。

ここまで出来たら、以下の上二行でインストールできます。

$ gradle
$ ./install.sh
$ curl -X POST http://localhost:8281 -d "foobar?"
Zm9vYmFyPw==  # 動いた!

デフォルトだと、サービスのデプロイ設定が Singletonとなっているので、Fabric ExplorerからNodeを落としても、ちゃんと再度立ち上がります。当然、同じアドレスでAPIにアクセスできます。

まとめ

Reliable Stateless Serviceを、HTTPで通信できるAPIとして実装してみました。

C#とJavaのSDKでは、HTTPを利用する機構が共通であるかどうか、というのがここの部分における結構大きな差異じゃないかと思います。

Javaで実装していく場合、Hystrixのようなフレームワークを使った方がいいと思います。Fabric内のサービス同士であれば、C#/Javaで同じ方式が使えるようです。

次回は、Reliable Stateless Actor を、Fabric内のサービス通信と組み合わせてみたいと思います。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1