LoginSignup
1
6

More than 3 years have passed since last update.

Cloud Runを使ってみた

Last updated at Posted at 2020-09-09

動機

GCPのサービス関連系といえばCloud Functionがありますがプロジェクト間のCloud Storage オブジェクトのコピーができなかったため、今回は別の方法で実装しようと思いCloud Runを使ってみることにしました。

サンプルプログラムの概要

プロジェクトAのCloud Storageへファイルがアップロードされると、プロジェクトBのCloud Storageへファイルがコピーされるというサンプルプログラムです。
サンプルプログラム:Github - run-cloud-storage-migration-service

1. 設計

アーキテクチャ

Cloud Run 設計 (3).png

  1. ユーザーがCloud StorageへファイルをアップロードしたらCloud StorageがCloud Pub/Subへアップロード通知を行う。
  2. サブスクライバはアップロード通知を受け取ると、プロジェクトAのCloud StorageからプロジェクトBのCloud Storageへアップロードファイルをコピーする

2. 実装

2-1. Cloud Pub/Subの設定 

公式ドキュメント-トピックとサブスクリプションの管理

2-1-1. Cloud Storage の Pub/Sub トピック作成

gsuil コマンド

export TOPIC_NAME=my_gcs_notification
export BUCKET_NAME=minarai-sample-backet
# 作成
% gsutil notification create -t ${TOPIC_NAME} -f json gs://${BUCKET_NAME}
Created Cloud Pub/Sub topic projects/[YOUR_PROJECT_ID]/topics/my_gcs_notification
Created notification config projects/_/buckets/minarai-sample-backet/notificationConfigs/1

# 確認
% gsutil notification list gs://${BUCKET_NAME}
projects/_/buckets/minarai-sample-backet/notificationConfigs/1
    Cloud Pub/Sub topic: projects/[YOUR_PROJECT_ID]/topics/my_gcs_notification

2-1-2. サブスクリプションの作成

公式ドキュメント

gcloudコマンド

export TOPIC_NAME=my_gcs_notification
export SUBSCRIPTION_ID=gcs_notification_sub
gcloud beta pubsub subscriptions create ${SUBSCRIPTION_ID} --topic=${TOPIC_NAME}

2-2. pullメッセージの受信スクリプト実装

下記サンプルプログラムをGithubに上げておきます。

subscriber.py
from google.cloud import pubsub_v1
import json
import os
import subprocess
import time
import traceback


DESTINATION_BUCKET_NAME = os.getenv('DESTINATION_BUCKET_NAME')
PROJECT_ID = os.getenv('PROJECT_ID')
SUBSCRIPTION_ID = os.getenv('SUBSCRIPTION_ID')
OBJECT_FINALIZE = os.getenv('OBJECT_FINALIZE', 'OBJECT_FINALIZE')


def copyObjects(bucket_name: str, file_name: str, destination_bucket_name: str) -> dict:
    copy = "gsutil -m cp gs://{bucket}/{file} gs://{d_bucket}/".format(
        bucket=bucket_name,
        file=file_name,
        d_bucket=destination_bucket_name)

    try:

        check_res = subprocess.getoutput(copy)

        if ("completed" in check_res) == True:
            return {"is_success": True, "description": check_res}

        return {"is_success": False, "description": check_res}
    except Exception as e:
        return {"is_success": False, "description": traceback.format_exc()}


def summarize(message):
    # [START parse_message]
    data = message.data.decode("utf-8")
    attributes = message.attributes

    event_type = attributes["eventType"]
    bucket_id = attributes["bucketId"]
    object_id = attributes["objectId"]
    generation = attributes["objectGeneration"]
    description = (
        "\tEvent type: {event_type}\n"
        "\tBucket ID: {bucket_id}\n"
        "\tObject ID: {object_id}\n"
        "\tGeneration: {generation}\n"
    ).format(
        event_type=event_type,
        bucket_id=bucket_id,
        object_id=object_id,
        generation=generation,
    )

    if "overwroteGeneration" in attributes:
        description += "\tOverwrote generation: %s\n" % (
            attributes["overwroteGeneration"]
        )
    if "overwrittenByGeneration" in attributes:
        description += "\tOverwritten by generation: %s\n" % (
            attributes["overwrittenByGeneration"]
        )

    payload_format = attributes["payloadFormat"]
    if payload_format == "JSON_API_V1":
        object_metadata = json.loads(data)
        size = object_metadata["size"]
        content_type = object_metadata["contentType"]
        metageneration = object_metadata["metageneration"]
        description += (
            "\tContent type: {content_type}\n"
            "\tSize: {object_size}\n"
            "\tMetageneration: {metageneration}\n"
        ).format(
            content_type=content_type,
            object_size=size,
            metageneration=metageneration,
        )
    return {
        "event_type": event_type,
        "bucket_id": bucket_id,
        "object_id": object_id,
        "description": description
    }
    # [END parse_message]


def poll_notifications(project, subscription_name):
    """Polls a Cloud Pub/Sub subscription for new GCS events for display."""
    # [START poll_notifications]
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name
    )

    def callback(message):
        summarize_response = summarize(message)
        event_type = summarize_response["event_type"]
        bucket_id = summarize_response["bucket_id"]
        object_id = summarize_response["object_id"]
        description = summarize_response["description"]

        print("Received message:\n{}".format(description))
        if event_type == OBJECT_FINALIZE:
            copy_response = copyObjects(
                bucket_id, object_id, DESTINATION_BUCKET_NAME)
            if copy_response['is_success'] is True:
                print("copy object to gs://{}/{}".format(object_id,
                                                         DESTINATION_BUCKET_NAME))
            else:
                print("object copy error:\n{}".format(
                    copy_response['description']))

        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print("Listening for messages on {}".format(subscription_path))
    while True:
        time.sleep(60)
    # [END poll_notifications]


if __name__ == "__main__":
    print("start service.")
    poll_notifications(PROJECT_ID, SUBSCRIPTION_ID)

2-3. Container イメージを作成する

FROM python:3.7-alpine3.12


RUN apk --update-cache add \
    linux-headers \
    gcc \
    g++ \
    curl \
    bash

RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH $PATH:/root/google-cloud-sdk/bin

RUN apk update  \
    && apk upgrade

COPY ./requirements.txt /tmp/requirements.txt
RUN pip install --upgrade pip
RUN pip install -r /tmp/requirements.txt

RUN apk --no-cache add tzdata && \
    cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime && \
    apk del tzdata

RUN mkdir -p /src

COPY ./app.py /src/app.py

WORKDIR /src

ENTRYPOINT ["python", "app.py"]
requirements.txt
grpcio==1.29.0
google-cloud-pubsub==1.7.0

補足

wheels for grpcioはPEP 517で廃止のためインストールができない

google-cloud-pubsubを追加しようとすると下記メッセージが発生し、イメージのビルドに失敗しますが「https://github.com/grpc/grpc/issues/21918」を参考にapk add linux-headersしたらビルドに成功しました

DEPRECATION: Could not build wheels for grpcio which do not use PEP 517. pip will fall back to legacy 'setup.py install' for these. pip 21.0 will remove support for this functionality. A possible replacement is to fix the wheel build issue reported above. You can find discussion regarding this at https://github.com/pypa/pip/issues/8368.

2-4. Cloud Runの作成

手順

  1. GCP-Cloud Runkコンソールへ遷移
  2. 「サービスを作成」をクリック
  3. デプロイメント プラットフォームで「Cloud Run for Anthos」を選択
  4. Anthos GKEクラスタを新規作成する
  5. Anthos GKEクラスタのゾーンを選択する
  6. Anthos GKEクラスタ名を入力する
  7. クイック作成をクリック
  8. Cloud Runサービス名を入力
  9. 接続は「内部」「外部」任意の方を選択
  10. 「次へ」をクリック
  11. 「サービスの最初のリビジョンの構成」で「ソースリポジトリから新しいリビジョンを継続的にデプロイする」を選択
  12. 詳細設定を表示リンクをクリックして2-5. 環境変数を登録の設定を行う
  13. 「作成」をクリックする

ポイント:10.で「サービスの最初のリビジョンの構成」で「ソースリポジトリから新しいリビジョンを継続的にデプロイする」を選択するとCloud Buildサービスを利用したCDサービスが利用できます。

2-5. 環境変数を登録

Cloud Runサービスが作成されたら「継続的デプロイの編集」から環境変数を登録します
また、コンテナイメージのビルドに時間がかかる場合も、「継続的デプロイの編集」のタイムアウト設定でタイムアウト時間を変更できます。

環境変数
DESTINATION_BUCKET_NAME 転送先のバケット名
PROJECT_ID プロジェクトID
SUBSCRIPTION_ID サブスクリプションID
OBJECT_FINALIZE OBJECT_FINALIZE

Cloud Buildを使わない場合

docker build -t ${IMAGE_NAME} .

docker tag ${IMAGE_NAME} gcr.io/${PROJECT_ID}/${IMAGE_NAME}
gcloud docker --  push gcr.io/${PROJECT_ID}/${IMAGE_NAME}:latest
1
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
6