Dapr という分散アプリケーション用のランタイムを Lima で実行してみました。
はじめに
Dapr とは
Distributed Application Runtime の頭文字から Dapr と名付けられているように、分散アプリケーションの開発を容易にするためのランタイム(とフレームワーク)です。
マイクロサービスに特化したものではなく、考え方はサービスメッシュよりも Java/Scala の Akka に近い気がするので、マイクロサービス開発へ適用する際は注意が必要かもしれません。
アプリケーションの開発モデル
Dapr のアプリケーションは、自身のサイドカーとして適用される Dapr Runtime とだけ直接やりとりする次のようなモデルを想定しているようです。
App1, App2: Dapr のアプリケーション(マイクロサービス等)
Dapr Runtime: サイドカーとして個々のアプリケーションへ適用されるランタイム
State stores: ステートの保存先(Redis、MySQL 等)
そして、Dapr Runtime は HTTP と gRPC の API 1で次のような機能をアプリケーションへ提供します。
- メソッド呼び出し(invoke)
- ステート管理
- メッセージング(Pub/Sub)
- アクターモデル
- ワークフロー
例えば、App1 が App2 のメソッドを呼び出す、もしくはステートを保存する場合、サイドカーである Dapr Runtime の API を介して行うようになっています。
つまり、Dapr Runtime への依存と引き換えに、他への依存を解消するようなモデルとなっており、アプリケーションにサイドカーの存在を意識させない(一般的な)サービスメッシュとは違っています。
懸念点
以上の内容から、Dapr は stateful な分散オブジェクト
2 にフォーカスしたものだと考えています。
そのため、例えば stateless
なサービスだけを Dapr アプリケーション化しようとすると無理が生じるため、ドメイン駆動設計におけるエンティティや集約のようなものも分散オブジェクト化 3するような発想が必要になると思います。
更に、ステート管理は実質的に KVS 4ですので、これで状態を上手く管理できない作りのシステム、例えば RDB のテーブルを SQL で操作しなければならないようなものは(現時点で)Dapr にあまりマッチしないと思われます。
基本的に Dapr アプリケーションから直接 DB へ接続するような必要が生じたら要注意5かもしれません。
また、1つの Dapr アプリケーションで アプリケーション本体 + サイドカー
の 2プロセスを消費する事になるので、アプリケーションの粒度を細かくしすぎた場合の性能やコスト面への影響も懸念材料です。
おそらくは、アクターモデルの機能を効果的に活用するのがキーポイントかもしれません。
Lima で環境構築
Dapr CLI の init
コマンドで Dapr の環境を構築するのが簡単ですが、次のような課題があります。
- Dapr CLI は現時点で docker と podman コマンドにしか対応していない(utils.go 参照)
つまり、nerdctl 6に対応していません。
Lima で docker や podman の環境を構築する方法も考えられますが、Dapr CLI のソースコードを見る限り、podman
の場合はコマンドの有無しかチェックしていないので何とかなりそうです。
そこで、今回は次のようにしてみました。
- nerdctl のシンボリックリンクを podman という名前で作る7
Dapr CLI をインストールした後、シンボリックリンクの作成を行うように、Lima の構成ファイルを次のようにしてみました。
lima_dapr.yaml
images:
...省略
mounts:
- location: "~"
provision:
- mode: system
script: |
#!/bin/bash
wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
ln -s /usr/local/bin/nerdctl /usr/local/bin/podman
このファイルを使って Lima を実行します。
この環境へ入るには、limactl shell で lima_dapr
を指定します。
Lima 実行
[macOS] % limactl start ./lima_dapr.yaml --tty=false
...
[macOS] % limactl shell lima_dapr
[lima] $
次のように dapr init を実行する事で Dapr の環境が整います。
なお、Downloading binaries and setting up components...
から進まなくなったり、stream stalled: received xxx bytes over the last 5 seconds
エラーが発生する事もあったので 8、その場合は dapr uninstall
した後で再度実行してみると上手くいくかもしれません。
Dapr の環境構築例
[lima] $ dapr init --container-runtime=podman
...
[lima] $ nerdctl ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6f930d94067e docker.io/daprio/dapr:1.9.5 "./placement" About a minute ago Up 0.0.0.0:50005->50005/tcp dapr_placement
8315d73acb7d docker.io/openzipkin/zipkin:latest "start-zipkin" About a minute ago Up 0.0.0.0:9411->9411/tcp dapr_zipkin
f7a820809cba docker.io/library/redis:6 "docker-entrypoint.s…" About a minute ago Up 0.0.0.0:6379->6379/tcp dapr_redis
dapr_placement がいわゆるコントロールプレーン9、dapr_redis がステートの保存先、dapr_zipkin が分散トレーシングのためのコンテナとなっています。
Dapr アプリケーションの作成と実行
動作確認のために、このような構成のアプリケーションを作成してみます。
(1) item app
商品の価格をステート管理するだけのアプリケーションとして、次のような処理を Go で実装してみました。
-
GET /price/<id>
で価格を取得 -
POST /price/<id>
もしくはPUT /price/<id>
で価格を設定
Dapr では invoke 対象を method
と表現しているので、本来は "/メソッド名"
のようなパス設計にするのが妥当だと思いますが、とりあえず今回はこのようにしてみました。
Dapr Runtime の gRPC API は使わず、HTTP API を使う事にします。
この HTTP API へ接続するための URL は http://localhost:<daprPort>/・・・
で、ポート番号は環境変数 DAPR_HTTP_PORT
から取得できます。
ステート管理の API は /v1.0/state/<storename>
というパスとなっており、dapr init で構築した今回の環境では statestore
10が(storename として)使えます。
ステートは [{"key":<キー>, "value":<値>}, ...]
のような内容で保存します。
ステートの保存に成功すると HTTP ステータスコードが 204
で返ってきますが、ステートの取得時は成功すると 200
で該当するキーがないと 204
が返ってきます。多少紛らわしいので注意が必要です。11
item/item.go
package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"strings"
)
type Item struct {
ID string `json:"id"`
Price int
}
type State struct {
Key string
Value Item
}
...省略
// ステートの取得
func loadState(endpoint string, id string) (Item, error) {
res, err := http.Get(endpoint + "/" + id)
var state Item
if err != nil {
return state, err
}
err = json.NewDecoder(res.Body).Decode(&state)
if err == nil && state.ID == "" {
err = errors.New(fmt.Sprintf("not found: id=%s", id))
}
return state, err
}
// ステートの保存
func saveState(endpoint string, state Item) error {
s, _ := json.Marshal([]State{
State{state.ID, state},
})
body := bytes.NewBuffer(s)
res, err := http.Post(endpoint, "application/json", body)
if err != nil {
return err
}
// 成功した場合は 204
if res.StatusCode != 204 {
return errors.New(fmt.Sprintf("failed to store state: statusCode=%d", res.StatusCode))
}
return nil
}
func main() {
daprPort := os.Getenv("DAPR_HTTP_PORT")
port := os.Getenv("APP_HTTP_PORT")
if port == "" {
port = "3000"
}
daprUrl := fmt.Sprintf("http://localhost:%s/v1.0", daprPort)
// ステート管理 API の URL
daprStateUrl := daprUrl + "/state/statestore"
http.HandleFunc("/price/", func(w http.ResponseWriter, r *http.Request) {
id := strings.TrimPrefix(r.URL.Path, "/price/")
if id == "" {
serverError(w, errors.New("invalid id"))
return
}
switch r.Method {
case http.MethodGet: // price の取得処理
state, err := loadState(daprStateUrl, id)
if err != nil {
serverError(w, err)
return
}
json.NewEncoder(w).Encode(state.Price)
case http.MethodPost, http.MethodPut: // price の設定処理
var price int
err := json.NewDecoder(r.Body).Decode(&price)
if err != nil {
serverError(w, err)
return
}
state := Item{id, price}
err = saveState(daprStateUrl, state)
if err != nil {
serverError(w, err)
return
}
json.NewEncoder(w).Encode(state.Price)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
})
log.Fatal(http.ListenAndServe(":" + port, nil))
}
ビルドと実行
macOS 側で linux 向けにビルドします。
ビルド例
[macOS] % cd item
[macOS] % GOOS=linux go build -o item_linux item.go
lima_dapr へ入って、dapr run
コマンドを実行すると Dapr Runtime が起動し、その管理下でアプリケーション(item_linux)が実行されます。12
ここでは app id を item
とし、このアプリケーション本体のポートを 3100
にしました。
ちなみに、--app-port
は Dapr Runtime からアプリケーション本体へ接続するポートを指定するためのものです。
Dapr アプリケーション実行例
[macOS] % limactl shell lima_dapr
[lima] $ APP_HTTP_PORT=3100 dapr run --app-id item --app-port 3100 ./item_linux
ℹ️ Starting Dapr with id item. HTTP Port: 38291. gRPC Port: 32869
INFO[0000] starting Dapr Runtime -- version 1.9.5 -- commit
...省略
INFO[0000] dapr initialized. Status: Running. Init Elapsed 11ms app_id=item instance=ubuntu scope=dapr.runtime type=log ver=1.9.5
INFO[0000] placement tables updated, version: 0 app_id=item instance=ubuntu scope=dapr.runtime.actor.internal.placement type=log ver=1.9.5
ℹ️ Updating metadata for app command: ./item_linux
✅ You're up and running! Both Dapr and your app logs will appear here.
動作確認
dapr invoke コマンドを使って price/<id>
を呼び出してみます。
item-1
の price を 1000
へ設定する場合は次のようになります。
price 設定例1 - dapr invoke 利用
[lima] $ dapr invoke --app-id item --method price/item-1 --data "1000"
1000
✅ App invoked successfully
今度は、Dapr Runtime の HTTP API(/v1.0/invoke/<app id>/method/<method>
)を使って同じように実行してみます。
price 設定例2 - Dapr Runtime の HTTP API 利用
[macOS] % curl http://localhost:38291/v1.0/invoke/item/method/price/item-2 -d "2000"
2000
(2) cart app
item app の price を呼び出すアプリケーションを作成します。
item app へ直接アクセスせずに、Dapr Runtime の Invoke API(/v1.0/invoke/<app id>/method/<method>
)を利用する点が重要です。
なお、こちらのステート管理では ETag
を使った楽観ロック(楽観的排他制御)を試してみました。
ETag の値は、ステートの取得時にレスポンスヘッダーから取得できます。ステートの保存時に指定した ETag が合っていないと保存に失敗し、HTTP ステータスコードが 400
で返ってきます。
ちなみに、(Redis を使用する場合) ETag は 1 から順にカウントアップされていくようです。13
cart/cart.go
...省略
type Cart struct {
ID string `json:"id"`
Items []CartItem
}
type CartItem struct {
ItemID string `json:"item_id"`
Price int
Qty int
}
type AddItem struct {
ItemID string `json:"item_id"`
Qty int
}
type State struct {
Key string
Value Cart
Etag *string
}
...省略
func findPrice(endpoint string, itemID string) (int, error) {
res, err := http.Get(endpoint + "/" + itemID)
var price int
if res.StatusCode != 200 {
err = errors.New("failed to get item price")
}
if err != nil {
return price, err
}
err = json.NewDecoder(res.Body).Decode(&price)
return price, err
}
func loadState(endpoint string, id string) (Cart, *string, error) {
res, err := http.Get(endpoint + "/" + id)
var state Cart
if err != nil {
return state, nil, err
}
err = json.NewDecoder(res.Body).Decode(&state)
if err == nil && state.ID == "" {
err = errors.New(fmt.Sprintf("not found: id=%s", id))
}
// ETag の取得
etag := res.Header.Get("ETag")
return state, &etag, err
}
func saveState(endpoint string, state Cart, etag *string) error {
s, _ := json.Marshal([]State{
{state.ID, state, etag},
})
body := bytes.NewBuffer(s)
res, err := http.Post(endpoint, "application/json", body)
if err != nil {
return err
}
if res.StatusCode != 204 {
return errors.New(fmt.Sprintf("failed to store state: statusCode=%d", res.StatusCode))
}
return nil
}
func main() {
...省略
itemAppId := os.Getenv("ITEM_APP_ID")
if itemAppId == "" {
itemAppId = "item"
}
daprUrl := fmt.Sprintf("http://localhost:%s/v1.0", daprPort)
daprStateUrl := daprUrl + "/state/statestore"
// Invoke API の URL (item app の price を間接的に呼び出す)
itemPriceUrl := fmt.Sprintf("%s/invoke/%s/method/price", daprUrl, itemAppId)
http.HandleFunc("/items/", func(w http.ResponseWriter, r *http.Request) {
id := strings.TrimPrefix(r.URL.Path, "/items/")
state, _, err := loadState(daprStateUrl, id)
if err != nil {
serverError(w, err)
}
json.NewEncoder(w).Encode(state.Items)
})
http.HandleFunc("/additem/", func(w http.ResponseWriter, r *http.Request) {
id := strings.TrimPrefix(r.URL.Path, "/additem/")
var param AddItem
json.NewDecoder(r.Body).Decode(¶m)
...省略
cart, etag, _ := loadState(daprStateUrl, id)
...省略
if !upd {
price, err := findPrice(itemPriceUrl, param.ItemID)
...省略
}
err := saveState(daprStateUrl, cart, etag)
if err != nil {
serverError(w, err)
return
}
json.NewEncoder(w).Encode(cart.Items)
})
log.Fatal(http.ListenAndServe(":"+port, nil))
}
ビルドと実行
ビルドして dapr run で実行しておきます。
ビルド & 実行例
[macOS] % cd cart
[macOS] % GOOS=linux go build -o cart_linux cart.go
[macOS] % limactl shell lima_dapr
[lima] $ APP_HTTP_PORT=3200 ITEM_APP_ID=item dapr run --app-id cart --app-port 3200 ./cart_linux
ℹ️ Starting Dapr with id cart. HTTP Port: 37013. gRPC Port: 36853
...省略
INFO[0000] placement tables updated, version: 0 app_id=cart instance=ubuntu scope=dapr.runtime.actor.internal.placement type=log ver=1.9.5
ℹ️ Updating metadata for app command: ./cart_linux
✅ You're up and running! Both Dapr and your app logs will appear here.
動作確認
cart app の additem/<id>
を dapr invoke コマンドで実行してみます。
実行例
[lima] $ dapr invoke --app-id cart --method additem/cart-1 --data '{"item_id":"item-1", "qty":3}'
[{"item_id":"item-1","Price":1000,"Qty":3}]
✅ App invoked successfully
特に問題なく、item app から price の値を取得できているようです。
最後に
dapr_redis へ接続して、キーの内容を確認したところ次のようになっていました。
dapr_redis の内容
[lima] $ nerdctl exec -it dapr_redis redis-cli
127.0.0.1:6379> keys *
1) "item||item-2"
2) "cart||cart-1"
3) "item||item-1"
次に、Zipkin の Dependencies ではこのような表示になりました。
文字が小さくて分かり難いと思いますが、cart と item がそれぞれ statestore を呼び出していて、cart が item を呼び出している事が表現されています。
-
API の詳細は dapr.proto や api.go 参照 ↩
-
マイクロサービスの場合を Dapr では stateful microservices と表現している ↩
-
Dapr アプリケーション化してしまうと重いので、アクターモデルの活用が必要になりそう ↩
-
ある程度のクエリー機能(現時点ではアルファ版)は用意されており、DB 次第でトランザクションも扱える ↩
-
設計を見直した方が良さそうな気がします ↩
-
Lima におけるデフォルトのコンテナ管理コマンドツール ↩
-
nerdctl は Docker 互換なので、当然ながら podman とも互換性がある ↩
-
standalone.go に問題がありそう ↩
-
Actor を管理するためのものだと思われる ↩
-
~/.dapr/components/statestore.yaml 参照 ↩
-
実際の開発では、HTTP API よりも gRPC API を使っておいた方が無難かもしれません ↩
-
Dapr Runtime を終了させるとアプリケーションも停止します ↩
-
ETag にどのような値を使用するかはコンポーネント次第となっている。Redis の場合はインクリメントする version の値だが、MySQL や MongoDB では UUID。 ↩