はじめに
- GAE のスタンダード環境のタイムアウト時間は 60 秒なので、それ以上時間のかかる処理を GAE 上で行いたい場合は Cloud Tasks を使います
- Cloud Tasks を使うとバックグラウンドで GAE の処理を実行することができて、タイムアウト時間が 10 分以上とスタンダード環境よりも長くなります
- GAE では Cron を使って定時処理を実行することができるので、Cron と Cloud Tasks を組み合わせることで、時間のかかる定時実行処理を Cloud Functions などを使わずに GAE のみで実装することができます
- ということで GAE/Go 1.12 環境で Cron と Cloud Tasks を使って定時バッチ処理をするサンプルを記載していきます
手順概要
- Cloud Tasks のタスク用のキューを作成
- Cloud Tasks のタスクを処理するハンドラの作成
- Cron の処理を実行するハンドラの作成
- Cron の設定
Cloud Tasks のタスク用のキューを作成
- Cloud Tasks のキューは PubSub のトピックみたいなやつで gcloud コマンドで作成します
# 作成
gcloud tasks queues create [QUEUE_ID]
# 確認
gcloud tasks queues describe [QUEUE_ID]
Cloud Tasks のタスクを処理するハンドラの作成
- Cloud Tasks のハンドラは基本的に普通の HTTP ハンドラです
- キューに入ったメッセージは順次デキューされ、タスクハンドラがメッセージを受け取って処理を実行します
- ヘッダからタスクの情報を受け取り、ボディからタスクの引数などを受け取ります
// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
)
func main() {
// Allow confirmation the task handling service is running.
http.HandleFunc("/", indexHandler)
// Handle all tasks.
http.HandleFunc("/task_handler", taskHandler)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
log.Printf("Defaulting to port %s", port)
}
log.Printf("Listening on port %s", port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}
// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
fmt.Fprint(w, "Hello, World!")
}
// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
t, ok := r.Header["X-Appengine-Taskname"]
if !ok || len(t[0]) == 0 {
// You may use the presence of the X-Appengine-Taskname header to validate
// the request comes from Cloud Tasks.
log.Println("Invalid Task: No X-Appengine-Taskname request header found")
http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
return
}
taskName := t[0]
// Pull useful headers from Task request.
q, ok := r.Header["X-Appengine-Queuename"]
queueName := ""
if ok {
queueName = q[0]
}
// Extract the request body for further task details.
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("ReadAll: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
// Log & output details of the task.
output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
queueName,
taskName,
string(body),
)
log.Println(output)
// Set a non-2xx status code to indicate a failure in task processing that should be retried.
// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
fmt.Fprintln(w, output)
}
Cron の処理を実行するハンドラの作成
- Cron で Cloud Tasks メッセージを送信します
- Cron の処理を行うハンドラもほぼ普通の HTTP ハンドラです
- キューのパスは
gcloud tasks queues describe
で確認して記入します
func main() {
// 略
// Handle cron
http.HandleFunc("/cron_handler", CronHandler)
// 略
}
func CronHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if r.Header.Get("X-Appengine-Cron") != "true" {
log.Println("Invalid request: No X-Appengine-Cron request header found")
http.Error(w, "Bad Request - Invalid request", http.StatusBadRequest)
return
}
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
client, err := cloudtasks.NewClient(ctx)
if err != nil {
log.Printf("NewClient: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
// Build the Task queue path.
locationID := "LOCATION_ID" //FIXME
queueID := "QUEUE_ID" //FIXME
queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", projectID, locationID, queueID)
// Build the Task payload.
// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#CreateTaskRequest
req := &taskspb.CreateTaskRequest{
Parent: queuePath,
Task: &taskspb.Task{
// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#AppEngineHttpRequest
MessageType: &taskspb.Task_AppEngineHttpRequest{
AppEngineHttpRequest: &taskspb.AppEngineHttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
RelativeUri: "/task_handler",
},
},
},
}
// Add a payload message if one is present.
data, err := json.Marshal(message.CreateContentMessage{})
req.Task.GetAppEngineHttpRequest().Body = data
createdTask, err := client.CreateTask(ctx, req)
if err != nil {
log.Printf("CreateTask: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
log.Printf("Created task. %+v", createdTask)
}
Cron の設定
- yaml ファイルでアプリケーションと Cron の設定をします
app.yaml
runtime: go112
cron.yaml
cron:
- description: "cron job"
url: /cron_handler
schedule: every 5 minutes
デプロイ
gcloud app deploy app.yaml
gcloud app deploy cron.yaml