15
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

単純なバッチだけどDataflowで構築する

Last updated at Posted at 2021-03-13

はじめに

こちらは社内技術勉強会用の資料として作成したものです。
Google Cloud Platform (GCP) で Cloud Dataflow によりバッチ処理を作成してみます。DataflowとSecret Managerを利用します。

やりたいこと

処理内容

ソースコードの管理にGitLabを使用しているものとします。GitLab APIを使用して指定期間のマージリクエストの一覧を取得し、それらの変更ファイル数をBigQueryに記録する、ということをしたいとします。

このアプリケーションを開発したとして、それを1日1回定期的に実行できるようにします。

要件

  1. 1日1回実行します。
  2. 処理の実行中にだけ課金される仕組みにします。

GCPで利用できそうなサービス

Google Compute Engine (GCE) のインスタンスを常時起動しておき、cron で実行を開始する、ということであれば話は難しくないかもしれませんが、コストを抑えるために、サーバレスでの実現を目指します。独自のアプリケーションを実行する、ということでは、以下のようなサービスが利用できそうです。

サービス 用途 課金 タイムアウト
Cloud Functions Web APIの公開 実行時間で課金 最大9分
Cloud Run Web APIの公開 実行時間で課金 最大15分
Cloud Dataflow Apache Beamによる処理パイプラインの構築 主にジョブの実行時間で課金 ???
Cloud Composer Apache Airflowによる処理パイプラインの構築 GCEその他サービスの実行時間で課金 ???
Cloud Data Fusion CDAPによる処理パイプラインの構築 パイプラインの開発と実行で課金 ???

Web API形式にすれば簡単でよいのですが、制限時間があるのが困ります。

それ以外はパイプライン処理を定義するサービスになっていますが、アプリケーションを1つ実行できればいいだけなので、パイプライン制御用のフレームワークなどは特には欲しくありません。

ちょうどいいのがなくて困ります...

嫌なら使わなければいいじゃないか、ということで、世の中の人たちも、いろいろ工夫をされているようです。

世の中での工夫

Cloud FunctionsからGCEインスタンスを起動する

Cloud FunctionsでGCEインスタンスを起動し、処理が完了したら自前でインスタンスを停止する、という作戦です。

GCEだけで実現する

上記1つ目の記事は、低スペックのGCEインスタンスを常時稼働させておき、そこからcronで別のインスタンスを起動する、という作戦です。2つ目は起動方法が書かれておらず不明ですが、挙げておきます。

インスタンスの停止はCloud Functionsを利用する場合と同様、自前で停止処理を行っています。

Cloud FunctionsからCloud Dataflowを起動する

Cloud FunctionsからCloud Dataflowを起動する、という構成にしているところもあるようです。

今回やってみる構成

  • 自前でGCEインスタンスの起動、停止を制御するのが面倒
  • Web APIとしてのタイムアウトは欲しくない
  • 複数のアプリケーションを組み合わせるような必要がない

上記のような点から、Cloud Dataflowを利用してみることにします。

Cloud Dataflow

DataflowとApache Beam

Dataflowは、Apache Beamのバックエンド(ランナー)です。

こちらの説明 によると、「Apache Beam は、バッチとストリーミングの両方のデータの並列処理パイプラインを定義するオープンソースの統合モデルです。」とのことです。

ここではバッチの実行に利用してみます。

実行するアプリケーションの内容

1日ごとに、前日1日間で作成されたGitLab上のマージリクエストの一覧を取得し、それら一つひとつの変更ファイル数を取得します。以下のような流れになります。

  1. GitLab上の対象グループのプロジェクト一覧を取得する(プロジェクトIDからプロジェクト名を取り出せるようにするため)。
  2. 対象グループの前日1日間に作成されたマージリクエストの一覧を取得する。
  3. 取得したすべてのマージリクエストについて、1件ずつ詳細情報を取得する。ここに変更ファイル数が含まれている。
  4. マージリクエストの情報にプロジェクト名、変更ファイル数を加えて、BigQueryに保存する。

Dataflowで実行するパイプラインの定義

今回はApache BeamをPythonで利用します。以下はパイプライン部分の抜粋です。

    def execute(self):
        project_dict = self.load_projects()
        output_table = 'my-project-id:gitlab_mr_changes.merge_requests'

        with beam.Pipeline(options=self.batch_options) as pipeline:
            lines = (
                pipeline
                | beam.Create(self.group_mr.load())
                | beam.ParDo(ChangesGenerator(self.loader, project_dict))
                | beam.Map(self.bq_mr.make_record)
            )

            # Output the results into BigQuery table.
            _ = lines | 'Write to Big Query' >> beam.io.WriteToBigQuery(
                output_table,
                schema=self.bq_mr.SCHEMA,
                custom_gcs_temp_location='gs://gitlab-mr-changes')

前述のアプリケーションの処理の流れとは、以下のような対応になっています。

ソースコード 処理の流れ
Create マージリクエストの一覧を取得
ParDo(ChangesGenerator) 取得したすべてのマージリクエストについて、1件ずつ詳細情報を取得
Map(make_record) マージリクエストに対してプロジェクト名、ファイルの変更数を加えて
Write to Big Query BigQueryに保存

※ ソースコード上でWrite to Big Queryだけパイプラインが分断されているのは、意図があってこうしているのではなく、自分が理解していないだけです。おそらく、この記事 のExample 16をコピー&ペーストして、そのままにしていたものだと思います。覚えていませんが。

Dataflowでの表示

GCPのDataflowの画面でジョブの実行結果(ジョブグラフ)を見ると、以下のように表示されます。

20210307_Dataflow_01.png

アプリケーションが実行できました。

Dataflow Felxテンプレート

Dataflowでは、「Dataflowテンプレート」と呼ぶ、ジョブの処理内容を定義したものをあらかじめ登録しておき、テンプレートを指定してジョブの実行を行います。テンプレートの作成方法には2種類あります。

テンプレートの種類 デプロイする内容
クラシックテンプレート ソースコード
Flexテンプレート Dockerイメージ

テンプレートの違いは以下のページに説明があります。

Flexテンプレートが推奨ということなので、ここではFlexテンプレートを使用します。以下のページの手順に従って作成していきます。

Dockerfile、cloudbuild.yml、metadata.json は後述します。cloudbuild.ymlを作成して、Dockerイメージをビルドし、Container Registryに登録します。

gcloud builds submit --config cloudbuild.yml

Container Registryにイメージを登録できたら、Dataflowテンプレートを作成します。

gcloud dataflow flex-template build gs://my-bucket-name/gitlab-mr-changes.json \
  --image "gcr.io/my-project-id/gitlab-mr-changes/batch-runner:latest" \
  --sdk-language "PYTHON" \
  --metadata-file "metadata.json"

Secret Manager

GitLab APIを使用して、マージリクエストの情報を取得します。GitLab APIを使用するには、あらかじめGitLabでアクセストークンを作成しておく必要があります。

このアクセストークンをどうやってアプリケーションに与えるか、ということになりますが、ここでは Secret Manager を利用してみます。

Secret Managerに「gitlab_access_token」というキーで「my super secret data」という値を登録する場合は、以下のように実行します。

echo -n "my super secret data" | gcloud secrets create gitlab_access_token \
  --replication-policy="automatic" \
  --data-file=-

登録した値をアプリケーションから取り出すには、GCPへの認証が必要です。アプリケーションからの認証ができるようにするために、専用のサービスアカウントを作成します。

gcloud iam service-accounts create my-app-account-name
gcloud projects add-iam-policy-binding my-project-id \
  --member="serviceAccount:my-app-account-name@my-project-id.iam.gserviceaccount.com" \
  --role="roles/owner"

本番の運用で「roles/owner」はやめましょう、と書いてあったので、やめましょう。

サービスアカウントが登録できたら、認証用のサービスアカウントキーを作成します。

gcloud iam service-accounts keys create my-app-credentials.json \
  --iam-account=my-app-account-name@my-project-id.iam.gserviceaccount.com

my-app-credentials.json が作成されるので、このファイルのパスを、Dockerfile内で環境変数 GOOGLE_APPLICATION_CREDENTIALS に指定します。Dockerfileを更新したら、もう一度ビルドします。

アプリケーションからは、以下のようにアクセスします。

from google.cloud import secretmanager

class GcpSecret:
    def __init__(self, project_cd):
        self.project_cd = project_cd

    def fetch(self, secret_name):
        client = secretmanager.SecretManagerServiceClient()
        response = client.access_secret_version(request={
            "name": 'projects/{}/secrets/{}/versions/latest'.format(
                self.project_cd,
                secret_name
            ),
        })
        payload = response.payload.data.decode("UTF-8")
        return payload

以下のように呼び出すと、Secret Managerに登録した値を取得することができます。

    gcp_secret = GcpSecret(gcp_project)
    access_token = gcp_secret.fetch('gitlab_access_token')

Cloud Scheduler

Cloud SchedulerからDataflowを起動します。起動するためのHTTPリクエストを送信する設定を行います。

gcloud scheduler jobs create http mr_changes_batch \
--schedule "0 0 * * *" \
--time-zone "Asia/Tokyo" \
--uri "https://dataflow.googleapis.com/v1b3/projects/my-project-id/locations/my-app-location/flexTemplates:launch" \
--http-method POST \
--message-body "{\"launch_parameter\":{\"jobName\":\"gitlab-mr-changes\",\"parameters\":{\"gcp_project\":\"my-project-id\",\"group_id\":\"my-gitlab-group-id\"},\"containerSpecGcsPath\":\"gs://my-bucket-name/gitlab-mr-changes.json\"}}" \
--oauth-service-account-email "my-app-account-name@my-project-id.iam.gserviceaccount.com" \
--oauth-token-scope "https://www.googleapis.com/auth/cloud-platform"

指定した時刻にDataflowのジョブが実行されれば成功です。

気を付けること

Dataflowのジョブがタイムアウトで失敗する

ローカルPC内で実行すると2分程度で終了する処理が、Cloud Dataflowで実行させるとおよそ15分かかりました。なぜ。

それでも動いていたのでしばらく放置していましたが、ある時ジョブが失敗していることを見つけました。

ジョブが失敗したら通知できないものか、と考えるのは別件として、失敗している原因を調べるためにログを確認したところ、以下のように出力されていました。

2021-03-07 00:02:09.725 JSTINFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/dataflow/template/requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
2021-03-07 00:12:21.760 JSTShutting down the GCE instance, launcher-202103060700012597371614216191431, used for launching.
2021-03-07 00:12:45.961 JSTVM, launcher-202103060700012597371614216191431, stopped.
2021-03-07 00:12:46.064 JSTTimeout in polling result file: 
gs://(GCSのパス). 
Possible causes are: 
1. Your launch takes too long time to finish. Please check the logs on stackdriver. 
2. Service account XXXX-compute@developer.gserviceaccount.com may not have enough permissions to pull container image gcr.io/(Container Registryのパス) or create new objects in gs://(GCSのパス) 
3. Transient errors occurred, please try again.

00:02:09 に pip download の実行を開始し、およそ10分後におもむろにGCEインスタンスをシャットダウン、そしてタイムアウトエラーを出力した、というように見えます。むむ。

以下の記事を見つけました。

PythonFlexテンプレートを使用したデータフロー-ランチャータイムアウト

Apache Beamを requirements.txt でインストールするようにしていたのですが、Apache Beam は requirements.txt とは別にインストールしておかなければならないようです。

Dockerfileを修正し、再実行したところ、およそ15分かかっていたジョブが、約9分で終了するようになりました。それでも9分かかるのですが、しばらくはこれで様子見ということで。

pythonモジュールが見つからない

例えば main.py からプログラムの実行を開始するものだとして、アプリケーションのソースコードが長くなってくると、ソースコードを複数のファイルに分割したくなります。以下のような構成になったとします。

src
├── general
│   ├── batch.py
│   └── gcp_secret.py
├── gitlab
│   ├── api.py
│   └── models
│       └── merge_request.py
└── main.py

この状態でDataflowにデプロイし、実行させてみると、ジョブが失敗します。赤い。

20210307_Dataflow_02.png

ログには以下のように出力されています。時間をかけてコードをきれいにしたら動かなくなる、とはどういうことか。

ModuleNotFoundError: No module named 'general'

どうやらこちらの記事が該当するようです。

ソースコードを配置している各ディレクトリに __init__.py を作成し、かつ、setup.py を作成しておきます。

src
├── general
│   ├── __init__.py
│   ├── batch.py
│   └── gcp_secret.py
├── gitlab
│   ├── __init__.py
│   ├── api.py
│   └── models
│       ├── __init__.py
│       └── merge_request.py
├── main.py
└── setup.py
setup.py
import setuptools

setuptools.setup(
    name='gitlab_mr_changes',
    version='0.0.1',
    install_requires=[],
    packages=setuptools.find_packages(),
)

再度デプロイして、ジョブを実行し、緑色になれば成功です。

おわりに

サーバレスでバッチを実行するために、Cloud Dataflowを利用しました。ついでに秘密情報を扱うためにSecret Managerを利用しました。

ただただバッチを一つ実行したいだけなのに、いろいろ苦労することになったので、今後このメモが役に立つことを期待します。

設定ファイル

Dockerfile

Dockerfile
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base:20210120_RC00

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

RUN apt-get update && apt-get install -y libffi-dev git && rm -rf /var/lib/apt/lists/*

COPY ./requirements.txt .
COPY ./src ./src
COPY ./my-app-credentials.json .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/src/main.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/src/setup.py"
ENV GOOGLE_APPLICATION_CREDENTIALS="${WORKDIR}/my-app-credentials.json"

RUN pip install -U apache-beam[gcp]==2.27.0
RUN pip install -U -r ./requirements.txt

cloudbuild.yml

cloudbuild.yml
steps:
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/my-project-id/gitlab-mr-changes/batch-runner:latest', '.']
images: ['gcr.io/my-project-id/gitlab-mr-changes/batch-runner:latest']

metadata.json

metadata.json
{
  "name": "GitLab Merge Request Changes",
  "description": "GitLab Merge Request Changes",
  "parameters": [
    {
      "name": "gcp_project",
      "label": "gcp_project",
      "helpText": "gcp_project",
      "regexes": [
          ".*"
      ]
    },
    {
      "name": "group_id",
      "label": "group_id",
      "helpText": "group_id",
      "regexes": [
          ".*"
      ]
    },
    {
      "name": "created_after",
      "label": "created_after",
      "helpText": "created_after",
      "isOptional": true,
      "regexes": [
          ".*"
      ]
    },
    {
      "name": "created_before",
      "label": "created_before",
      "helpText": "created_before",
      "isOptional": true,
      "regexes": [
          ".*"
      ]
    }
  ]
}

参考

15
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?