8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

GAE/Go 1.12 で Cron と Cloud Tasks を使ってまあまあ時間のかかる定時バッチ処理をする

Posted at

はじめに

  • GAE のスタンダード環境のタイムアウト時間は 60 秒なので、それ以上時間のかかる処理を GAE 上で行いたい場合は Cloud Tasks を使います
  • Cloud Tasks を使うとバックグラウンドで GAE の処理を実行することができて、タイムアウト時間が 10 分以上とスタンダード環境よりも長くなります
  • GAE では Cron を使って定時処理を実行することができるので、Cron と Cloud Tasks を組み合わせることで、時間のかかる定時実行処理を Cloud Functions などを使わずに GAE のみで実装することができます
  • ということで GAE/Go 1.12 環境で Cron と Cloud Tasks を使って定時バッチ処理をするサンプルを記載していきます

手順概要

  1. Cloud Tasks のタスク用のキューを作成
  2. Cloud Tasks のタスクを処理するハンドラの作成
  3. Cron の処理を実行するハンドラの作成
  4. Cron の設定

Cloud Tasks のタスク用のキューを作成

  • Cloud Tasks のキューは PubSub のトピックみたいなやつで gcloud コマンドで作成します
# 作成
gcloud tasks queues create [QUEUE_ID]

# 確認
gcloud tasks queues describe [QUEUE_ID]

Cloud Tasks のタスクを処理するハンドラの作成

  • Cloud Tasks のハンドラは基本的に普通の HTTP ハンドラです
  • キューに入ったメッセージは順次デキューされ、タスクハンドラがメッセージを受け取って処理を実行します
  • ヘッダからタスクの情報を受け取り、ボディからタスクの引数などを受け取ります
// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
        "fmt"
        "io/ioutil"
        "log"
        "net/http"
        "os"
)

func main() {
        // Allow confirmation the task handling service is running.
        http.HandleFunc("/", indexHandler)

        // Handle all tasks.
        http.HandleFunc("/task_handler", taskHandler)

        port := os.Getenv("PORT")
        if port == "" {
                port = "8080"
                log.Printf("Defaulting to port %s", port)
        }

        log.Printf("Listening on port %s", port)
        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
        if r.URL.Path != "/" {
                http.NotFound(w, r)
                return
        }
        fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
        t, ok := r.Header["X-Appengine-Taskname"]
        if !ok || len(t[0]) == 0 {
                // You may use the presence of the X-Appengine-Taskname header to validate
                // the request comes from Cloud Tasks.
                log.Println("Invalid Task: No X-Appengine-Taskname request header found")
                http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
                return
        }
        taskName := t[0]

        // Pull useful headers from Task request.
        q, ok := r.Header["X-Appengine-Queuename"]
        queueName := ""
        if ok {
                queueName = q[0]
        }

        // Extract the request body for further task details.
        body, err := ioutil.ReadAll(r.Body)
        if err != nil {
                log.Printf("ReadAll: %v", err)
                http.Error(w, "Internal Error", http.StatusInternalServerError)
                return
        }

        // Log & output details of the task.
        output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
                queueName,
                taskName,
                string(body),
        )
        log.Println(output)

        // Set a non-2xx status code to indicate a failure in task processing that should be retried.
        // For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
        fmt.Fprintln(w, output)
}

Cron の処理を実行するハンドラの作成

  • Cron で Cloud Tasks メッセージを送信します
  • Cron の処理を行うハンドラもほぼ普通の HTTP ハンドラです
  • キューのパスは gcloud tasks queues describe で確認して記入します
func main() {
	// 略
  
	// Handle cron
	http.HandleFunc("/cron_handler", CronHandler)
  
	// 略
}

func CronHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	if r.Header.Get("X-Appengine-Cron") != "true" {
		log.Println("Invalid request: No X-Appengine-Cron request header found")
		http.Error(w, "Bad Request - Invalid request", http.StatusBadRequest)
		return
	}

	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")

	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		log.Printf("NewClient: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Build the Task queue path.
	locationID := "LOCATION_ID" //FIXME
	queueID := "QUEUE_ID" //FIXME
	queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", projectID, locationID, queueID)

	// Build the Task payload.
	// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#CreateTaskRequest
	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#AppEngineHttpRequest
			MessageType: &taskspb.Task_AppEngineHttpRequest{
				AppEngineHttpRequest: &taskspb.AppEngineHttpRequest{
					HttpMethod:  taskspb.HttpMethod_POST,
					RelativeUri: "/task_handler",
				},
			},
		},
	}

	// Add a payload message if one is present.
	data, err := json.Marshal(message.CreateContentMessage{})
	req.Task.GetAppEngineHttpRequest().Body = data

	createdTask, err := client.CreateTask(ctx, req)
	if err != nil {
		log.Printf("CreateTask: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}
	log.Printf("Created task. %+v", createdTask)
}

Cron の設定

  • yaml ファイルでアプリケーションと Cron の設定をします
app.yaml
runtime: go112
cron.yaml
cron:
- description: "cron job"
  url: /cron_handler
  schedule: every 5 minutes

デプロイ

gcloud app deploy app.yaml
gcloud app deploy cron.yaml

参考資料

8
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?