5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

GAEでPub/Subに秒間1万リクエストをPub/Subに突っ込むまでの道のり

Posted at

はじめに

この記事はiRidge Advent Calendar 2019 における12/14分の記事になります。

注意点

約1年前にやったプロジェクトを思い出して書いております。
技術的にもどんどんアップデートされていく分野なので情報が古い可能性がございます。
(特に、GAEは大きめの更新があったと聞いております)

背景

弊社のシステムでは、秒間3千〜1万程度のGPS位置情報がエンドユーザから送信されてきます。 過去よりユーザ数が上昇してこれまでのシステムでは、捌き切れなくなる問題を抱えていることから、リプレイスすることになりました。

要件(実現したいこと)

とにかく、エンドユーザ(アプリ端末)から送られてきた位置情報をGCPのCloud Pub/Subに突っ込むだけ。 送られてきたデータは多少加工するものの、ほぼそのままデータをキューに入れるのみと考えてよい。

なお、様々な背景はあるものの、キューにPub/Subを選んだ主な理由は以下。

  • 後続で、Dataflowを使って、大量データを使ったリアルタイム処理をしたい。
  • 送信されてきたデータはBigQueryに格納したい
    • GCPで用意されているキューを使いたい。

最初の設計

GAE/Go standard Environmentを使って実現
こんな感じのシンプルなコードです。(便宜的にエラー処理は省いています)

func main() {
    http.HandleFunc("/", handler)
    appengine.Main()
}

func handler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
    ctx := appengine.NewContext(r)

    locationData := LocationData{}
    json.NewDecoder(r.Body).Decode(&locationData)

    // データの加工(省略)
    
    client, _ := pubsub.NewClient(ctx, os.Getenv("PROJECT_ID"))
    topic := client.Topic(os.Getenv("TOPIC_ID"))
    topic.Publish(ctx, &pubsub.Message{Data: locationData}).Get(ctx)
}

結果

全くパフォーマンス出ませんでした。

負荷試験を実施したところ、インスタンスが100台以上立ち上がってしまう上に全く捌ききれない事態に。
インスタンスタイプなどのGAEの設定値でどうにかチューニングできるレベルのものではありませんでした。

原因

Pub/SubにPublishする処理がけっこう重く、到底秒間1万リクエストを捌くことができなかった。

解決までの道のり

方針

設計1では、ユーザから来た1リクエストに対し、Pub/Subに1リクエスト送っていた。 Pub/SubへのPublish処理が重いので、バルクインサートすればいいのでは?

topicには、PublishSettingsというものがあり、これを設定することで実現できそう。
https://godoc.org/cloud.google.com/go/pubsub#PublishSettings

ただし、この設定を使う場合、定義したtopicを他のリクエストで使い回さないといけないので、topicの定義はhandler内ではなく、mainで行う必要あり。

同時に、topicを定義するためのclienthandlerの外で行う必要あり。

var topic *pubsub.Topic

func main(){
    client, _ := pubsub.NewClient(ctx, os.Getenv("PROJECT_ID"))
    topic := client.Topic(os.Getenv("TOPIC_ID"))
    topic.PublishSettings = pubsub.PublishSettings{
		DelayThreshold: 1, // 便宜的に全て1
		CountThreshold: 1,
		ByteThreshold:  1,
		Timeout:        1,
    }
    http.HandleFunc("/", handler)
    appengine.Main()
}

func handler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
    ctx := appengine.NewContext(r)

    locationData := LocationData{}
    json.NewDecoder(r.Body).Decode(&locationData)

    // データの加工(省略)
    
    topic.Publish(ctx, &pubsub.Message{Data: locationData}).Get(ctx)
}

問題

ここで問題発生!

上記コードのpubsub.NewClientで指定するcontextには何を渡せばいいのか?
appengine.NewContext(r)を渡したいが、これは、引数にhttp.Requestを指定する必要があるので、handler関数内でしか定義できない。

とりあえず、context.Background()を渡しておけばいいかと雑に判断して試験実行。

結果

動きません。

どうやら、appengine.NewContext(r)でないとGAE standardはまともに動かないらしい。
(今は変わっていそう)

最終的な結論

当初、いくら調査しても、standardでは実現できる方法が見つからなかった。
したがって、GAE Flexibleを使って実現することにした。 こちらは、普通のcontextを使えるので、以下のコードで実現できた。

var topic *pubsub.Topic

func main(){
    ctx := context.Background()
    client, _ := pubsub.NewClient(ctx, os.Getenv("PROJECT_ID"))
    topic := client.Topic(os.Getenv("TOPIC_ID"))
    topic.PublishSettings = pubsub.PublishSettings{
		DelayThreshold: 1, // 便宜的に全て1
		CountThreshold: 1,
		ByteThreshold:  1,
		Timeout:        1,
    }
    http.HandleFunc("/", handler)
}

func handler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
    ctx := context.Background()

    locationData := LocationData{}
    json.NewDecoder(r.Body).Decode(&locationData)

    // データの加工(省略)
    
    topic.Publish(ctx, &pubsub.Message{Data: locationData}).Get(ctx)
}

最後に

おそらく、今はGAEも色々更新されてより賢い方法があると思われるので、そちらを調べてみてください。

5
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?