ということは、Cloud SchedulerからDataflowのジョブをキック出来る?と思い、試してみました。
準備
- Dataflow APIと、テンプレートが使う適当なAPI(Pub/Subとか)を有効化
- Cloud SchedulerがDataflow APIを叩くためのサービスアカウントを作成する
- Dataflow管理者に類する権限が必要
- 入力・出力のバケット作る
設定
projects.locations.templates.createを叩き、Bulk Compress Cloud Storage Files(Google提供のテンプレート)をキックする設定をします。
下例では、gcloudコマンドで設定していますが、GCPコンソールからでも出来ます。
gcloud beta scheduler jobs create http job_name \
--schedule='every day 09:00' \
--uri="https://dataflow.googleapis.com/v1b3/projects/プロジェクトID/locations/asia-northeast1/templates" \
--message-body-from-file="dataflow_message_body.json" \
--oauth-service-account-email='サービスアカウントの名前'
最低限、設定するのは
- スケジュール間隔
- Cloud Schedulerが叩くエンドポイント
- https://dataflow.googleapis.com/v1b3/projects/プロジェクトID/locations/asia-northeast1/templates
- ジョブをキックするサービスアカウント
- HTTP Bodyを指定するファイル
- (--message-bodyにするとコマンドライン引数でも指定できます)
です。
HTTP Bodyに、Dataflow API一般の設定(gcsPathとやjobName)と、各テンプレート固有のパラメータ(parameters)を設定します。
cat dataflow_message_body.json | jq 1739ms 月 1/13 15:32:41 2020
{
"jobName": "dataflow_job_name",
"parameters": {
"inputFilePattern": "gs://バケット名/dataflow/compress_input.txt",
"outputDirectory": "gs://バケット名/dataflow/compress_output.txt.gz",
"outputFailureFile": "gs://バケット名/dataflow/failure",
"compression": "GZIP"
},
"gcsPath": "gs://dataflow-templates/latest/Bulk_Compress_GCS_Files"
}
実行
スケジュールされた時間になるまで待つか、待ち遠しい人はCloud Scheduler画面から「今すぐ実行」しましょう。
確認
ジョブが出来たか、見てみます。
gcloud beta dataflow jobs list 776ms 月 1/13 15:52:22 2020
`--region` not set; getting jobs from all available regions. Some jobs may be missing in the event of an outage. https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
JOB_ID NAME TYPE CREATION_TIME STATE REGION
2020-01-12_22_32_14-12998664790411440361 dataflow_job_name Batch 2020-01-13 06:32:16 Done asia-northeast1
ジョブが出来ています(GCPコンソールでジョブ実行状況を見られます)。
結果のファイルも見てみましょう。
gsutil cat gs://qiita_test/dataflow/compress_output.txt.gz/compress_input.txt.gz | file - 1876ms 月 1/13 15:35:45 2020
/dev/stdin: gzip compressed data, from FAT filesystem (MS-DOS, OS/2, NT)
gsutil cat gs://qiita_test/dataflow/compress_output.txt.gz/compress_input.txt.gz | gzcat 2029ms 月 1/13 15:35:52 2020
テスト入力
圧縮出来ていますね。
後始末
ジョブが作られ続けるので、動作確認終わったらCloud Schedulerのジョブは削除しましょう。
gcloud beta scheduler jobs delete job_name
補足
「projects.locations.templates.create」と「projects.templates.create 」という2つのAPIがあります。
template.createのドキュメントでは説明を見つけられなかったのですが、projects.jobs.createの方には、
To create a job, we recommend using projects.locations.jobs.create with a regional endpoint. Using projects.jobs.create is not recommended, as your job will always start in us-central1.
とあるので、「projects.locations.templates.create」の方を使った方がいいのかなーと思っています。
Dataflowジョブの停止
Dataflowジョブの停止(cancel/drain)もAPIがあるので(※)、CloudSchedulerからジョブを停止することも出来ます。
ただし、自動で割り振られるjob idの指定が必要で、定期的にジョブを起動し、定期的にそれを止めるような処理は出来ないので、実用性は低い気がします…
※ リクエストbodyでrequestedStateをdrainingに指定します