Edited at
DeNADay 16

GAE/Go, Firestoreでマスターデータを管理してみた件


モチベーション

ちょっと前までRuby,Rails,オンプレという環境で開発してきたのですが、最近GAE/Goという全く違う環境で開発するようになりGCPしゅごいいぃぃぃ!と思い始めていたのでGCPのサービス群を使って今開発しているサービスにも活かせるようなかっこいい処理を考えてみようと思いました。

ちょうど今作っているサービスでマスタデータを管理するような処理をかく機会があったのでPub/Subあたりを使ってイベント駆動でうまいこと処理できないか考えてみることにしました。


アーキテクチャ

利用するサービスとしては、GAE/SE,GCS,Pub/Sub,Cloud Firestoreを使っていきます。

GCSからCloud Functions経由でFirestoreに直接書き込みをする方法も考えたのですが、型を管理するのが大変そうなのとデバッグが辛かったり自分のNode.js力が低すぎたりしたのでメインのAPIコンポーネントでも利用しているGAE/Goという構成でやってみることにしました。


マスタ管理を切り出すメリット

マスタデータを管理するサービスを切り出すことで以下のようなメリットがあるかなと思います。Pub/SubでGAEからAckが返ってこなかった時に再通知してくれるので障害が発生したようなケースにも活かせるかなと思いました。


  • ドメインレイヤと疎結合にすることができる

  • 耐障害性

  • CI,CDにかかる時間を短縮することができる

デメリットとしては工数がかかったりオブジェクトの定義をどうやって管理するかなどがあるかなと思います。定義の管理やvalidationについては、gRPCやswaggerなどを使えばうまくできそうな気がしますが、今回時間がなかったのでスコープ外にしています(逃


構成図

アーキテクチャの構成としては下図の通りにしました。

advent.png

流れとしては次のようにGCSにファイルをアップロードするだけでDBに反映して永続化されるという作りになっています。


  1. ClientからgsutilでGCSにファイルをアップロードします。ここは実際の環境だとCircleCIなどでマスタファイルをアップロードするなどすれば良いと思います。

  2. アップロードしたことによりGCSのFinalize eventのhookが実行されます。

  3. Pub/SubがSubscriberであるGAEにNotificationを送ります。

  4. Notificationを受け取ったGAEはGCSにアップロードされたyamlファイルをreadするリクエストを投げます。

  5. yamlのレスポンスを受け取ったGAEは受け取った結果をFirestoreにwriteします。


実装

前述のアーキテクチャを実装していきます。環境としては下記です。


  • Go1.11 runtime

  • GAE/SE 2nd generation

  • Google Cloud SDK 228.0.0


Cloud Storageのバケット作成

マスタデータを格納するためのバケットを作成します。GCPのコンソールからストレージを選択してバケットを作成します。


Pub/Subのトピック作成

gsutil notificationコマンドを使ってPub/Subのトピックを作成します。作成時に-f jsonを指定することで通知で送信されるpostリクエストのペイロードをjsonに設定することができます。

$ gsutil notification create -t [TOPIC_NAME] -f json gs://[BUCKET_NAME]


Subscriberの設定

トピックを作成したのでGAEで購読するためにSubscriberの設定を行います。Pub/Subのコンソールから先ほど作成したトピックを選択肢、サブスクリプションを作成を選択します。

今回はGAEのエンドポイントにリクエストを送るため/_ah/push-handlers/とうプレフィックスをendpointに付加します。このプレフィックスを付加することでPub/Subからの通知をGAEで受信することができるようになります。また後述するadmin権限でのアクセス許可を行っていくことでcURLによるリクエストを防ぐことができます。

このプレフィックスに関する詳細な説明は下記にあります。


Configuring HTTP Endpoints

https://cloud.google.com/pubsub/docs/push#configuring-http-endpoints


エンドポイント URL に push を選択して下記のendpointを登録してサブスクリプション設定を行います。

https://[PROJECT_NAME].appspot.com/_ah/push-handlers/seeds


Pub/Subからの通知を受けるendpointを生やす

サブスクリプションの設定でpush先のendpointとして設定したものをアプリケーションに生やしていきます。Pub/Subからのアクセスはインターナルなアクセスなため未認証アクセスを許容しないようにapp.yamllogin: adminの設定をしておきます。

login: adminこちらのアナウンスでdeprecatedになっており、デプロイが失敗すると記述があったのですが、代替する新しいフィールドがGo1.11のリファレンスになさそうなのとデプロイもエラーにならなかったため追加しています。

PR: https://github.com/rexitorg/sample-update-seed/pull/1


app.yaml

runtime: go111

service: sample-update-seed

handlers:
- url: /_ah/push-handlers/.*
script: auto
login: admin
- url: /.*
script: auto

includes:
- secret.yaml



GCSのファイルをReadする

Pub/Subのリクエストが受け付けられるようになったので、リクエストパラメータからバケット名、ファイル名を取得してReadします。Readしたらmapのsliceにマッピングします。

PR: https://github.com/rexitorg/sample-update-seed/pull/2


seed.go

package seed

import (
"cloud.google.com/go/storage"
"context"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
)

var (
// ErrLoad はseedデータのロードに失敗したエラー
ErrLoad = errors.New("sample-update-seed-seed: Failed to load")
)

// Load はseedをfirestoreにロードする
func Load(ctx context.Context, bucketID string, fileName string) error {
buf, err := readSeed(ctx, bucketID, fileName)
if err != nil {
return errors.Wrap(err, ErrLoad.Error())
}

seeds, err := mapSeed(buf)
if err != nil {
return errors.Wrap(err, ErrLoad.Error())
}

log.Printf("Output seed data: %#v", seeds)

// TODO: firestoreにseedを反映する処理追加

return nil
}

func readSeed(ctx context.Context, bucketID string, fileName string) ([]byte, error) {
c, err := storage.NewClient(ctx)
if err != nil {
return nil, err
}

r, err := c.Bucket(bucketID).Object(fileName).NewReader(ctx)
if err != nil {
return nil, err
}
defer r.Close()

buf, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}

return buf, nil
}

func mapSeed(buf []byte) ([]map[string]string, error) {
src := make([]map[string]string, 0)
err := yaml.Unmarshal(buf, &src)
if err != nil {
return nil, err
}

return src, nil
}



Firestoreに書き込む

マスタデータのオブジェクトを作ることができたのでこちらをFirestoreにwriteしていきます。公式のクイックスタートではFirebaseのクライアントライブラリを使っていますが、今回はFiresotreのクライアントライブラリを使って実装しています。また環境変数のGCP_PROJECT_NAMEをsecret.yamlに追加しておきます。

PR: https://github.com/rexitorg/sample-update-seed/pull/3


seed.go

package seed

import (
"context"
"io/ioutil"
"log"
"os"
"strings"

"cloud.google.com/go/firestore"
"cloud.google.com/go/storage"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)

var (
// ErrLoad はseedデータのロードに失敗したエラー
ErrLoad = errors.New("sample-update-seed-seed: Failed to load")
)

// Load はseedをfirestoreにロードする
func Load(ctx context.Context, bucketID string, fileName string) error {
buf, err := readSeed(ctx, bucketID, fileName)
if err != nil {
return errors.Wrap(err, ErrLoad.Error())
}

seeds, err := mapSeed(buf)
if err != nil {
return errors.Wrap(err, ErrLoad.Error())
}

collectionName := strings.Replace(fileName, ".yaml", "", 1)
err = putMulti(ctx, collectionName, seeds)
if err != nil {
return errors.Wrap(err, ErrLoad.Error())
}
log.Printf("Updated seed data: %#v", seeds)

return nil
}
...

func putMulti(ctx context.Context, collectionName string, seeds []map[string]string) error {
projectID := os.Getenv("GCP_PROJECT_ID")
c, err := firestore.NewClient(ctx, projectID)
if err != nil {
return err
}

for _, seed := range seeds {
_, err := c.Collection(collectionName).Doc(seed["id"]).Set(ctx, seed)
if err != nil {
return err
}
}

return nil
}



secret.yaml

env_variables:

GCP_PROJECT_ID: [YOUR_GCP_PROJECT_NAME]


デプロイする

$ gcloud app deploy


マスタデータファイルの用意

最後に動作確認をしていきます。例としてカテゴリマスタのyamlを用意しました。


categories.yaml

- id: 1

name: konchan
- id: 2
name: saiko
- id: 3
name: unko
- id: 4
name: aho
- id: 5
name: baka
- id: 6
name: majimanji


Cloud Storageへのアップロード

ローカルからgsutilコマンドを使ってGCSへのアップロードを行い、それをフックにして処理を実行します。本番運用だとCIからGCSにファイルをアップロードするなどすれば良いかなと思います。

$ gsutil cp categories.yml gs://[YOUR_BUCKET_NAME]


結果を確認する

マスタデータがFirestoreに反映できていることが確認できます。

スクリーンショット 2018-12-15 23.21.57.png


追加でやった方が良いこと、注意点

サンプルのため実装しませんでしたが本番運用する際は下記を実装したいと思います。


  • 型を管理する

    gRPCを調べて複数コンポーネントで共通の型を使えるようにしてみたいと思います。


  • データバリデーション

    現状はノーチェックで男気更新しているのでCIなどで型に定義したバリデーションロジックを満たすようにしたいと思います。


  • 各種エラーハンドリング

    ファイル存在確認、バケット存在確認など


  • トランザクション処理

    バルクでインサートできるようにする


  • delete,insert

    現状がただのupdateなのでdelete,insertでトランザクション処理できるようにしようかなと思います。


  • 冪等性を担保する

    Pub/Subの通知の再実行などもあるため更新は冪等な作りにしておいた方が良いかなと思います。



感想

cronなどでポーリングしたりせずにイベントフックで処理できるため良い仕組みになったかなーと思いました。またGCPのサービスの連携方法など調べならが実装を進めることができたので良い勉強になりました。まだ勉強し始めたばかりですが、他のサービスについてもドキュメントを読んだり軽く実装しながら把握していきたいと思います。


リポジトリ

今回実装したGoのコードは下記に置いておきます。サンプルコードなのでパッケージ、レイヤ構成は汚いですがご容赦ください。

https://github.com/rexitorg/sample-update-seed


参考にしたページ