LoginSignup
6

More than 3 years have passed since last update.

Cloud SchedulerでCloud Dataflowのジョブを定期的に起動する

Posted at

ということは、Cloud SchedulerからDataflowのジョブをキック出来る?と思い、試してみました。

準備

設定

projects.locations.templates.createを叩き、Bulk Compress Cloud Storage Files(Google提供のテンプレート)をキックする設定をします。

下例では、gcloudコマンドで設定していますが、GCPコンソールからでも出来ます。

gcloud beta scheduler jobs create http job_name \
--schedule='every day 09:00' \
--uri="https://dataflow.googleapis.com/v1b3/projects/プロジェクトID/locations/asia-northeast1/templates" \
--message-body-from-file="dataflow_message_body.json" \
--oauth-service-account-email='サービスアカウントの名前'

最低限、設定するのは

です。

HTTP Bodyに、Dataflow API一般の設定(gcsPathとやjobName)と、各テンプレート固有のパラメータ(parameters)を設定します。

cat dataflow_message_body.json | jq                                                                                                                         1739ms  月  1/13 15:32:41 2020
{
  "jobName": "dataflow_job_name",
  "parameters": {
    "inputFilePattern": "gs://バケット名/dataflow/compress_input.txt",
    "outputDirectory": "gs://バケット名/dataflow/compress_output.txt.gz",
    "outputFailureFile": "gs://バケット名/dataflow/failure",
    "compression": "GZIP"
  },
  "gcsPath": "gs://dataflow-templates/latest/Bulk_Compress_GCS_Files"
}

実行

スケジュールされた時間になるまで待つか、待ち遠しい人はCloud Scheduler画面から「今すぐ実行」しましょう。

確認

ジョブが出来たか、見てみます。

gcloud beta dataflow jobs list                                                                776ms  月  1/13 15:52:22 2020
`--region` not set; getting jobs from all available regions. Some jobs may be missing in the event of an outage. https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
JOB_ID                                    NAME                TYPE   CREATION_TIME        STATE      REGION
2020-01-12_22_32_14-12998664790411440361  dataflow_job_name   Batch  2020-01-13 06:32:16  Done       asia-northeast1

ジョブが出来ています(GCPコンソールでジョブ実行状況を見られます)。

結果のファイルも見てみましょう。

gsutil cat gs://qiita_test/dataflow/compress_output.txt.gz/compress_input.txt.gz | file -                                                                   1876ms  月  1/13 15:35:45 2020
/dev/stdin: gzip compressed data, from FAT filesystem (MS-DOS, OS/2, NT)

gsutil cat gs://qiita_test/dataflow/compress_output.txt.gz/compress_input.txt.gz | gzcat                                                                    2029ms  月  1/13 15:35:52 2020
テスト入力

圧縮出来ていますね。

後始末

ジョブが作られ続けるので、動作確認終わったらCloud Schedulerのジョブは削除しましょう。

gcloud beta scheduler jobs delete job_name                                                      

補足

「projects.locations.templates.create」と「projects.templates.create 」という2つのAPIがあります。

template.createのドキュメントでは説明を見つけられなかったのですが、projects.jobs.createの方には、

To create a job, we recommend using projects.locations.jobs.create with a regional endpoint. Using projects.jobs.create is not recommended, as your job will always start in us-central1.

とあるので、「projects.locations.templates.create」の方を使った方がいいのかなーと思っています。

Dataflowジョブの停止

Dataflowジョブの停止(cancel/drain)もAPIがあるので(※)、CloudSchedulerからジョブを停止することも出来ます。
ただし、自動で割り振られるjob idの指定が必要で、定期的にジョブを起動し、定期的にそれを止めるような処理は出来ないので、実用性は低い気がします…

※ リクエストbodyでrequestedStateをdrainingに指定します

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6