はじめに
旧くよりGoogle App EngineにあったTask Queues機能が、Cloud Tasks (2019.2.15現在beta)としてサービス化され、GAEの外側から呼び出せるようになりました。
従来Task QueuesはDatastoreのトランザクションに含めることが出来て、トランザクションがcommitされた場合のみタスクの実行を行うことが可能ですが、Cloud TasksとCloud Datastoreを(クライアントライブラリなどで)併用する場合はその機能をサポートしていません。1
DatastoreへPUTした内容をSearch APIやBigQueryへミラーリングしたい場合などに重宝していた機能なので、Cloud Tasks&Cloud Datastoreでも実現することができないか考えました。
ストーリー
DatastoreにPUTしたSampleエンティティをCloud Tasks経由でSearch APIに保存します。
トランザクション内(ローカルクライアント)
- Txステータス保存。(KeyはUUID)
- DatastoreにSampleエンティティPUT
- TaskQueue Add(TxステータスKeyと現在時刻を渡す)
- commit
タスクハンドラ(GAE)
- UUIDでDatastoreよりTxステータスGET
- (GETできなかったら)リトライ。一定時間経過してたらトランザクションは失敗したとみなしてタイムアウト終了
- (GETできたら)DatastoreよりSampleエンティティを取得しSearch APIに登録
実装
ローカルクライアント
まずmain関数です。
func main() {
log.SetFlags(log.Lshortfile)
c := context.Background()
client, err := datastore.NewClient(c, projectID)
if err != nil {
log.Fatal(err.Error())
}
sample := tasktx.Sample{
ID: uuid.Must(uuid.NewV4()).String(),
Value: rand.Float64(),
CreatedAt: time.Now(),
}
// トランザクションのタイムアウト設定
// Taskのタイムアウトよりも短くする必要がある
// Cloud Tasks APIは現状30sec以上のタイムアウトを指定できない(正式仕様かは不明)
c, cancel := context.WithTimeout(c, 30*time.Second)
defer cancel()
_, err = client.RunInTransaction(c, func(tx *datastore.Transaction) error {
// Sampleモデル保存
key := datastore.NameKey("Sample", sample.ID, nil)
if _, err := tx.Put(key, &sample); err != nil {
return err
}
// Txステータスの保存
// 複数のTask Queue起動で利用可能
txStatus := &tasktx.TxStatus{
ID: uuid.Must(uuid.NewV4()).String(),
CreatedAt: time.Now(),
}
txStatusKey := datastore.NameKey("TxStatus", txStatus.ID, nil)
if _, err := tx.Put(txStatusKey, txStatus); err != nil {
return err
}
// タスク起動
// 必須ではないがタスクはできるだけトランザクションの最後の方でまとめて起動した方がよい
// 起動〜commitの間隔が長いと指数バックオフリトライの影響でタスク完了までの間隔がさらに開く可能性あり
// ていうか、Cloud TasksはdelayやETA指定できないんかな(・ω・)
if err := addTask(c, txStatus.ID, sample); err != nil {
return err
}
//↓のコメントを外せばcancel&rollbackを試せる
//time.Sleep(40 * time.Second)
return nil
})
if err != nil {
log.Fatal(err.Error())
}
log.Println("done")
}
次にTaskQueue起動処理です。
TxステータスのKeyと実行時刻をHTTP Headerに設定して渡してます。
func addTask(ctx context.Context, txID string, sample tasktx.Sample) error {
client, err := cloudtasks.NewClient(ctx)
if err != nil {
log.Println("cloudtasks NewClient failed")
return err
}
b, err := json.Marshal(sample)
if err != nil {
return err
}
queuePath := fmt.Sprintf("projects/%s/locations/us-central1/queues/%s", projectID, queueID)
req := &taskspb.CreateTaskRequest{
Parent: queuePath,
Task: &taskspb.Task{
PayloadType: &taskspb.Task_AppEngineHttpRequest{
AppEngineHttpRequest: &taskspb.AppEngineHttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
RelativeUri: "/putdata",
Body: b,
Headers: map[string]string{
"X-TaskTx-ID": txID,
"X-TaskTx-DispatchTime": time.Now().Format(time.RFC3339Nano),
},
},
},
},
}
_, err = client.CreateTask(ctx, req)
if err != nil {
log.Println("CreateTask failed")
return err
}
return nil
}
タスクハンドラ(GAE)
Go1.11runtimeで実装しています。
func handlePutData(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
txID := r.Header.Get("X-TaskTx-ID")
dispatchTime, err := time.Parse(time.RFC3339Nano, r.Header.Get("X-TaskTx-DispatchTime"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
txStatusKey := datastore.NewKey(c, "TxStatus", txID, 0, nil)
var txStatus tasktx.TxStatus
if err = datastore.Get(c, txStatusKey, &txStatus); err == datastore.ErrNoSuchEntity {
if time.Now().Sub(dispatchTime) > 60*time.Second {
log.Println("timeout")
return
} else {
log.Println("retry")
// ステータス何返すべきか迷った。Lockedはどんなもんだろ
http.Error(w, err.Error(), http.StatusLocked)
return
}
} else if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var sample tasktx.Sample
if err = json.Unmarshal(b, &sample); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// get again just in case
key := datastore.NewKey(c, "Sample", sample.ID, 0, nil)
if err := datastore.Get(c, key, &sample); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
index, err := search.Open("Sample")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = index.Put(c, sample.ID, &sample)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Println("done")
w.WriteHeader(http.StatusOK)
}
まとめ
従来の仕組みと比べるとかなり面倒な手順になっていますが、追加できるタスク数の制限(従来5個まで)なくなるのはちょっと嬉しいメリットかもです。
ちょっとテストした感じ想定通りに動いてそうですが、まだプロダクションレベルでは採用していません。
もし何か穴があったらツッコミいただけたら超嬉しいです。
-
Cloud Tasksの中の人が、Transactional tasksのユースケースを集めてるという噂を聞いたので、しかるべきところにリクエストやissueなげとくといつかトランザクションが正式サポートされるかもしれません。 ↩