はじめに
この記事は、 富士通クラウドテクノロジーズ Advent Calendar 2020 の2日目の記事です。
1日目は @miyuush さんの ニフクラがTerraformに対応したので使ってみた【基礎編】 でした!
昨日リリースされたばかりで生まれたてホヤホヤ感のある nifcloud/terraform-provider-nifcloud v1.0.0 ですがこれからの機能エンハンスが楽しみですね!どんどんIaCにしていきたい![]()
改めましてこんにちは!NIFCLOUDのいくつかのサービスのAPIを開発している @kzmake と申します。
入社しサービスを開発してはや4年目になりました。ここ数年はいくつかのサービス開発を経験し、そこそこcleanでdddなアプリケーションをかけるようになってきたました ![]()
最近はどうすればスピード感ある開発ができるかな〜と考えている今日このごろです。
今日は自分が使ってみたいなぁ〜と感じている
- github.com/dapr/dapr について
- daprを使ったマイクロサービスアプリケーション実装
を紹介したいと思います!
dapr とは
dapr は、 Distributed Application Runtime という名のとおりマイクロサービスアプリケーションとして必要な機能をビルディングブロックとして提供してくれるランタイムです。stable はまだ v0.11.3 と比較的若いながら、★8.4k とかなりホット
な OSSプロジェクトではないかなとおもっています。そのコンセプトは、Any language, any framework, anywhere としており多様性をもった利用ができるところもポイントですね。
本来実装したいコアロジックにサイドカーとして利用することで、簡単にマイクロサービスを作成することができます。
更にそれぞれのビルディングブロックは抽象化されており、 HTTP/gRPC API を通して利用するものとなっているため言語に縛られない開発ができるのも魅力となっています。
マイクロサービスのためのビルディングブロック
dapr が現在(2020/11/29)提供しているビルディングブロックには下記のものがあります。
-
- 他のマイクロサービスサービスへ通信するための機能
-
- key/valueベースの永続化や参照機能
-
Publish and subscribe:
/v1.0/publishand/v1.0/subscribe- Publish/subscribeモデルで非同期にメッセージを送受信する機能
-
- 外部コンポーネントやサービスを抽象化しイベントの送受信を行う機能
-
- 分散性や並行・並列性をもち、非同期なメッセージ駆動のアクターモデルを提供
-
Observability
- ログ・トレース・メトリクス・ヘルスチェックといったオブザーバビリティに必要な要素を提供
-
- 安全にパスワードなどのクレデンシャルなデータにアクセスする機能
それぞれのコンポーネントはライブラリとしてアプリケーションに組み込むのではなく、yamlのコンポーネント定義ファイルをロードさせることで利用することができます。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.redis
metadata:
- name: redisHost
value: redis:6379
- name: redisPassword
value: ""
ビルディングブロックとしての提供なので、実装に一切手を加えず、検証環境ではredis、本番環境では何かしらのクラウドサービスなど切り替えもできるのがいいですね![]()
ミドルウェア(http)
ミドルウェアも各種ビルディングブロックと同様にコンポーネントを定義することで利用可能となっています。
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: pipeline
namespace: default
spec:
httpPipeline:
handlers:
- name: oauth2
type: middleware.http.oauth2
- name: uppercase
type: middleware.http.uppercase
特徴的なのが、 ratelimit や oauth2 など定義済みのミドルウェアだけでなく、 Open Policy Agent(OPA) を用いてミドルウェアを追加できるようです( ex: https://play.openpolicyagent.org/p/oRIDSo6OwE )。提供されているミドルウェアは https://github.com/dapr/components-contrib/blob/master/middleware/http で確認できます。
サポートしているSDK
daprが提供する HTTP/gRPC API にアクセスするためのsdkを利用することもできます。提供されているSDKは、
となってます。今後のロードマップはこんな感じらしい。
ダッシュボード
まだ機能は少ないですが、ダッシュボードも利用できるようです。今後の機能追加も期待ですね!
IDEのサポート
言語サポートだけでなく、IDEのサポートもあるみたいです![]()
んでなにを作ったの?
time.Now() など現在時刻を直接取得せず、マイクロサービスでシンプルな
時計サービス
: github.com/kzmake/dapr-clock
をつくってみました(同期にこのアイデアをいただきました)!
$ docker-compose up -d --build
$ docker-compose exec dev curl -s http://web/now | jq .
{
"hour": 15,
"minute": 33,
"second": 25
}
上記のコードは、時計をざっくりとモデリングし…
-
clock: 現在時刻を取得するサービス
-
/v1.0/clock/invoke/nowで時刻を取得するAPIを提供-
hour-hand/minute-hand/second-handから針の情報を取得する
-
-
-
hour-hand: 時針を管理するサービス
-
時刻同期のイベントに時針を設定する -
60分間経過のイベントを元に分針を運針する - 時針を永続化する
-
23→0になると24時間経過のイベントを発行する
-
-
minute-hand: 分針を管理するサービス
-
時刻同期のイベントに分針を設定する -
60秒間経過のイベントを元に分針を運針する - 分針を永続化する
-
59→0になると60分間経過のイベントを発行する
-
-
second-hand: 秒針を管理するサービス
-
時刻同期のイベントに秒針を設定する -
1秒間経過のイベントを元に秒針を運針する - 秒針を永続化する
-
59→0になると60秒間経過のイベントを発行する
-
-
ticker: 1秒をカウントするサービス
-
一定間隔(1sec)毎に1秒間経過のイベントを発行する
-
-
synchronizer: NTPを用いて時刻を同期するサービス
-
一定間隔(24h)毎に時刻同期のイベントを発行する
-
をマイクロサービスとして設計しています。
アーキテクチャ
daprを利用して以下のような設計してみました。
今回は、
-
Service-to-service invocation
-
clockサービスからsecond-hand/minute-hand/hour-handの呼び出し
-
-
Publish and subscribe
- cronを使って
tickerサービス /synchronizerサービス を一定間隔毎にトリガー
- cronを使って
-
Resource bindings
- redis経由で各イベントを通知
-
State management
- 時針・分針・秒針の情報を永続化
を利用してみようとおもいます。
github.com/kzmake/dapr-clock
github.com/dapr/go-sdk を利用してdaprサイドカーと通信するマイクロサービスアプリケーションを github.com/kzmake/dapr-clock の /microservices に各マイクロサービスを作成しました。各機能についてdaprでどう実装できるかを紹介していきたいと思います。
現在時刻を取得する機能
ユーザーはclockサービスを通してsecond-hand/minute-hand/hour-handサービスが管理する時針・分針・秒針を取得するとします。
また、second-hand/minute-hand/hour-handはそれぞれの針の状態を時針で持ちたくないのでdaprが提供するストア機能で永続化も試みます。ここでdaprのビルディングブロックとしては、
を利用し、dapr上でPOST /v1.0/invoke/clock/method/nowのAPIを提供します。まず、APIリクエストを受け取る部分から記述していきます。今回はアプリケーション <--> dapr間でgrpcを利用したかったのでgithub.com/dapr/go-sdk/service/grpcを使っています。
Service-to-service invocation
下記はclockサービスの実装ですが、second-hand/minute-hand/hour-hand も呼び出す handler が違うだけで差分はありません。second-hand/minute-hand/hour-handも同じようにAddServiceInvocationHandler を使って "now" のAPIを追加しています。
import (
daprd "github.com/dapr/go-sdk/service/grpc"
"github.com/kzmake/dapr-clock/microservices/clock/handler"
)
func main() {
s, err := daprd.NewService(":3000")
if err != nil {
log.Fatalf("failed to start the server: %+v", err)
}
// POST /v1.0/invoke/clock/method/now
if err := s.AddServiceInvocationHandler("now", handler.Now); err != nil {
log.Fatalf("error adding invocation handler: %+v", err)
}
if err := s.Start(); err != nil {
log.Fatalf("server error: %+v", err)
}
}
clockサービスのhandlerは、second-hand/minute-hand/hour-handのAPIをリクエストするため、daprのビルディングブロックである Service-to-service invocation を利用します。client.InvokeServiceを利用している部分がそれにあたりますが、daprが提供する固定のエンドポイントを利用することでサービスディスカバリーを実装する必要がないようになっています!
func Now(ctx context.Context, in *common.InvocationEvent) (*common.Content, error) {
client, err := dapr.NewClient()
if err != nil {
return nil, err
}
// POST /v1.0/invoke/hour-hand/method/now
hourHandRes, err := client.InvokeService(ctx, "hour-hand", "now")
if err != nil {
return nil, err
}
// POST /v1.0/invoke/minute-hand/method/now
minuteHandRes, err := client.InvokeService(ctx, "minute-hand", "now")
if err != nil {
return nil, err
}
// POST /v1.0/invoke/second-hand/method/now
secondHandRes, err := client.InvokeService(ctx, "second-hand", "now")
if err != nil {
return nil, err
}
// ...それぞれのレスポンスの json.Unmarshal 処理してレスポンスを作成するなど
return &common.Content{ContentType: "application/json", Data: res}, nil
}
State management
上記のリクエストを second-hand/minute-hand/hour-handサービスで受け付け、 永続化されている針を取得していきます。State management では、コンポーネントとしてyamlで定義したstatestoreを利用していきます。バックエンドとして多くのものをサポートしていますが、今回はredisを使用しています。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
State management では、metadataで設定したnameやnamespaceを元にコード内で永続・参照先を指定します。今回の例ではstatestoreを指定します。参照のみなので、client.GetState(ctx, "statestore", "hour") のようにデータを取得します。
hour-handサービスのPOST /v1.0/invoke/hour-hand/method/nowを処理するハンドラーとしては下記のようになります。
import (
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
)
func Now(ctx context.Context, in *common.InvocationEvent) (*common.Content, error) {
client, err := dapr.NewClient()
if err != nil {
return nil, err
}
item, err := client.GetState(ctx, "statestore", "hour")
if err != nil {
return nil, err
}
// ...レスポンスを生成
return &common.Content{ContentType: "application/json", Data: res}, nil
}
一定間隔毎に針を運針する機能
Resource bindingsでサポートしているcronを利用して一定間隔でスケジューリングできるトリガーをもとに、tickerサービスでイベントを発行し、second-hand/minute-hand/hour-handサービスの運針(データを永続化)を実現します。tickerからブロードキャストするのではなく、
ticker --[Ticked]--> second-hand --[60sTicked]--> minutes-hand --[60mTicked]--> hour-hand
とtickerのイベントをトリガーに各サービスもイベントを発行するようにしています。daprのビルディングブロックとしては、
を使いました。
Resource bindings
まずはtickerサービスのトリガーとなるinput bindingsから見ていきます。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: ticker
spec:
type: bindings.cron
metadata:
- name: schedule
value: "@every 1s"
と1sec毎にtickerサービスの処理を行うためのトリガーを設定します。Service-to-service invocationと同様に、ここではtickerコンポーネントとhandler.Tickハンドラーを結びつけるため、s.AddBindingInvocationHandler("ticker", handler.Tick) を行います。
import (
daprd "github.com/dapr/go-sdk/service/grpc"
"github.com/kzmake/dapr-clock/microservices/ticker/handler"
)
func main() {
s, err := daprd.NewService(":3001")
if err != nil {
log.Fatalf("failed to start the server: %+v", err)
}
if err := s.AddBindingInvocationHandler("ticker", handler.Tick); err != nil {
log.Fatalf("error adding binding handler: %+v", err)
}
if err := s.Start(); err != nil {
log.Fatalf("server error: %+v", err)
}
}
このようにするだけで、外部コンポーネントをトリガーとして処理を行うサービスを作成できます。
Publish and subscribe
次はマイクロサービスの肝となるPublish/Subcribeをdaprを使って実装していきます。pubsubコンポーネントの定義し、
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
コードではコンポーネントのname/namespaceを指定してclient.PublishEvent(ctx, "pubsub", "Ticked", nil) を使うことでメッセージのPublishを実装します。
func Tick(ctx context.Context, in *common.BindingEvent) ([]byte, error) {
client, err := dapr.NewClient()
if err != nil {
return nil, err
}
if err := client.PublishEvent(ctx, "pubsub", "Ticked", nil); err != nil {
return nil, err
}
return nil, nil
}
second-handサービスのSubscribeは、AddTopicEventHandler を使ってSubscribeハンドラーを登録します。
import (
"github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/grpc"
"github.com/kzmake/dapr-clock/microservices/second-hand/handler"
)
func main() {
s, err := daprd.NewService(":3000")
if err != nil {
log.Fatalf("failed to start the server: %+v", err)
}
// ...invokeなど
if err := s.AddTopicEventHandler(&common.Subscription{
PubsubName: "pubsub",
Topic: "Ticked",
Route: "/increase",
}, handler.Increase); err != nil {
log.Fatalf("error adding event handler: %+v", err)
}
if err := s.Start(); err != nil {
log.Fatalf("server error: %+v", err)
}
}
あとは、イベントを受信した際にdaprストア機能で秒針の情報を取得し、1sec分運針した後、永続化します。
func Increase(ctx context.Context, e *common.TopicEvent) (bool, error) {
client, err := dapr.NewClient()
if err != nil {
return false, err
}
item, err := client.GetState(ctx, "statestore", "second")
// ...itemからsecを取得
// 59 -> 0 への運針であれば新規にイベント発行
if (sec+1)/60 == 1 {
if err := client.PublishEvent(ctx, "pubsub", "Ticked.60s", nil); err != nil {
return 0, err
}
}
// ...1sec運針する処理
if err := client.SaveState(ctx, "statestore", "second", []byte(fmt.Sprintf("%d", sec))); err != nil {
return false, err
}
return false, nil
}
59 -> 0 へと秒針が一周する場合は"Ticked.60s"をPublishして minute-hand/hour-handにイベントを渡していくようにしてみました。今回はこのようにサービス毎のイベントをPub/Subすることで時針・分針・秒針の運針をdaprを使って実装してみました。
現在時刻を同期する機能
先程と基本的には同じビルディングブロックを利用し、cronを利用して一定間隔でスケジューリングできるトリガーをもとに、synchronizerサービスでNTPより取得した現在時刻の同期イベントを発行し、second-hand/minute-hand/hour-handサービスの時刻同期を実現します。今回はsynchronizerからブロードキャストし、ペイロードにjsonを渡すことでsecond-hand/minute-hand/hour-handの時刻を設定します。ここも dapr のビルディングブロックとしては、
を使いました。新規に下記のsynchronizerコンポーネントを定義し、1日毎に現在時刻の同期を試みます。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: synchronizer
spec:
type: bindings.cron
metadata:
- name: schedule
value: "@daily"
Publish and subscribe
Resource bindings を利用してトリガーとしてコンポーネントを登録しつつ、 Publish and subscribe でペイロードを持つイベントを発行してみます。Publish部分では NTP より取得した時刻情報をペイロードに設定してPublishさせます。
func Synchronize(ctx context.Context, in *common.BindingEvent) ([]byte, error) {
client, err := dapr.NewClient()
if err != nil {
return nil, err
}
time, err := ntp.Time("ntp server address...")
if err != nil {
return nil, err
}
payload, err := json.Marshal(map[string]interface{}{"hour": time.Hour(), "minute": time.Minute(), "second": time.Second()})
if err != nil {
return nil, err
}
if err := client.PublishEvent(ctx, "pubsub", "Synchronized", payload); err != nil {
return nil, err
}
return nil, nil
}
後はsecond-hand/minute-hand/hour-handでイベントをSubscribeし、時刻同期を行うといった実装になります。
以上で時計サービスとして、
- 現在時刻を取得する機能
- 一定間隔で秒針・分針・時針を運針する機能
- 現在時刻を同期する機能
と最低限の機能を dapr + マイクロサービス として実現できたかなと思います!
さいごに
どうでしたでしょうか!まだまだ成長途中なOSSかと思いますが、コンポーネントとしてインフラの差し替え可能だったり、ビルディングブロックが豊富だったりと dapr に乗っかることでコアロジックに集中できそうだなと改めて思いました ![]()
github.com/kzmake/dapr-clock は6マイクロサービスを組み合わせてノリで作ってみましたが、設計&開発には1日もかかりませんでした
これなら開発効率ももりもり上げていけそうですね
(v1.0 が待ち遠しい…)
ニフクラのサービスとして提供している hatoba 上で dapr を動かしたり、マルチクラウドに利用しても面白いかもしれないですね!
今回紹介しきれなかったものに、
があるんですが、また別の機会で紹介しようと思います。(github.com/dapr/go-sdkはまだactorを利用できないようでした…残念… #21)
さて、明日は @yaaamaaaguuu さんが VMware製品を気軽に検証するためのtips について書いてくれるようです!お楽しみに![]()
参考文献
この記事は以下の情報を参考にしています。






