はじめに
この記事では Cloud Run でいくつかのジョブを作成して Cloud Run Jobs の実践的な使い方を紹介します。
なお、Cloud Runの利用料に関してはについては公式の案内を確認してください。
また、「そもそもCloud Run Jobsってなに?」という方は前に書いた記事(「怖くないCloud Run」)も参考にしていただけると幸いです。
演習の準備
プロジェクトの作成
-
Google Cloudダッシュボードを開き「プロジェクトを作成」を選択してください。
-
プロジェクトの作成画面が開いたら cloud-run-drill という名称の新規プロジェクトを作成してください。
※ プロジェクトIDは重複が許されません。
※ 以後"cloud-run-drill"というIDは各自のプロジェクトIDに読み替えてください
Google Cloud APIの有効化
-
Artifact Registry APIの有効化
Artifact Registry APIサービスの詳細ページにアクセスして「有効にする」を選択してください。
このAPIはGoogle Cloud CLIでコンテナイメージをプッシュする際などに使用します。
-
Cloud Run Admin APIの有効化
Cloud Run Admin APIサービスの詳細ページにアクセスして「有効にする」を選択してください。
このAPIはCloud Runにアプリをデプロイしたりやデプロイを更新するのに使用します。
ロールの割り当て
-
IAMロールの設定画面で自分のアカウントに次のロールを付与してください。
- Artifact Registry 管理者: リポジトリを作成、管理するための管理者アクセス権
- Cloud Run デベロッパー: すべてのCloud Runリソースに対する読み取りと書き込みアクセス権
- Cloud Run 起動元: Cloud Runサービスを起動する権限
- BigQueryデータ編集者: データセットのすべてのコンテンツを編集するためのアクセス権限
- BigQueryジョブユーザ: BigQueryでクエリを実行する権限
- ストレージフォルダ管理者: フォルダとオブジェクトを作成/参照/更新/削除する権限
- サービスアカウントユーザ: オペレーションをサービスアカウントとして実行する権限
- ログ閲覧者: Cloud Runなどのログを確認する権限
-
サービスアカウントの管理ページを開き、デフォルトのサービスアカウントのメールアドレスをメモします。
📝サービスアカウントとは
サービスアカウントとは私たちユーザが利用する "ユーザアカウント" とは違い、Cloud Runなどのサービスが使うためのアカウントです。
サービスアカウントはユーザアカウントのようにGmailを開いたり他のサイトにログインしたりはできません。しかし、Google Cloudではサービスの一部の処理をマシンとして実行する必要があるので、マシンの権限を管理するためにマシン用のアカウントすなわちサービスアカウントが必要になります。
-
IAMロールの設定画面に戻り、「アクセス権を付与」からサービスアカウントには次のロールを付与してください。
- BigQueryデータ編集者: データセットのすべてのコンテンツを編集するためのアクセス権限
- BigQueryジョブユーザ: BigQueryでクエリを実行する権限
- ストレージフォルダ管理者: フォルダとオブジェクトを作成/参照/更新/削除する権限
- Cloud Run 起動元: Cloud Runサービスを起動する権限
アプリのインストール
Cloud Shell Editorを利用する場合は不要ですが、ローカル環境で演習を進める場合は次のアプリをインストールしておいてください。
- Google Cloud CLI: Google Cloudのリソースを操作するのに利用
- Docker Desktop: コンテナイメージのビルドやプッシュに利用
Artifact Registryの作成
-
次のコマンドを実行してcloud-run-drillプロジェクトへの認証を対話的に進めてください。
gcloud init
-
次のコマンドでリポジトリを作成します。
gcloud artifacts repositories create sample-repo \ --repository-format=docker \ --location=asia-northeast1 \ --description="Cloud Run Sample Repository" \ --project="cloud-run-drill"
- create sample-repo: "sample-repo"という名前のリポジトリを作成
- --location: コンテナイメージのアップロード先リージョン
- --description: リポジトリの説明文(任意)
- --project: コンテナイメージのアップロード先プロジェクト
-
コンテナイメージのアップロード先リージョンへの接続を認証します。
gcloud auth configure-docker asia-northeast1-docker.pkg.dev
データセット作成
-
BigQeuryのデータセットを作成します。
データセットの作成はBigQueryStudioのGUIでも実行できますが、簡単のためコマンドを紹介します。
bq --location=asia-northeast1 mk --dataset crd
-
演習で使用するテーブルも作成しておきましょう。
BigQueryStudioを開いて次のSQLを実行してください。
-- 商品マスタの作成 CREATE TABLE crd.m_product ( id STRING NOT NULL, name STRING NOT NULL, price FLOAT64 NOT NULL ); INSERT INTO crd.m_product (id, name, price) VALUES ('P001', 'apple', 120), ('P002', 'orange', 80), ('P003', 'banana', 90); -- 販売マスタの作成 CREATE TABLE crd.m_sale ( id STRING NOT NULL, buyer STRING, sale_date DATE DEFAULT (CURRENT_DATE()), product_id STRING NOT NULL, sale_quantity INT64 NOT NULL ); INSERT INTO crd.m_sale (id, buyer, sale_date, product_id, sale_quantity) VALUES ('S001', 'Newton', DATE '2024-07-01', 'P001', 8), ('S002', 'Gorilla', DATE '2024-07-01', 'P003', 5), ('S003', NULL, DATE '2024-07-01', 'P002', 10), ('S004', 'Newton', DATE '2024-07-02', 'P001', 6), ('S005', 'Newton', DATE '2024-07-02', 'P002', 2), ('S006', 'Gorilla', DATE '2024-07-02', 'P003', 5);
バケットを作成する
-
次のコマンドでバケットを作成しておきます。
gsutil mb -l asia-northeast1 gs://crd-temporary
※ バケット名はユニークである必要があります
※ 以後"crd-temporary"というバケット名は各自のバケット名に読み替えてください
シンプルなジョブを作る
まずは一番シンプルなジョブを作ってみましょう。
プロジェクトのディレクトリレイアウトは次のような感じです。
run-python/
├─ Dockerfile
└─ main.py
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir run-python cd run-python
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip ENV TZ=Asia/Tokyo CMD ["python", "main.py"]
-
プロジェクトルートにmain.pyを作り次のように編集してください。
import sys def main(): for arg in sys.argv: print(arg) if __name__ == "__main__": main()
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t run-python . # イメージをタグ付け docker tag run-python:latest \ asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python:latest
-
ジョブを作成します。
gcloud run jobs create run-python \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python:latest \ --region=asia-northeast1 \ --tasks=1 \ --cpu=1 \ --max-retries=0 \ --memory=512Mi \ --parallelism=1 \ --task-timeout=100
-
ジョブを実行します。
gcloud run jobs execute run-python --region=asia-northeast1
-
ログを確認します。
ジョブの呼出しでターミナルに出力された「Or visit」以降に表示されるURLをブラウザで開いて実行ログを確認してください。
main.pyはコマンド引数を順次表示するアプリなので、少なくともファイル名の"main.py"が出力されているはずです。
-
コマンド引数を与えてみます。
コンテナイメージの記述でCMDを使った場合、コマンド引数はジョブの作成時に指定することになります。
run-pythonはすでに作成済みなので以下のコマンドでジョブを更新してみてください。
gcloud run jobs update run-python \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python:latest \ --region=asia-northeast1 \ --command="python,main.py" \ --args="hoge,huga"
-
先ほどと同じようにジョブを実行します。
gcloud run jobs execute run-python --region=asia-northeast1
-
改めてログを確認します。
ターミナルに出力された「Or visit」以降に表示されるURLから実行ログを確認すると今度は"hoge"と"huga"の出力が増えているはずです。
コマンド引数を扱う
コンテナイメージの記述にCMDを使うとコマンド引数を変えるにはジョブを更新する必要がありました。
しかしこれはちょっと面倒くさいのですね💦
なので、次は ENTRYPOINT を利用してジョブの実行時にコマンド引数を指定する方法を紹介します。
プロジェクトのディレクトリレイアウトは先ほどと同じです。
run-python2/
├─ Dockerfile
└─ main.py
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir run-python2 cd run-python2
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip ENV TZ=Asia/Tokyo ENTRYPOINT ["python", "main.py"] # ここを変更
-
プロジェクトルートにmain.pyを作り次のように編集してください。
import sys def main(): for arg in sys.argv: print(arg) if __name__ == "__main__": main()
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t run-python2 . # イメージをタグ付け docker tag run-python2:latest asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python2:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python2:latest
-
ジョブを作成します。
gcloud run jobs create run-python2 \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-python2:latest \ --region=asia-northeast1 \ --tasks=1 \ --cpu=1 \ --max-retries=0 \ --memory=512Mi \ --parallelism=1 \ --task-timeout=100
-
ジョブを実行します。
gcloud run jobs execute run-python2 --region=asia-northeast1
-
ログを確認します。
ジョブの呼出しでターミナルに出力された「Or visit」以降に表示されるURLをブラウザで開いて実行ログを確認してください。
まずはファイル名の"main.py"が出力されていることを確認してください。
-
コマンド引数を与えて実行します。
gcloud run jobs execute run-python2 --region=asia-northeast1 --args="hoge,huga"
-
新しいログを確認します。
ターミナルに出力された「Or visit」以降に表示されるURLから実行ログを確認して"hoge"と"huga"が出力されていることを確認してください。
BigQueryを動かす
Cloud RunからBig Queryを動かしてみましょう。
run-bigquery/
├─ Dockerfile
├─ main.py
└─ requirements.txt
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir run-bigquery cd run-bigquery
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt ENV TZ=Asia/Tokyo ENTRYPOINT ["python", "main.py"]
-
プロジェクトルートにmain.pyを作り次のように編集してください。
from google.cloud import bigquery PROJECT_NAME = "cloud-run-drill" SQL = "SELECT * FROM crd.m_product;" def main(): # BigQueryに接続してレコードを表示 bq_client = bigquery.Client(project=PROJECT_NAME) result = bq_client.query(SQL).result() for row in result: col_list = [str(col) for col in row.values()] print('\t'.join(col_list)) if __name__ == "__main__": main()
-
プロジェクトルートにrequirements.txtを作り次のように編集してください。
google-cloud-bigquery==3.25.0
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t run-bigquery . # イメージをタグ付け docker tag run-bigquery:latest asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-bigquery:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-bigquery:latest
-
ジョブを作成します。
gcloud run jobs create run-bigquery \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/run-bigquery:latest \ --region=asia-northeast1 \ --tasks=1 \ --cpu=1 \ --max-retries=0 \ --memory=512Mi \ --parallelism=1 \ --task-timeout=100
-
ジョブを実行します。
gcloud run jobs execute run-bigquery --region=asia-northeast1
-
ログを確認します。
ジョブの呼出しでターミナルに出力された「Or visit」以降に表示されるURLをブラウザで開いて実行ログを確認してください。
ログを確認すると商品マスタ(m_product)の一覧が出力されているはずです。
GCSにファイルを保存する
さらに応用としてBig Queryから取得したデータをGCSに保存する手順も見て行きましょう。
save-to-gcs/
├─ Dockerfile
├─ main.py
└─ requirements.txt
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir save-to-gcs cd save-to-gcs
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt ENV TZ=Asia/Tokyo ENTRYPOINT ["python", "main.py"]
-
プロジェクトルートにmain.pyを作り次のように編集してください。
from google.cloud import bigquery, storage PROJECT_NAME = "cloud-run-drill" SQL = "SELECT * FROM crd.m_product;" BUCKET_NAME = "crd-temporary" def main(): # BigQueryに接続してプレーンテキストとしてCSVを作成 bq_client = bigquery.Client(project=PROJECT_NAME) result = bq_client.query(SQL).result() header_row = [field.name for field in result.schema] row_list = [','.join(header_row)] for row in result: col_list = [str(col) for col in row] row_list.append(','.join(col_list)) csv_content = '\n'.join(row_list) # GCSに接続してCSVをアップロード gcs_client = storage.Client() bucket = gcs_client.bucket(BUCKET_NAME) blob = bucket.blob("save-to-gcs/m_product.csv") blob.upload_from_string(csv_content, content_type="text/csv") if __name__ == "__main__": main()
-
プロジェクトルートにrequirements.txtを作り次のように編集してください。
google-cloud-bigquery==3.25.0 google-cloud-storage==2.18.0
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t save-to-gcs . # イメージをタグ付け docker tag save-to-gcs:latest asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/save-to-gcs:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/save-to-gcs:latest
-
ジョブを作成します。
gcloud run jobs create save-to-gcs \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/save-to-gcs:latest \ --region=asia-northeast1 \ --tasks=1 \ --cpu=1 \ --max-retries=0 \ --memory=512Mi \ --parallelism=1 \ --task-timeout=100
-
ジョブを実行します。
gcloud run jobs execute save-to-gcs --region=asia-northeast1
-
次のコマンドでGCSにm_product.csvが保存されていることを確認します。
gsutil cat gs://crd-temporary/save-to-gcs/m_product.csv
ジョブをプロジェクト化する
実際のアプリは多階層にファイルを分けて開発することが多いと思います。
より実践的なジョブの開発を想定して次のようなレイアウトのプロジェクトを作成してみましょう。
make-project/
├─ Dockerfile
├─ src/
│ ├─ python/
│ │ └─ main.py
│ └─ sql/
│ └─ template.sql
└─ requirements.txt
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir make-project cd make-project
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt WORKDIR /job/src/python ENV TZ=Asia/Tokyo ENTRYPOINT ["python", "main.py"]
-
src/python/ディレクトリにmain.pyを作り次のように編集してください。
from google.cloud import bigquery, storage from datetime import datetime import sys PROJECT_NAME = "cloud-run-drill" TEMPLATE_FILEPATH = "/job/src/sql/template.sql" BUCKET_NAME = "crd-temporary" def main(): # テンプレートSQLにコマンド引数から受け取った集計対象日を埋め込む with open(TEMPLATE_FILEPATH, "r", encoding='utf-8') as f: template = f.read() agg_date_str = sys.argv[1] agg_date = datetime.strptime(agg_date_str, "%Y%m%d") agg_date_str = agg_date.strftime("%Y-%m-%d") template = template.replace("${SALE_DATE}", agg_date_str) # BigQueryに接続してプレーンテキストとしてCSVを作成 with bigquery.Client(project=PROJECT_NAME) as bq_client: result = bq_client.query(template).result() header_row = [field.name for field in result.schema] row_list = [','.join(header_row)] for row in result: col_list = [str(col) for col in row] row_list.append(','.join(col_list)) csv_content = '\n'.join(row_list) # GCSに接続してCSVを保存 gcs_client = storage.Client() bucket = gcs_client.bucket(BUCKET_NAME) blob = bucket.blob(f"make-project/total_sale_{agg_date_str}.csv") blob.upload_from_string(csv_content, content_type="text/csv") if __name__ == "__main__": main()
-
プロジェクトルートにrequirements.txtを作り次のように編集してください。
google-cloud-bigquery==3.25.0 google-cloud-storage==2.18.0
-
src/sql/ディレクトリにtemplate.sqlを作り次のように編集してください。
SELECT sale.sale_date, product.name, SUM(sale.sale_quantity) AS sale_quantity, SUM(product.price * sale.sale_quantity) AS total_sales FROM crd.m_sale AS sale INNER JOIN crd.m_product AS product ON sale.product_id = product.id WHERE sale_date = DATE '${SALE_DATE}' GROUP BY 1,2 ;
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t make-project . # イメージをタグ付け docker tag make-project:latest asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/make-project:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/make-project:latest
-
ジョブを作成します。
gcloud run jobs create make-project \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/make-project:latest \ --region=asia-northeast1 \ --tasks=1 \ --cpu=1 \ --max-retries=0 \ --memory=512Mi \ --parallelism=1 \ --task-timeout=100
-
ジョブを実行します。
gcloud run jobs execute make-project --region=asia-northeast1 --args="20240701"
-
次のコマンドでGCSにtotal_sale_2024-07-01.csvが保存されていることを確認します。
gsutil cat gs://crd-temporary/make-project/total_sale_2024-07-01.csv
GCSをマウントする
ここからはちょっと特殊なユースケースも見て行きましょう。
Cloud Runではジョブ内でGCSのクライアントライブラリを使ってGCSへの操作を行うことができますが、GCSの特定の領域をマウントしてファイルシステムの一部のように扱うこともできます。
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir mount-gcs cd mount-gcs
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt ENV TZ=Asia/Tokyo ENTRYPOINT ["python", "main.py"]
-
src/python/ディレクトリにmain.pyを作り次のように編集してください。
from google.cloud import bigquery from datetime import datetime import sys, os PROJECT_NAME = "cloud-run-drill" TEMPLATE_FILEPATH = "/job/src/sql/template.sql" BUCKET_NAME = "crd-temporary" OUTPUT_DIRECTORY = "/mnt/gcs/mount-gcs/" def main(): # テンプレートSQLにコマンド引数から受け取った集計対象日を埋め込む with open(TEMPLATE_FILEPATH, "r", encoding='utf-8') as f: template = f.read() agg_date_str = sys.argv[1] agg_date = datetime.strptime(agg_date_str, "%Y%m%d") agg_date_str = agg_date.strftime("%Y-%m-%d") sql = template.replace("${SALE_DATE}", agg_date_str) # BigQueryに接続してプレーンテキストとしてCSVを作成 with bigquery.Client(project=PROJECT_NAME) as bq_client: result = bq_client.query(sql).result() header_row = [field.name for field in result.schema] row_list = [','.join(header_row)] for row in result: col_list = [str(col) for col in row] row_list.append(','.join(col_list)) csv_content = '\n'.join(row_list) # マウントしているGCSにCSVを保存 output_filepath = f"{OUTPUT_DIRECTORY}total_sale_{agg_date_str}.csv" if not os.path.exists(OUTPUT_DIRECTORY): os.makedirs(OUTPUT_DIRECTORY) with open(output_filepath, "w", encoding="utf-8") as f: f.write(csv_content) if __name__ == "__main__": main()
-
プロジェクトルートにrequirements.txtを作り次のように編集してください。
google-cloud-bigquery==3.25.0
-
src/sql/ディレクトリにtemplate.sqlを作り次のように編集してください。
SELECT sale.sale_date, product.name, SUM(sale.sale_quantity) AS sale_quantity, SUM(product.price * sale.sale_quantity) AS total_sales FROM crd.m_sale AS sale INNER JOIN crd.m_product AS product ON sale.product_id = product.id WHERE sale_date = DATE '${SALE_DATE}' GROUP BY 1,2 ;
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t mount-gcs . # イメージをタグ付け docker tag mount-gcs:latest asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/mount-gcs:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/mount-gcs:latest
-
ジョブを作成します。
gcloud beta run jobs create mount-gcs \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/mount-gcs:latest \ --region=asia-northeast1 \ --tasks=1 \ --cpu=1 \ --memory=512Mi \ --max-retries=0 \ --parallelism=1 \ --task-timeout=100 \ --add-volume="name=gcs,type=cloud-storage,bucket=crd-temporary" \ --add-volume-mount="volume=gcs,mount-path=/mnt/gcs"
-
run beta: GCSをマウントするジョブを作成するにはbeta版のコマンドを使う必要があります
もしローカル環境でbeta版が使えない場合は次のコマンドでインストールしてください
gcloud components update gcloud components install beta
-
--add-volume: マウントするGCSバケットを指定します
-
--add-volume-mount: マウントしたGCSバケットのパスを指定します
-
-
ジョブを実行します。
gcloud run jobs execute mount-gcs --region=asia-northeast1 --args="20240702"
-
次のコマンドでGCSにtotal_sale_2024-07-02.csvが保存されていることを確認しましょう。
gsutil cat gs://crd-temporary/mount-gcs/total_sale_2024-07-02.csv
並列分散処理
Cloud Run Jobsではコンテナに次のような環境変数が自動的に付与されます。
- CLOUD_RUN_JOB: ジョブ名
- CLOUD_RUN_EXECUTION: ジョブID
- CLOUD_RUN_TASK_COUNT: 並列稼働させるコンテナの数
- CLOUD_RUN_TASK_INDEX: 並列稼働で起動したコンテナの番号 (0から起算)
- CLOUD_RUN_TASK_ATTEMPT: 現在のタスクの再試行回数 (0から起算)
このような環境変数を利用すればタスクを並列分散処理させることもできます。
試しにやってみましょう!
-
任意のディレクトリに次の作業ディレクトリを作成して移動します。
mkdir parallelism cd parallelism
-
プロジェクトルートにDockerfileを作り次のように編集してください。
FROM python:3.11-slim COPY . /job WORKDIR /job RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt ENV TZ=Asia/Tokyo ENTRYPOINT ["python", "main.py"]
-
src/python/ディレクトリにmain.pyを作り次のように編集してください。
from google.cloud import bigquery, storage import os PROJECT_NAME = "cloud-run-drill" TEMPLATE_FILEPATH = "/job/src/sql/template.sql" BUCKET_NAME = "crd-temporary" product_list = ['apple', 'orange', 'banana'] def main(): # テンプレートSQLにコンテナ番号に応じて集計対象商品を埋め込む with open(TEMPLATE_FILEPATH, "r", encoding='utf-8') as f: template = f.read() task_index = os.getenv("CLOUD_RUN_TASK_INDEX", 0) product_name = product_list[task_index] sql = template.replace("${PRUDUCT_NAME}", product_name) # BigQueryに接続してプレーンテキストとしてCSVを作成 with bigquery.Client(project=PROJECT_NAME) as bq_client: result = bq_client.query(sql).result() header_row = [field.name for field in result.schema] row_list = [','.join(header_row)] for row in result: col_list = [str(col) for col in row] row_list.append(','.join(col_list)) csv_content = '\n'.join(row_list) # GCSに接続してCSVを保存 gcs_client = storage.Client() bucket = gcs_client.bucket(BUCKET_NAME) blob = bucket.blob(f"parallelism/total_sale_of_{product_name}.csv") blob.upload_from_string(csv_content, content_type="text/csv") if __name__ == "__main__": main()
-
プロジェクトルートにrequirements.txtを作り次のように編集してください。
google-cloud-bigquery==3.25.0
-
src/sql/ディレクトリにtemplate.sqlを作り次のように編集してください。
SELECT product.name, SUM(sale.sale_quantity) AS sale_quantity, SUM(product.price * sale.sale_quantity) AS total_sales FROM crd.m_sale AS sale INNER JOIN crd.m_product AS product ON sale.product_id = product.id WHERE product.name = '${PRUDUCT_NAME}' GROUP BY 1 ;
-
コンテナイメージをビルドしてプッシュします。
# イメージをビルド docker build -t parallelism . # イメージをタグ付け docker tag parallelism:latest asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/parallelism:latest # イメージをプッシュ docker push asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/parallelism:latest
-
ジョブを作成します。
gcloud run jobs create parallelism \ --image asia-northeast1-docker.pkg.dev/cloud-run-drill/sample-repo/parallelism:latest \ --region=asia-northeast1 \ --tasks=3 \ --cpu=1 \ --max-retries=0 \ --memory=512Mi \ --parallelism=1 \ --task-timeout=100
※ 商品が3つあるので1発で集計できるようにtasks=3に設定します
-
ジョブを実行します。
gcloud run jobs execute parallelism --region=asia-northeast1
-
GCSを確認すると1回の呼出しで3ファイルが出力されているはずです。
gsutil ls -r gs://crd-temporary/parallelism/