Help us understand the problem. What is going on with this article?

daprでつくるマイクロサービス

はじめに

この記事は、 富士通クラウドテクノロジーズ Advent Calendar 20202日目の記事です。

1日目は @miyuush さんの ニフクラがTerraformに対応したので使ってみた【基礎編】 でした!
昨日リリースされたばかりで生まれたてホヤホヤ感のある nifcloud/terraform-provider-nifcloud v1.0.0 ですがこれからの機能エンハンスが楽しみですね!どんどんIaCにしていきたい:yum:

改めましてこんにちは!NIFCLOUDのいくつかのサービスのAPIを開発している @kzmake と申します。
入社しサービスを開発してはや4年目になりました。ここ数年はいくつかのサービス開発を経験し、そこそこcleanでdddなアプリケーションをかけるようになってきたました :sunglasses:
最近はどうすればスピード感ある開発ができるかな〜と考えている今日このごろです。

今日は自分が使ってみたいなぁ〜と感じている

  1. github.com/dapr/dapr について
  2. daprを使ったマイクロサービスアプリケーション実装

を紹介したいと思います!

dapr とは

image.png

dapr は、 Distributed Application Runtime という名のとおりマイクロサービスアプリケーションとして必要な機能をビルディングブロックとして提供してくれるランタイムです。stable はまだ v0.11.3 と比較的若いながら、★8.4k とかなりホット :fire: な OSSプロジェクトではないかなとおもっています。そのコンセプトは、Any language, any framework, anywhere としており多様性をもった利用ができるところもポイントですね。

本来実装したいコアロジックにサイドカーとして利用することで、簡単にマイクロサービスを作成することができます。
更にそれぞれのビルディングブロックは抽象化されており、 HTTP/gRPC API を通して利用するものとなっているため言語に縛られない開発ができるのも魅力となっています。

マイクロサービスのためのビルディングブロック

dapr が現在(2020/11/29)提供しているビルディングブロックには下記のものがあります。

dapr_bilding_blocks.png

  • Service-to-service invocation: /v1.0/invoke
    • 他のマイクロサービスサービスへ通信するための機能
  • State management: /v1.0/state
    • key/valueベースの永続化や参照機能
  • Publish and subscribe: /v1.0/publish and /v1.0/subscribe
    • Publish/subscribeモデルで非同期にメッセージを送受信する機能
  • Resource bindings: /v1.0/bindings
    • 外部コンポーネントやサービスを抽象化しイベントの送受信を行う機能
  • Actors: /v1.0/actors
    • 分散性や並行・並列性をもち、非同期なメッセージ駆動のアクターモデルを提供
  • Observability
    • ログ・トレース・メトリクス・ヘルスチェックといったオブザーバビリティに必要な要素を提供
  • Secrets: /v1.0/secrets
    • 安全にパスワードなどのクレデンシャルなデータにアクセスする機能

それぞれのコンポーネントはライブラリとしてアプリケーションに組み込むのではなく、yamlのコンポーネント定義ファイルをロードさせることで利用することができます。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.redis
  metadata:
  - name: redisHost
    value: redis:6379
  - name: redisPassword
    value: ""

ビルディングブロックとしての提供なので、実装に一切手を加えず、検証環境ではredis、本番環境では何かしらのクラウドサービスなど切り替えもできるのがいいですね:tada:

ミドルウェア(http)

ミドルウェアも各種ビルディングブロックと同様にコンポーネントを定義することで利用可能となっています。

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: pipeline
  namespace: default
spec:
  httpPipeline:
    handlers:
    - name: oauth2
      type: middleware.http.oauth2
    - name: uppercase
      type: middleware.http.uppercase

特徴的なのが、 ratelimitoauth2 など定義済みのミドルウェアだけでなく、 Open Policy Agent(OPA) を用いてミドルウェアを追加できるようです( ex: https://play.openpolicyagent.org/p/oRIDSo6OwE )。提供されているミドルウェアは https://github.com/dapr/components-contrib/blob/master/middleware/http で確認できます。

サポートしているSDK

daprが提供する HTTP/gRPC API にアクセスするためのsdkを利用することもできます。提供されているSDKは、

となってます。今後のロードマップはこんな感じらしい。

ダッシュボード

image.png

まだ機能は少ないですが、ダッシュボードも利用できるようです。今後の機能追加も期待ですね!

IDEのサポート

言語サポートだけでなく、IDEのサポートもあるみたいです:tada:

んでなにを作ったの?

time.Now() など現在時刻を直接取得せず、マイクロサービスでシンプルな
時計サービス:timer: : github.com/kzmake/dapr-clock
をつくってみました(同期にこのアイデアをいただきました)!

$ docker-compose up -d --build
$ docker-compose exec dev curl -s http://web/now | jq .
{
  "hour": 15,
  "minute": 33,
  "second": 25
}

上記のコードは、時計をざっくりとモデリングし…

  • clock: 現在時刻を取得するサービス
    • /v1.0/clock/invoke/now で時刻を取得するAPIを提供
      • hour-hand/minute-hand/second-hand から針の情報を取得する
  • hour-hand: 時針を管理するサービス
    • 時刻同期のイベント に時針を設定する
    • 60分間経過のイベント を元に分針を 運針する
    • 時針を永続化する
    • 230 になると 24時間経過のイベント を発行する
  • minute-hand: 分針を管理するサービス
    • 時刻同期のイベント に分針を設定する
    • 60秒間経過のイベント を元に分針を 運針する
    • 分針を永続化する
    • 590 になると 60分間経過のイベント を発行する
  • second-hand: 秒針を管理するサービス
    • 時刻同期のイベント に秒針を設定する
    • 1秒間経過のイベント を元に秒針を 運針する
    • 秒針を永続化する
    • 590 になると 60秒間経過のイベント を発行する
  • ticker: 1秒をカウントするサービス
    • 一定間隔(1sec)毎1秒間経過のイベント を発行する
  • synchronizer: NTPを用いて時刻を同期するサービス
    • 一定間隔(24h)毎時刻同期のイベント を発行する

をマイクロサービスとして設計しています。

アーキテクチャ

daprを利用して以下のような設計してみました。

image.png

今回は、

を利用してみようとおもいます。

github.com/kzmake/dapr-clock

github.com/dapr/go-sdk を利用してdaprサイドカーと通信するマイクロサービスアプリケーションを github.com/kzmake/dapr-clock/microservices に各マイクロサービスを作成しました。各機能についてdaprでどう実装できるかを紹介していきたいと思います。

現在時刻を取得する機能

image.png

ユーザーはclockサービスを通してsecond-hand/minute-hand/hour-handサービスが管理する時針・分針・秒針を取得するとします。
また、second-hand/minute-hand/hour-handはそれぞれの針の状態を時針で持ちたくないのでdaprが提供するストア機能で永続化も試みます。ここでdaprのビルディングブロックとしては、

を利用し、dapr上でPOST /v1.0/invoke/clock/method/nowのAPIを提供します。まず、APIリクエストを受け取る部分から記述していきます。今回はアプリケーション <--> dapr間でgrpcを利用したかったのでgithub.com/dapr/go-sdk/service/grpcを使っています。

Service-to-service invocation

下記はclockサービスの実装ですが、second-hand/minute-hand/hour-hand も呼び出す handler が違うだけで差分はありません。second-hand/minute-hand/hour-handも同じようにAddServiceInvocationHandler を使って "now" のAPIを追加しています。

import (
    daprd "github.com/dapr/go-sdk/service/grpc"

    "github.com/kzmake/dapr-clock/microservices/clock/handler"
)

func main() {
    s, err := daprd.NewService(":3000")
    if err != nil {
        log.Fatalf("failed to start the server: %+v", err)
    }

    // POST /v1.0/invoke/clock/method/now
    if err := s.AddServiceInvocationHandler("now", handler.Now); err != nil {
        log.Fatalf("error adding invocation handler: %+v", err)
    }

    if err := s.Start(); err != nil {
        log.Fatalf("server error: %+v", err)
    }
}

clockサービスのhandlerは、second-hand/minute-hand/hour-handのAPIをリクエストするため、daprのビルディングブロックである Service-to-service invocation を利用します。client.InvokeServiceを利用している部分がそれにあたりますが、daprが提供する固定のエンドポイントを利用することでサービスディスカバリーを実装する必要がないようになっています!

func Now(ctx context.Context, in *common.InvocationEvent) (*common.Content, error) {
    client, err := dapr.NewClient()
    if err != nil {
        return nil, err
    }

    // POST /v1.0/invoke/hour-hand/method/now
    hourHandRes, err := client.InvokeService(ctx, "hour-hand", "now")
    if err != nil {
        return nil, err
    }

    // POST /v1.0/invoke/minute-hand/method/now
    minuteHandRes, err := client.InvokeService(ctx, "minute-hand", "now")
    if err != nil {
        return nil, err
    }

    // POST /v1.0/invoke/second-hand/method/now
    secondHandRes, err := client.InvokeService(ctx, "second-hand", "now")
    if err != nil {
        return nil, err
    }

    // ...それぞれのレスポンスの json.Unmarshal 処理してレスポンスを作成するなど

    return &common.Content{ContentType: "application/json", Data: res}, nil
}

State management

上記のリクエストを second-hand/minute-hand/hour-handサービスで受け付け、 永続化されている針を取得していきます。State management では、コンポーネントとしてyamlで定義したstatestoreを利用していきます。バックエンドとして多くのものをサポートしていますが、今回はredisを使用しています。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  metadata:
    - name: redisHost
      value: localhost:6379
    - name: redisPassword
      value: ""

State management では、metadataで設定したnamenamespaceを元にコード内で永続・参照先を指定します。今回の例ではstatestoreを指定します。参照のみなので、client.GetState(ctx, "statestore", "hour") のようにデータを取得します。
hour-handサービスのPOST /v1.0/invoke/hour-hand/method/nowを処理するハンドラーとしては下記のようになります。

import (
    dapr "github.com/dapr/go-sdk/client"
    "github.com/dapr/go-sdk/service/common"
)

func Now(ctx context.Context, in *common.InvocationEvent) (*common.Content, error) {
    client, err := dapr.NewClient()
    if err != nil {
        return nil, err
    }

    item, err := client.GetState(ctx, "statestore", "hour")
    if err != nil {
        return nil, err
    }

    // ...レスポンスを生成

    return &common.Content{ContentType: "application/json", Data: res}, nil
}

一定間隔毎に針を運針する機能

image.png

Resource bindingsでサポートしているcronを利用して一定間隔でスケジューリングできるトリガーをもとに、tickerサービスでイベントを発行し、second-hand/minute-hand/hour-handサービスの運針(データを永続化)を実現します。tickerからブロードキャストするのではなく、

ticker --[Ticked]--> second-hand --[60sTicked]--> minutes-hand --[60mTicked]--> hour-hand

とtickerのイベントをトリガーに各サービスもイベントを発行するようにしています。daprのビルディングブロックとしては、

を使いました。

Resource bindings

まずはtickerサービスのトリガーとなるinput bindingsから見ていきます。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: ticker
spec:
  type: bindings.cron
  metadata:
    - name: schedule
      value: "@every 1s"

と1sec毎にtickerサービスの処理を行うためのトリガーを設定します。Service-to-service invocationと同様に、ここではtickerコンポーネントとhandler.Tickハンドラーを結びつけるため、s.AddBindingInvocationHandler("ticker", handler.Tick) を行います。

import (
    daprd "github.com/dapr/go-sdk/service/grpc"

    "github.com/kzmake/dapr-clock/microservices/ticker/handler"
)

func main() {
    s, err := daprd.NewService(":3001")
    if err != nil {
        log.Fatalf("failed to start the server: %+v", err)
    }

    if err := s.AddBindingInvocationHandler("ticker", handler.Tick); err != nil {
        log.Fatalf("error adding binding handler: %+v", err)
    }

    if err := s.Start(); err != nil {
        log.Fatalf("server error: %+v", err)
    }
}

このようにするだけで、外部コンポーネントをトリガーとして処理を行うサービスを作成できます。

Publish and subscribe

次はマイクロサービスの肝となるPublish/Subcribeをdaprを使って実装していきます。pubsubコンポーネントの定義し、

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  metadata:
    - name: redisHost
      value: localhost:6379
    - name: redisPassword
      value: ""

コードではコンポーネントのname/namespaceを指定してclient.PublishEvent(ctx, "pubsub", "Ticked", nil) を使うことでメッセージのPublishを実装します。

func Tick(ctx context.Context, in *common.BindingEvent) ([]byte, error) {
    client, err := dapr.NewClient()
    if err != nil {
        return nil, err
    }

    if err := client.PublishEvent(ctx, "pubsub", "Ticked", nil); err != nil {
        return nil, err
    }

    return nil, nil
}

second-handサービスのSubscribeは、AddTopicEventHandler を使ってSubscribeハンドラーを登録します。

import (
    "github.com/dapr/go-sdk/service/common"
    daprd "github.com/dapr/go-sdk/service/grpc"

    "github.com/kzmake/dapr-clock/microservices/second-hand/handler"
)

func main() {
    s, err := daprd.NewService(":3000")
    if err != nil {
        log.Fatalf("failed to start the server: %+v", err)
    }

    // ...invokeなど

    if err := s.AddTopicEventHandler(&common.Subscription{
        PubsubName: "pubsub",
        Topic:      "Ticked",
        Route:      "/increase",
    }, handler.Increase); err != nil {
        log.Fatalf("error adding event handler: %+v", err)
    }

    if err := s.Start(); err != nil {
        log.Fatalf("server error: %+v", err)
    }
}

あとは、イベントを受信した際にdaprストア機能で秒針の情報を取得し、1sec分運針した後、永続化します。

func Increase(ctx context.Context, e *common.TopicEvent) (bool, error) {
    client, err := dapr.NewClient()
    if err != nil {
        return false, err
    }

    item, err := client.GetState(ctx, "statestore", "second")

    // ...itemからsecを取得

    // 59 -> 0 への運針であれば新規にイベント発行
    if (sec+1)/60 == 1 {
        if err := client.PublishEvent(ctx, "pubsub", "Ticked.60s", nil); err != nil {
            return 0, err
        }
    }

    // ...1sec運針する処理

    if err := client.SaveState(ctx, "statestore", "second", []byte(fmt.Sprintf("%d", sec))); err != nil {
        return false, err
    }

    return false, nil
}

59 -> 0 へと秒針が一周する場合は"Ticked.60s"をPublishして minute-hand/hour-handにイベントを渡していくようにしてみました。今回はこのようにサービス毎のイベントをPub/Subすることで時針・分針・秒針の運針をdaprを使って実装してみました。

現在時刻を同期する機能

最後に電波時計的な機能を実装しようと思います。
image.png

先程と基本的には同じビルディングブロックを利用し、cronを利用して一定間隔でスケジューリングできるトリガーをもとに、synchronizerサービスでNTPより取得した現在時刻の同期イベントを発行し、second-hand/minute-hand/hour-handサービスの時刻同期を実現します。今回はsynchronizerからブロードキャストし、ペイロードにjsonを渡すことでsecond-hand/minute-hand/hour-handの時刻を設定します。ここも dapr のビルディングブロックとしては、

を使いました。新規に下記のsynchronizerコンポーネントを定義し、1日毎に現在時刻の同期を試みます。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: synchronizer
spec:
  type: bindings.cron
  metadata:
    - name: schedule
      value: "@daily"

Publish and subscribe

Resource bindings を利用してトリガーとしてコンポーネントを登録しつつ、 Publish and subscribe でペイロードを持つイベントを発行してみます。Publish部分では NTP より取得した時刻情報をペイロードに設定してPublishさせます。

func Synchronize(ctx context.Context, in *common.BindingEvent) ([]byte, error) {
    client, err := dapr.NewClient()
    if err != nil {
        return nil, err
    }

    time, err := ntp.Time("ntp server address...")
    if err != nil {
        return nil, err
    }

    payload, err := json.Marshal(map[string]interface{}{"hour": time.Hour(), "minute": time.Minute(), "second": time.Second()})
    if err != nil {
        return nil, err
    }

    if err := client.PublishEvent(ctx, "pubsub", "Synchronized", payload); err != nil {
        return nil, err
    }

    return nil, nil
}

後はsecond-hand/minute-hand/hour-handでイベントをSubscribeし、時刻同期を行うといった実装になります。

以上で時計サービスとして、

  • 現在時刻を取得する機能
  • 一定間隔で秒針・分針・時針を運針する機能
  • 現在時刻を同期する機能

と最低限の機能を dapr + マイクロサービス として実現できたかなと思います!

さいごに

どうでしたでしょうか!まだまだ成長途中なOSSかと思いますが、コンポーネントとしてインフラの差し替え可能だったり、ビルディングブロックが豊富だったりと dapr に乗っかることでコアロジックに集中できそうだなと改めて思いました :smile:

github.com/kzmake/dapr-clock は6マイクロサービスを組み合わせてノリで作ってみましたが、設計&開発には1日もかかりませんでした :sunglasses: これなら開発効率ももりもり上げていけそうですね :muscle: (v1.0 が待ち遠しい…)

ニフクラのサービスとして提供している hatoba 上で dapr を動かしたり、マルチクラウドに利用しても面白いかもしれないですね!

今回紹介しきれなかったものに、

があるんですが、また別の機会で紹介しようと思います。(github.com/dapr/go-sdkはまだactorを利用できないようでした…残念… #21)

さて、明日は @yaaamaaaguuu さんが VMware製品を気軽に検証するためのtips について書いてくれるようです!お楽しみに:yum:

参考文献

この記事は以下の情報を参考にしています。

kzmake
趣味でArduinoやProcessingを使ったおもちゃを作ってます。
fjct
クラウド・IoT 関連サービスを開発・提供している企業です。(こちらは、富士通クラウドテクノロジーズの有志にて運営しております。)
https://fjct.fujitsu.com
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away