背景
- 比較的長めのジョブタスクを制御するqueueシステムを作りたかったので、Cloud Tasksで簡易実装をしていきます
- pubsubでは確認応答期限が最大10分. 1時間までは引き延ばせますが、実装が複雑になるのでCloudTasksで実装することにしました
Cloud Tasksとは
概要は以下ページ
すべてのワーカーはデフォルトのタイムアウト期限の 10 分、最大 30 分以内に HTTP レスポンス コード(200~299)を Cloud Tasks サービスに送信する必要があります。別のレスポンスが送信された場合、あるいはレスポンスがない場合、タスクが再試行されます。
- タスクの最大サイズ1MBです
実装
- あるAPI(https://sample.com/hoge)へのpushを行うqueueです
- レスポンスコード(200~299)のレスポンスが返ると、タスクはキューから出されます
import json
from google.cloud import tasks_v2
from google.protobuf import duration_pb2
def create_task():
# ジョブ内容のデータ作成
data_dict = {
'job_name': 'test_job_name',
'count': 2
}
data_str = json.dumps(data_dict, ensure_ascii=False)
data = data_str.encode("utf-8")
client = tasks_v2.CloudTasksClient()
# タスクの作成
# oidc認証設定もここで行います
task = tasks_v2.Task(
http_request=tasks_v2.HttpRequest(
http_method=tasks_v2.HttpMethod.GET,
url='https://sample.com/hoge',
headers={"Content-type": "application/json"},
oidc_token=tasks_v2.OidcToken(
service_account_email='tekitou@developer.gserviceaccount.com',
),
body=data
),
)
# 実行完了までの時刻を設定
# 最大の30分にします
duration = duration_pb2.Duration()
duration.FromSeconds(1800)
task.dispatch_deadline = duration
# Use the client to build and send the task.
return client.create_task(
tasks_v2.CreateTaskRequest(
parent=client.queue_path('PJ_PATH', 'asia-northeast1', 'queue_name'),
task=task,
)
)
参考