LoginSignup
0
0

Cloud Tasksで長時間実行ジョブのキュー管理をマスターする

Last updated at Posted at 2024-02-13

背景

  • 比較的長めのジョブタスクを制御するqueueシステムを作りたかったので、Cloud Tasksで簡易実装をしていきます
  • pubsubでは確認応答期限が最大10分. 1時間までは引き延ばせますが、実装が複雑になるのでCloudTasksで実装することにしました

Cloud Tasksとは

概要は以下ページ

すべてのワーカーはデフォルトのタイムアウト期限の 10 分、最大 30 分以内に HTTP レスポンス コード(200~299)を Cloud Tasks サービスに送信する必要があります。別のレスポンスが送信された場合、あるいはレスポンスがない場合、タスクが再試行されます。

  • タスクの最大サイズ1MBです

実装


import json
from google.cloud import tasks_v2
from google.protobuf import duration_pb2

def create_task():
    # ジョブ内容のデータ作成
    data_dict = {
        'job_name': 'test_job_name',
        'count': 2
    }
    data_str = json.dumps(data_dict, ensure_ascii=False)
    data = data_str.encode("utf-8")
    
    client = tasks_v2.CloudTasksClient()

    # タスクの作成
    # oidc認証設定もここで行います
    task = tasks_v2.Task(
        http_request=tasks_v2.HttpRequest(
            http_method=tasks_v2.HttpMethod.GET,
            url='https://sample.com/hoge',
            headers={"Content-type": "application/json"},
            oidc_token=tasks_v2.OidcToken(
                service_account_email='tekitou@developer.gserviceaccount.com',
            ),
            body=data
        ),
    )

    # 実行完了までの時刻を設定
    # 最大の30分にします
    duration = duration_pb2.Duration()
    duration.FromSeconds(1800)
    task.dispatch_deadline = duration

    # Use the client to build and send the task.
    return client.create_task(
        tasks_v2.CreateTaskRequest(
            parent=client.queue_path('PJ_PATH', 'asia-northeast1', 'queue_name'),
            task=task,
        )
    )

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0