はじめに
最近Goを使っているので、自分の興味分野であったマイクロサービスをGoでどうやって実現できるか調べてみました。といってもそんなに深い内容ではありません。gokitのチュートリアルに毛が生えた程度です。すごい簡単なアプリが出てきますが、かなりいい加減なので先に謝っておきます。真似しないように。
それではつらつらと。
Gokitについて
A toolkit for microservices
と書いてありましたのでその通りなんだと思います。
他にもマイクロサービス用のライブラリはあるらしいですが、スターの数が8500(2017年12月)と圧倒的ぽかったので、触ってみました。
今回作ってみたサンプルアプリについて
すごくシンプルなアプリです。
全ソースコード
https://github.com/miya-masa/go-micro-sample
UserServiceとProductServiceという二つのサービスに対してAPI Gateway(的なもの)がアクセスするようなイメージです。
UserServiceとProductServiceは単純に名前からUserとProductを取得してくるというだけのアプリです。
API Gatewayは
- /users/{username}
- /products/{productname}
という二つのを公開していて、それぞれ裏側のUserService /{username}
と ProductService /{productname}
にアクセスしています。
API Gatewayは一台(localhost:8000)、UserService3台(localhost:8080,localhost:8081,localhost:8082)、ProductService3台(localhost:8083,localhost:8084,localhost:8085)を想定しています。
2 +-------------+
+------> | UserService |-+
+--------+ +-------------+ | ++------------+ +-+
| | 1 | | -+ ++-------------+ |
| client | ---> | API Gateway | +---------------+
| | | | -+
+--------+ +-------------+ |
| 3 +----------------+
+-----> | ProductService |-+
++---------------+ +-+
++----------------+ |
+------------------+
- clientは
/users/{username}
、または/products/{productname}
にアクセスできます - UserServiceへは
/{username}
にプロキシします。 - ProductServiceへは
/{productname}
にプロキシします。
サービスを作る
まずサービスを作ります。サービス自体はgokitと特に関係ないのですごく簡単に。。。
サービスはインターフェースで定義します。
type ProductService interface {
ProductByName(ctx context.Context, name string) (*Product, error)
}
type impl struct {
}
func NewService() ProductService {
return &impl{}
}
func (u *impl) ProductByName(ctx context.Context, name string) (*Product, error) {
product, ok := products[name]
if ok {
return &product, nil
}
return nil, ErrNotFound
}
サービスをhttpハンドラにする。
すごーく簡単に紹介します。
ハンドラーはgokitの
httptransport "github.com/go-kit/kit/transport/http" // ハンドラに紐づけるやつ
github.com/go-kit/kit/endpoint // endpointつくるやつ
を使って作ります。gokitは任意のサービスそのものをエンドポイントにするわけではなく、
endpoint.Endpoint
としてエンドポイントとして扱います。
といってもこの endpoint.Endpoint
は func(ctx context.Context, request interface{}) (interface{}, error)
という関数なのでシンプルでわかりやすいですね。 request
と response
については後ほど。
作ったサービスをこんな感じでラップします。
func makeProductByNameEndpoint(svc ProductService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(string)
product, err := svc.ProductByName(ctx, req)
if err != nil {
return nil, err
}
return product, nil
}
}
requestを型アサーションしていますが、このデコード処理は別途定義します。
responseもエンコードする必要ありますがこれも別途定義します。まとめて後ほど。
さて、作ったエンドポイントですが httptransport
を使ってこんな感じでハンドラをバインドします。
r := mux.NewRouter()
r.Handle("/{name}", httptransport.NewServer(
makeProductByNameEndpoint(products.NewService()),
decodeProductByNameRequest,
encodeResponse))
エンドポイント、RequestDecoder、ResponseEncoderという順番です。
このdecodeProductByNameRequestの中身がこれです。
func decodeProductByNameRequest(_ context.Context, r *http.Request) (interface{}, error) {
if name, ok := mux.Vars(r)["name"]; ok {
return name, nil
}
return nil, errors.New("name is required")
}
gorilla使ってますが、パス変数取っているだけです。ここでデコードした値が endpoint.Endpoint
の引数になるわけですね。
encodeResponseはこんな感じです。
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
はい。構造体をまんまJSONエンコードしているだけです。
この辺りは、標準APIそのまんまですね。
個人的に入力変換と処理と出力変換が分かれているのはすごくわかりやすくて良いです。
さて、これで
s := &http.Server{
Handler: r,
Addr: "localhost:8080",
// Good practice: enforce timeouts for servers you create!
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
}
logger.Log(s.ListenAndServe())
とやれば、サービスが立ち上がりますね。
+----------------+
| ProductService |
+----------------+
ができました。UserServiceも全く同じです。
APIGatewayを作る
次にAPIGatewayを作りましょう。APIGatewayはclientからのServerEndpointであるProductServiceへは、ClientEndpointとなります。
まず、ProductServiceと同じインターフェースで作っていきます。処理は裏側のProductServiceへアクセスするだけです
type ProxyProductService struct {
productByName endpoint.Endpoint
}
func (m *ProxyProductService) ProductByName(ctx context.Context, name string) (*products.Product, error) {
response, err := m.productByName(ctx, name)
if err != nil {
return nil, err
}
resp := response.(*products.Product)
return resp, nil
}
これは自体は簡単なんですが、ProxyProductServiceが持っている productByName endpoint.Endpoint
。これは先ほどの ServerEndpoint
と違うので注意です! ClientEndpoint
と呼ぶことにします。
さて、このClientEndpointですが、こんな感じでつくります。
func makeProductByNameEndpoint(proxyURL string) endpoint.Endpoint {
url, err := url.Parse(proxyURL)
if err != nil {
panic(err)
}
return httptransport.NewClient(
"GET",
url,
encodeProductByNameRequest,
decodeProductByNameResponse,
).Endpoint()
}
proxyURLは裏側のURLとなります。
クライアントなのでEncodeRequest、DecodeResponseと、ServerEndpointと逆になっていることに注意してください。
ここで勘が鋭い人なら気づくかもしれませんが、
「ん、さっきのサービスでendpoint.Endpoint一つしか持ってなかったけどどうやって後ろの三台に分散するん?proxyURLも一つでしょ?。。。。」
という疑問が残ります。
では、ちょっと難しめのProxyProductServiceを作るコードを説明していきます。
func NewProductService(proxyURLs []string) products.ProductService {
var (
qps = 100
maxAttempts = 3
maxTime = 1 * time.Second
)
var (
subscriber sd.FixedEndpointer // 1
)
for _, url := range proxyURLs { // 2
if !strings.HasPrefix(url, "http") {
url = "http://" + url
}
var e endpoint.Endpoint
e = NewEndpoints(url).ProductByName // 3
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e)
subscriber = append(subscriber, e) // 4
}
balancer := lb.NewRoundRobin(subscriber) // 5
retry := lb.Retry(maxAttempts, maxTime, balancer) // 6
return &ProxyProductService{retry}
}
引数のproxyURLsのサーバーに対してラウンドロビンで接続しに行くProxyProductServiceを生成します。
- 複数のエンドポイントクライアントを保持するための変数定義です。
- proxyURL分endpointをendpointer(subscriberとなってますがendpointerと呼びます。。。)に加えます。ProductServiceの場合、"localhost:8083", "localhost:8084", "localhost:8085",が入ってくる想定です。もちろんそのホストにProductServiceが起動されている前提です。
- ここでendpointは先ほど
makeProductByNameEndpoint
で作成したものです。 - endpointerに作ったClientEndpointを加えます。加える前になにやらやっていますが、これは、作ったエンドポイントの前後にいい感じの処理を追加しています。「いい感じ」というのは名前から想像してください。。。。。mm
- 作ったendpointerに対してラウンドロビンで利用するクライアントロードバランサーを作ります。
- 作ったロードバランサーから、endpointsを作っています。
ここまで来て作ったエンドポイントが、ProxyProductServiceのendpointとなります!イメージ的には最初貧弱だったエンドポイントが色々な装備を経てマイクロサービス用のendpoint.Endpointに変身する感じです。
+----------------+
+----> | ProductService |
+-----------+-----------------+ | | localhost:8083 |
| | localhost:8083用| -+ +----------------+
| Product +-----------------+ +-> | ProductService |
apigateway | endpoint | localhost:8084用| ----+ | localhost:8084 |
| +-----------------+ +----------------+
| | localhost:8085用| ------> | ProductService |
+-----------------------------+ | localhost:8085 |
+----------------+
UserServiceも同じ作りです!
APIゲートウェイからバックエンドサービスにつなぐ
ここまでくればあと少しです。
アプリを起動するmain関数を書いていきます。あくまでマイクロサービスっぽく作る簡易的なコードなので決してそのまま使わないでください。
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
r := mux.NewRouter()
productproxy := apigateway.NewProductService([]string{ // 1
"localhost:8083", "localhost:8084", "localhost:8085",
})
productproxy = products.Logging(logger, "localhost:8000")(productproxy)
r.Handle("/products/{name}", httptransport.NewServer( // 2
products.NewEndpoints(productproxy).ProductByName,
decodeProductByNameRequest,
encodeResponse))
serverApiGateway := &http.Server{ //3
Handler: r,
Addr: "127.0.0.1:8000",
// Good practice: enforce timeouts for servers you create!
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
}
psrv := products.NewService() // 4
prodinstances := []string{"localhost:8083", "localhost:8084", "localhost:8085"}
for _, v := range prodinstances { // 5
r := mux.NewRouter()
srv := products.Logging(logger, v)(psrv)
r.Handle("/{name}", httptransport.NewServer(
products.NewEndpoints(srv).ProductByName,
decodeProductByNameRequest,
encodeResponse))
s := &http.Server{
Handler: r,
Addr: v,
// Good practice: enforce timeouts for servers you create!
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
}
go func(ins string) {
logger.Log("start", ins)
logger.Log(s.ListenAndServe()) // 6
}(v)
}
logger.Log(serverApiGateway.ListenAndServe()) //7
UserServiceは省いています。作りは同じです。
- apigateway.NewProductServiceでバックエンドにつなぐProductServiceを作ります。実体はProxyProductServiceです。クライアントロードバランサーを備えたProductServiceのProxyです。裏で動かすの3台の想定なので、それぞれのホストを指定します。
- APIGatewayのエンドポイントを作ります。これはServerEndpointです。products.NewEndpointsはServerEndpointを生成する関数です。ClientEndpointを備えたProxyProductServiceを指定します。
- serverを作ります。
- 4,5,6はバックエンドのサービス立ち上げです。proxyとおなじようにproducts.NewEndpoint()を利用してEndpointを作りますが、中身は実サービスです。4.では実サービスインスタンス生成します。
- 5,6で指定したアドレスでサーバーを立てます。単純にループで回しています。
- 非同期で作ったサーバーをガンガン起動します。
- 最後にAPIGatewayを立ち上げます。
これで最初にイメージした構成が完了しました。
アプリを起動
実行するとサーバーが起動します。スルーしましたが、logを出力をするmiddlewareを仕込んでいます。何回か同じURL(/products/product1)でアクセスすると。。。。
// 1回めのアクセス
// ApiGatewayのホスト
ts=2017-12-08T14:54:36.338054Z caller=logging.go:26 server=localhost:8000
// Backendのホスト
ts=2017-12-08T14:56:16.447323Z caller=logging.go:26 server=localhost:8083
// 2回めのアクセス
// ApiGatewayのホスト
ts=2017-12-08T14:56:53.491512Z caller=logging.go:26 server=localhost:8000
// アクセスするサーバーが変わっている!
ts=2017-12-08T14:56:53.793216Z caller=logging.go:26 server=localhost:8084
ってな感じで負荷分散しながらバックエンドのサービスとつながることができました。
まとめ
- RequestとかResponseで型アサーションをミスると実行時までわからないのが少し痛い
- 今回実現したかったことはできた
- 次はServiceRegistoryにConsulを使ってみたい
- 公式も見ると良いよ。 http://gokit.io/examples/
- 公式のexampleのソースコードよりリポジトリ内のソースコード読むと良いよ
- 最後の方少しなげやりですみません。
- まさかり怖い