12
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cloud Tasksのタスク追加をCloud Datastoreトランザクションに含める

Last updated at Posted at 2019-02-15

はじめに

旧くよりGoogle App EngineにあったTask Queues機能が、Cloud Tasks (2019.2.15現在beta)としてサービス化され、GAEの外側から呼び出せるようになりました。

従来Task QueuesはDatastoreのトランザクションに含めることが出来て、トランザクションがcommitされた場合のみタスクの実行を行うことが可能ですが、Cloud TasksとCloud Datastoreを(クライアントライブラリなどで)併用する場合はその機能をサポートしていません。1
DatastoreへPUTした内容をSearch APIやBigQueryへミラーリングしたい場合などに重宝していた機能なので、Cloud Tasks&Cloud Datastoreでも実現することができないか考えました。

ストーリー

DatastoreにPUTしたSampleエンティティをCloud Tasks経由でSearch APIに保存します。

トランザクション内(ローカルクライアント)

  1. Txステータス保存。(KeyはUUID)
  2. DatastoreにSampleエンティティPUT
  3. TaskQueue Add(TxステータスKeyと現在時刻を渡す)
  4. commit

タスクハンドラ(GAE)

  1. UUIDでDatastoreよりTxステータスGET
  2. (GETできなかったら)リトライ。一定時間経過してたらトランザクションは失敗したとみなしてタイムアウト終了
  3. (GETできたら)DatastoreよりSampleエンティティを取得しSearch APIに登録

実装

ローカルクライアント

まずmain関数です。

func main() {
	log.SetFlags(log.Lshortfile)

	c := context.Background()

	client, err := datastore.NewClient(c, projectID)
	if err != nil {
		log.Fatal(err.Error())
	}

	sample := tasktx.Sample{
		ID:        uuid.Must(uuid.NewV4()).String(),
		Value:     rand.Float64(),
		CreatedAt: time.Now(),
	}

	// トランザクションのタイムアウト設定
	// Taskのタイムアウトよりも短くする必要がある
	// Cloud Tasks APIは現状30sec以上のタイムアウトを指定できない(正式仕様かは不明)
	c, cancel := context.WithTimeout(c, 30*time.Second)
	defer cancel()

	_, err = client.RunInTransaction(c, func(tx *datastore.Transaction) error {

		// Sampleモデル保存
		key := datastore.NameKey("Sample", sample.ID, nil)
		if _, err := tx.Put(key, &sample); err != nil {
			return err
		}

		// Txステータスの保存
		// 複数のTask Queue起動で利用可能
		txStatus := &tasktx.TxStatus{
			ID:        uuid.Must(uuid.NewV4()).String(),
			CreatedAt: time.Now(),
		}

		txStatusKey := datastore.NameKey("TxStatus", txStatus.ID, nil)
		if _, err := tx.Put(txStatusKey, txStatus); err != nil {
			return err
		}

		// タスク起動
		// 必須ではないがタスクはできるだけトランザクションの最後の方でまとめて起動した方がよい
		// 起動〜commitの間隔が長いと指数バックオフリトライの影響でタスク完了までの間隔がさらに開く可能性あり
		// ていうか、Cloud TasksはdelayやETA指定できないんかな(・ω・)
		if err := addTask(c, txStatus.ID, sample); err != nil {
			return err
		}

		//↓のコメントを外せばcancel&rollbackを試せる
		//time.Sleep(40 * time.Second)

		return nil
	})

	if err != nil {
		log.Fatal(err.Error())
	}

	log.Println("done")
}

次にTaskQueue起動処理です。
TxステータスのKeyと実行時刻をHTTP Headerに設定して渡してます。

func addTask(ctx context.Context, txID string, sample tasktx.Sample) error {
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		log.Println("cloudtasks NewClient failed")
		return err
	}

	b, err := json.Marshal(sample)
	if err != nil {
		return err
	}

	queuePath := fmt.Sprintf("projects/%s/locations/us-central1/queues/%s", projectID, queueID)

	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			PayloadType: &taskspb.Task_AppEngineHttpRequest{
				AppEngineHttpRequest: &taskspb.AppEngineHttpRequest{
					HttpMethod:  taskspb.HttpMethod_POST,
					RelativeUri: "/putdata",
					Body:        b,
					Headers: map[string]string{
						"X-TaskTx-ID":           txID,
						"X-TaskTx-DispatchTime": time.Now().Format(time.RFC3339Nano),
					},
				},
			},
		},
	}

	_, err = client.CreateTask(ctx, req)
	if err != nil {
		log.Println("CreateTask failed")
		return err
	}

	return nil
}

タスクハンドラ(GAE)

Go1.11runtimeで実装しています。

func handlePutData(w http.ResponseWriter, r *http.Request) {

	c := appengine.NewContext(r)

	txID := r.Header.Get("X-TaskTx-ID")

	dispatchTime, err := time.Parse(time.RFC3339Nano, r.Header.Get("X-TaskTx-DispatchTime"))
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	txStatusKey := datastore.NewKey(c, "TxStatus", txID, 0, nil)
	var txStatus tasktx.TxStatus
	if err = datastore.Get(c, txStatusKey, &txStatus); err == datastore.ErrNoSuchEntity {
		if time.Now().Sub(dispatchTime) > 60*time.Second {
			log.Println("timeout")
			return
		} else {
			log.Println("retry")
			// ステータス何返すべきか迷った。Lockedはどんなもんだろ
			http.Error(w, err.Error(), http.StatusLocked)
			return
		}
	} else if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	var sample tasktx.Sample
	if err = json.Unmarshal(b, &sample); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// get again just in case
	key := datastore.NewKey(c, "Sample", sample.ID, 0, nil)
	if err := datastore.Get(c, key, &sample); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	index, err := search.Open("Sample")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	_, err = index.Put(c, sample.ID, &sample)

	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	log.Println("done")

	w.WriteHeader(http.StatusOK)
}

まとめ

従来の仕組みと比べるとかなり面倒な手順になっていますが、追加できるタスク数の制限(従来5個まで)なくなるのはちょっと嬉しいメリットかもです。

ちょっとテストした感じ想定通りに動いてそうですが、まだプロダクションレベルでは採用していません。
もし何か穴があったらツッコミいただけたら超嬉しいです。

  1. Cloud Tasksの中の人が、Transactional tasksのユースケースを集めてるという噂を聞いたので、しかるべきところにリクエストやissueなげとくといつかトランザクションが正式サポートされるかもしれません。

12
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?