はじめに
こちらは社内技術勉強会用の資料として作成したものです。
Google Cloud Platform (GCP) で Cloud Dataflow によりバッチ処理を作成してみます。DataflowとSecret Managerを利用します。
やりたいこと
処理内容
ソースコードの管理にGitLabを使用しているものとします。GitLab APIを使用して指定期間のマージリクエストの一覧を取得し、それらの変更ファイル数をBigQueryに記録する、ということをしたいとします。
このアプリケーションを開発したとして、それを1日1回定期的に実行できるようにします。
要件
- 1日1回実行します。
- 処理の実行中にだけ課金される仕組みにします。
GCPで利用できそうなサービス
Google Compute Engine (GCE) のインスタンスを常時起動しておき、cron で実行を開始する、ということであれば話は難しくないかもしれませんが、コストを抑えるために、サーバレスでの実現を目指します。独自のアプリケーションを実行する、ということでは、以下のようなサービスが利用できそうです。
サービス | 用途 | 課金 | タイムアウト |
---|---|---|---|
Cloud Functions | Web APIの公開 | 実行時間で課金 | 最大9分 |
Cloud Run | Web APIの公開 | 実行時間で課金 | 最大15分 |
Cloud Dataflow | Apache Beamによる処理パイプラインの構築 | 主にジョブの実行時間で課金 | ??? |
Cloud Composer | Apache Airflowによる処理パイプラインの構築 | GCEその他サービスの実行時間で課金 | ??? |
Cloud Data Fusion | CDAPによる処理パイプラインの構築 | パイプラインの開発と実行で課金 | ??? |
Web API形式にすれば簡単でよいのですが、制限時間があるのが困ります。
それ以外はパイプライン処理を定義するサービスになっていますが、アプリケーションを1つ実行できればいいだけなので、パイプライン制御用のフレームワークなどは特には欲しくありません。
ちょうどいいのがなくて困ります...
嫌なら使わなければいいじゃないか、ということで、世の中の人たちも、いろいろ工夫をされているようです。
世の中での工夫
Cloud FunctionsからGCEインスタンスを起動する
- Google Compute Engine を用いた機械学習モデル学習バッチのスケジュール実行 (2019-10-07)
- 邪道だけど安くて簡単 - GCPで重いバッチ処理 (2019-12-19)
Cloud FunctionsでGCEインスタンスを起動し、処理が完了したら自前でインスタンスを停止する、という作戦です。
GCEだけで実現する
- Google Compute Engineでcronを使って日次バッチ処理を実行する (投稿時期不明)
- GCPで長時間バッチ処理終了時に自動終了したい (04/07/2020)
上記1つ目の記事は、低スペックのGCEインスタンスを常時稼働させておき、そこからcronで別のインスタンスを起動する、という作戦です。2つ目は起動方法が書かれておらず不明ですが、挙げておきます。
インスタンスの停止はCloud Functionsを利用する場合と同様、自前で停止処理を行っています。
Cloud FunctionsからCloud Dataflowを起動する
Cloud FunctionsからCloud Dataflowを起動する、という構成にしているところもあるようです。
今回やってみる構成
- 自前でGCEインスタンスの起動、停止を制御するのが面倒
- Web APIとしてのタイムアウトは欲しくない
- 複数のアプリケーションを組み合わせるような必要がない
上記のような点から、Cloud Dataflowを利用してみることにします。
Cloud Dataflow
DataflowとApache Beam
Dataflowは、Apache Beamのバックエンド(ランナー)です。
こちらの説明 によると、「Apache Beam は、バッチとストリーミングの両方のデータの並列処理パイプラインを定義するオープンソースの統合モデルです。」とのことです。
ここではバッチの実行に利用してみます。
実行するアプリケーションの内容
1日ごとに、前日1日間で作成されたGitLab上のマージリクエストの一覧を取得し、それら一つひとつの変更ファイル数を取得します。以下のような流れになります。
- GitLab上の対象グループのプロジェクト一覧を取得する(プロジェクトIDからプロジェクト名を取り出せるようにするため)。
- 対象グループの前日1日間に作成されたマージリクエストの一覧を取得する。
- 取得したすべてのマージリクエストについて、1件ずつ詳細情報を取得する。ここに変更ファイル数が含まれている。
- マージリクエストの情報にプロジェクト名、変更ファイル数を加えて、BigQueryに保存する。
Dataflowで実行するパイプラインの定義
今回はApache BeamをPythonで利用します。以下はパイプライン部分の抜粋です。
def execute(self):
project_dict = self.load_projects()
output_table = 'my-project-id:gitlab_mr_changes.merge_requests'
with beam.Pipeline(options=self.batch_options) as pipeline:
lines = (
pipeline
| beam.Create(self.group_mr.load())
| beam.ParDo(ChangesGenerator(self.loader, project_dict))
| beam.Map(self.bq_mr.make_record)
)
# Output the results into BigQuery table.
_ = lines | 'Write to Big Query' >> beam.io.WriteToBigQuery(
output_table,
schema=self.bq_mr.SCHEMA,
custom_gcs_temp_location='gs://gitlab-mr-changes')
前述のアプリケーションの処理の流れとは、以下のような対応になっています。
ソースコード | 処理の流れ |
---|---|
Create | マージリクエストの一覧を取得 |
ParDo(ChangesGenerator) | 取得したすべてのマージリクエストについて、1件ずつ詳細情報を取得 |
Map(make_record) | マージリクエストに対してプロジェクト名、ファイルの変更数を加えて |
Write to Big Query | BigQueryに保存 |
※ ソースコード上でWrite to Big Queryだけパイプラインが分断されているのは、意図があってこうしているのではなく、自分が理解していないだけです。おそらく、この記事 のExample 16をコピー&ペーストして、そのままにしていたものだと思います。覚えていませんが。
Dataflowでの表示
GCPのDataflowの画面でジョブの実行結果(ジョブグラフ)を見ると、以下のように表示されます。
アプリケーションが実行できました。
Dataflow Felxテンプレート
Dataflowでは、「Dataflowテンプレート」と呼ぶ、ジョブの処理内容を定義したものをあらかじめ登録しておき、テンプレートを指定してジョブの実行を行います。テンプレートの作成方法には2種類あります。
テンプレートの種類 | デプロイする内容 |
---|---|
クラシックテンプレート | ソースコード |
Flexテンプレート | Dockerイメージ |
テンプレートの違いは以下のページに説明があります。
Flexテンプレートが推奨ということなので、ここではFlexテンプレートを使用します。以下のページの手順に従って作成していきます。
Dockerfile、cloudbuild.yml、metadata.json は後述します。cloudbuild.ymlを作成して、Dockerイメージをビルドし、Container Registryに登録します。
gcloud builds submit --config cloudbuild.yml
Container Registryにイメージを登録できたら、Dataflowテンプレートを作成します。
gcloud dataflow flex-template build gs://my-bucket-name/gitlab-mr-changes.json \
--image "gcr.io/my-project-id/gitlab-mr-changes/batch-runner:latest" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"
Secret Manager
GitLab APIを使用して、マージリクエストの情報を取得します。GitLab APIを使用するには、あらかじめGitLabでアクセストークンを作成しておく必要があります。
このアクセストークンをどうやってアプリケーションに与えるか、ということになりますが、ここでは Secret Manager を利用してみます。
Secret Managerに「gitlab_access_token」というキーで「my super secret data」という値を登録する場合は、以下のように実行します。
echo -n "my super secret data" | gcloud secrets create gitlab_access_token \
--replication-policy="automatic" \
--data-file=-
登録した値をアプリケーションから取り出すには、GCPへの認証が必要です。アプリケーションからの認証ができるようにするために、専用のサービスアカウントを作成します。
gcloud iam service-accounts create my-app-account-name
gcloud projects add-iam-policy-binding my-project-id \
--member="serviceAccount:my-app-account-name@my-project-id.iam.gserviceaccount.com" \
--role="roles/owner"
本番の運用で「roles/owner」はやめましょう、と書いてあったので、やめましょう。
サービスアカウントが登録できたら、認証用のサービスアカウントキーを作成します。
gcloud iam service-accounts keys create my-app-credentials.json \
--iam-account=my-app-account-name@my-project-id.iam.gserviceaccount.com
my-app-credentials.json が作成されるので、このファイルのパスを、Dockerfile内で環境変数 GOOGLE_APPLICATION_CREDENTIALS
に指定します。Dockerfileを更新したら、もう一度ビルドします。
アプリケーションからは、以下のようにアクセスします。
from google.cloud import secretmanager
class GcpSecret:
def __init__(self, project_cd):
self.project_cd = project_cd
def fetch(self, secret_name):
client = secretmanager.SecretManagerServiceClient()
response = client.access_secret_version(request={
"name": 'projects/{}/secrets/{}/versions/latest'.format(
self.project_cd,
secret_name
),
})
payload = response.payload.data.decode("UTF-8")
return payload
以下のように呼び出すと、Secret Managerに登録した値を取得することができます。
gcp_secret = GcpSecret(gcp_project)
access_token = gcp_secret.fetch('gitlab_access_token')
Cloud Scheduler
Cloud SchedulerからDataflowを起動します。起動するためのHTTPリクエストを送信する設定を行います。
gcloud scheduler jobs create http mr_changes_batch \
--schedule "0 0 * * *" \
--time-zone "Asia/Tokyo" \
--uri "https://dataflow.googleapis.com/v1b3/projects/my-project-id/locations/my-app-location/flexTemplates:launch" \
--http-method POST \
--message-body "{\"launch_parameter\":{\"jobName\":\"gitlab-mr-changes\",\"parameters\":{\"gcp_project\":\"my-project-id\",\"group_id\":\"my-gitlab-group-id\"},\"containerSpecGcsPath\":\"gs://my-bucket-name/gitlab-mr-changes.json\"}}" \
--oauth-service-account-email "my-app-account-name@my-project-id.iam.gserviceaccount.com" \
--oauth-token-scope "https://www.googleapis.com/auth/cloud-platform"
指定した時刻にDataflowのジョブが実行されれば成功です。
気を付けること
Dataflowのジョブがタイムアウトで失敗する
ローカルPC内で実行すると2分程度で終了する処理が、Cloud Dataflowで実行させるとおよそ15分かかりました。なぜ。
それでも動いていたのでしばらく放置していましたが、ある時ジョブが失敗していることを見つけました。
ジョブが失敗したら通知できないものか、と考えるのは別件として、失敗している原因を調べるためにログを確認したところ、以下のように出力されていました。
2021-03-07 00:02:09.725 JSTINFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/dataflow/template/requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
2021-03-07 00:12:21.760 JSTShutting down the GCE instance, launcher-202103060700012597371614216191431, used for launching.
2021-03-07 00:12:45.961 JSTVM, launcher-202103060700012597371614216191431, stopped.
2021-03-07 00:12:46.064 JSTTimeout in polling result file:
gs://(GCSのパス).
Possible causes are:
1. Your launch takes too long time to finish. Please check the logs on stackdriver.
2. Service account XXXX-compute@developer.gserviceaccount.com may not have enough permissions to pull container image gcr.io/(Container Registryのパス) or create new objects in gs://(GCSのパス)
3. Transient errors occurred, please try again.
00:02:09 に pip download
の実行を開始し、およそ10分後におもむろにGCEインスタンスをシャットダウン、そしてタイムアウトエラーを出力した、というように見えます。むむ。
以下の記事を見つけました。
PythonFlexテンプレートを使用したデータフロー-ランチャータイムアウト
Apache Beamを requirements.txt
でインストールするようにしていたのですが、Apache Beam は requirements.txt
とは別にインストールしておかなければならないようです。
Dockerfileを修正し、再実行したところ、およそ15分かかっていたジョブが、約9分で終了するようになりました。それでも9分かかるのですが、しばらくはこれで様子見ということで。
pythonモジュールが見つからない
例えば main.py からプログラムの実行を開始するものだとして、アプリケーションのソースコードが長くなってくると、ソースコードを複数のファイルに分割したくなります。以下のような構成になったとします。
src
├── general
│ ├── batch.py
│ └── gcp_secret.py
├── gitlab
│ ├── api.py
│ └── models
│ └── merge_request.py
└── main.py
この状態でDataflowにデプロイし、実行させてみると、ジョブが失敗します。赤い。
ログには以下のように出力されています。時間をかけてコードをきれいにしたら動かなくなる、とはどういうことか。
ModuleNotFoundError: No module named 'general'
どうやらこちらの記事が該当するようです。
- Multiple File Dependencies
- Dataflow/apache beam: manage custom module dependencies
- Google Cloud DataFlow: ModuleNotFoundError: No module named 'main'
ソースコードを配置している各ディレクトリに __init__.py
を作成し、かつ、setup.py
を作成しておきます。
src
├── general
│ ├── __init__.py
│ ├── batch.py
│ └── gcp_secret.py
├── gitlab
│ ├── __init__.py
│ ├── api.py
│ └── models
│ ├── __init__.py
│ └── merge_request.py
├── main.py
└── setup.py
import setuptools
setuptools.setup(
name='gitlab_mr_changes',
version='0.0.1',
install_requires=[],
packages=setuptools.find_packages(),
)
再度デプロイして、ジョブを実行し、緑色になれば成功です。
おわりに
サーバレスでバッチを実行するために、Cloud Dataflowを利用しました。ついでに秘密情報を扱うためにSecret Managerを利用しました。
ただただバッチを一つ実行したいだけなのに、いろいろ苦労することになったので、今後このメモが役に立つことを期待します。
設定ファイル
Dockerfile
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base:20210120_RC00
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
RUN apt-get update && apt-get install -y libffi-dev git && rm -rf /var/lib/apt/lists/*
COPY ./requirements.txt .
COPY ./src ./src
COPY ./my-app-credentials.json .
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/src/main.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/src/setup.py"
ENV GOOGLE_APPLICATION_CREDENTIALS="${WORKDIR}/my-app-credentials.json"
RUN pip install -U apache-beam[gcp]==2.27.0
RUN pip install -U -r ./requirements.txt
cloudbuild.yml
steps:
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', 'gcr.io/my-project-id/gitlab-mr-changes/batch-runner:latest', '.']
images: ['gcr.io/my-project-id/gitlab-mr-changes/batch-runner:latest']
metadata.json
{
"name": "GitLab Merge Request Changes",
"description": "GitLab Merge Request Changes",
"parameters": [
{
"name": "gcp_project",
"label": "gcp_project",
"helpText": "gcp_project",
"regexes": [
".*"
]
},
{
"name": "group_id",
"label": "group_id",
"helpText": "group_id",
"regexes": [
".*"
]
},
{
"name": "created_after",
"label": "created_after",
"helpText": "created_after",
"isOptional": true,
"regexes": [
".*"
]
},
{
"name": "created_before",
"label": "created_before",
"helpText": "created_before",
"isOptional": true,
"regexes": [
".*"
]
}
]
}