動機
GCP
のサービス関連系といえばCloud Function
がありますがプロジェクト間のCloud Storage
オブジェクトのコピーができなかったため、今回は別の方法で実装しようと思いCloud Run
を使ってみることにしました。
サンプルプログラムの概要
プロジェクトAのCloud Storageへファイルがアップロードされると、プロジェクトBのCloud Storageへファイルがコピーされるというサンプルプログラムです。
サンプルプログラム:Github - run-cloud-storage-migration-service
1. 設計
アーキテクチャ
- ユーザーがCloud StorageへファイルをアップロードしたらCloud StorageがCloud Pub/Subへアップロード通知を行う。
- サブスクライバはアップロード通知を受け取ると、プロジェクトAのCloud StorageからプロジェクトBのCloud Storageへアップロードファイルをコピーする
2. 実装
2-1. Cloud Pub/Subの設定
2-1-1. Cloud Storage の Pub/Sub トピック作成
gsuil コマンド
export TOPIC_NAME=my_gcs_notification
export BUCKET_NAME=minarai-sample-backet
# 作成
% gsutil notification create -t ${TOPIC_NAME} -f json gs://${BUCKET_NAME}
Created Cloud Pub/Sub topic projects/[YOUR_PROJECT_ID]/topics/my_gcs_notification
Created notification config projects/_/buckets/minarai-sample-backet/notificationConfigs/1
# 確認
% gsutil notification list gs://${BUCKET_NAME}
projects/_/buckets/minarai-sample-backet/notificationConfigs/1
Cloud Pub/Sub topic: projects/[YOUR_PROJECT_ID]/topics/my_gcs_notification
2-1-2. サブスクリプションの作成
gcloudコマンド
export TOPIC_NAME=my_gcs_notification
export SUBSCRIPTION_ID=gcs_notification_sub
gcloud beta pubsub subscriptions create ${SUBSCRIPTION_ID} --topic=${TOPIC_NAME}
2-2. pullメッセージの受信スクリプト実装
下記サンプルプログラムをGithubに上げておきます。
from google.cloud import pubsub_v1
import json
import os
import subprocess
import time
import traceback
DESTINATION_BUCKET_NAME = os.getenv('DESTINATION_BUCKET_NAME')
PROJECT_ID = os.getenv('PROJECT_ID')
SUBSCRIPTION_ID = os.getenv('SUBSCRIPTION_ID')
OBJECT_FINALIZE = os.getenv('OBJECT_FINALIZE', 'OBJECT_FINALIZE')
def copyObjects(bucket_name: str, file_name: str, destination_bucket_name: str) -> dict:
copy = "gsutil -m cp gs://{bucket}/{file} gs://{d_bucket}/".format(
bucket=bucket_name,
file=file_name,
d_bucket=destination_bucket_name)
try:
check_res = subprocess.getoutput(copy)
if ("completed" in check_res) == True:
return {"is_success": True, "description": check_res}
return {"is_success": False, "description": check_res}
except Exception as e:
return {"is_success": False, "description": traceback.format_exc()}
def summarize(message):
# [START parse_message]
data = message.data.decode("utf-8")
attributes = message.attributes
event_type = attributes["eventType"]
bucket_id = attributes["bucketId"]
object_id = attributes["objectId"]
generation = attributes["objectGeneration"]
description = (
"\tEvent type: {event_type}\n"
"\tBucket ID: {bucket_id}\n"
"\tObject ID: {object_id}\n"
"\tGeneration: {generation}\n"
).format(
event_type=event_type,
bucket_id=bucket_id,
object_id=object_id,
generation=generation,
)
if "overwroteGeneration" in attributes:
description += "\tOverwrote generation: %s\n" % (
attributes["overwroteGeneration"]
)
if "overwrittenByGeneration" in attributes:
description += "\tOverwritten by generation: %s\n" % (
attributes["overwrittenByGeneration"]
)
payload_format = attributes["payloadFormat"]
if payload_format == "JSON_API_V1":
object_metadata = json.loads(data)
size = object_metadata["size"]
content_type = object_metadata["contentType"]
metageneration = object_metadata["metageneration"]
description += (
"\tContent type: {content_type}\n"
"\tSize: {object_size}\n"
"\tMetageneration: {metageneration}\n"
).format(
content_type=content_type,
object_size=size,
metageneration=metageneration,
)
return {
"event_type": event_type,
"bucket_id": bucket_id,
"object_id": object_id,
"description": description
}
# [END parse_message]
def poll_notifications(project, subscription_name):
"""Polls a Cloud Pub/Sub subscription for new GCS events for display."""
# [START poll_notifications]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name
)
def callback(message):
summarize_response = summarize(message)
event_type = summarize_response["event_type"]
bucket_id = summarize_response["bucket_id"]
object_id = summarize_response["object_id"]
description = summarize_response["description"]
print("Received message:\n{}".format(description))
if event_type == OBJECT_FINALIZE:
copy_response = copyObjects(
bucket_id, object_id, DESTINATION_BUCKET_NAME)
if copy_response['is_success'] is True:
print("copy object to gs://{}/{}".format(object_id,
DESTINATION_BUCKET_NAME))
else:
print("object copy error:\n{}".format(
copy_response['description']))
message.ack()
subscriber.subscribe(subscription_path, callback=callback)
# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# [END poll_notifications]
if __name__ == "__main__":
print("start service.")
poll_notifications(PROJECT_ID, SUBSCRIPTION_ID)
2-3. Container イメージを作成する
FROM python:3.7-alpine3.12
RUN apk --update-cache add \
linux-headers \
gcc \
g++ \
curl \
bash
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH $PATH:/root/google-cloud-sdk/bin
RUN apk update \
&& apk upgrade
COPY ./requirements.txt /tmp/requirements.txt
RUN pip install --upgrade pip
RUN pip install -r /tmp/requirements.txt
RUN apk --no-cache add tzdata && \
cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime && \
apk del tzdata
RUN mkdir -p /src
COPY ./app.py /src/app.py
WORKDIR /src
ENTRYPOINT ["python", "app.py"]
grpcio==1.29.0
google-cloud-pubsub==1.7.0
補足
wheels for grpcio
はPEP 517で廃止のためインストールができない
google-cloud-pubsubを追加しようとすると下記メッセージが発生し、イメージのビルドに失敗しますが「https://github.com/grpc/grpc/issues/21918」を参考にapk add linux-headers
したらビルドに成功しました
DEPRECATION: Could not build wheels for grpcio which do not use PEP 517. pip will fall back to legacy 'setup.py install' for these. pip 21.0 will remove support for this functionality. A possible replacement is to fix the wheel build issue reported above. You can find discussion regarding this at https://github.com/pypa/pip/issues/8368.
2-4. Cloud Runの作成
手順
- GCP-Cloud Runkコンソールへ遷移
- 「サービスを作成」をクリック
- デプロイメント プラットフォームで「Cloud Run for Anthos」を選択
- Anthos GKEクラスタを新規作成する
- Anthos GKEクラスタのゾーンを選択する
- Anthos GKEクラスタ名を入力する
- クイック作成をクリック
- Cloud Runサービス名を入力
- 接続は「内部」「外部」任意の方を選択
- 「次へ」をクリック
- 「サービスの最初のリビジョンの構成」で「ソースリポジトリから新しいリビジョンを継続的にデプロイする」を選択
- 詳細設定を表示リンクをクリックして2-5. 環境変数を登録の設定を行う
- 「作成」をクリックする
ポイント:10.で「サービスの最初のリビジョンの構成」で「ソースリポジトリから新しいリビジョンを継続的にデプロイする」を選択するとCloud Build
サービスを利用したCDサービスが利用できます。
2-5. 環境変数を登録
Cloud Runサービスが作成されたら「継続的デプロイの編集」から環境変数を登録します
また、コンテナイメージのビルドに時間がかかる場合も、「継続的デプロイの編集」のタイムアウト設定でタイムアウト時間を変更できます。
環境変数 | 値 |
---|---|
DESTINATION_BUCKET_NAME | 転送先のバケット名 |
PROJECT_ID | プロジェクトID |
SUBSCRIPTION_ID | サブスクリプションID |
OBJECT_FINALIZE | OBJECT_FINALIZE |
Cloud Buildを使わない場合
docker build -t ${IMAGE_NAME} .
docker tag ${IMAGE_NAME} gcr.io/${PROJECT_ID}/${IMAGE_NAME}
gcloud docker -- push gcr.io/${PROJECT_ID}/${IMAGE_NAME}:latest