LoginSignup
2
3

More than 3 years have passed since last update.

Cloud Schedulerを使ってみた

Last updated at Posted at 2019-12-02

この記事はfreeeデータに関わる人たち Advent Calendar 2019の3日目のエントリーです。

はじめまして。
Analyticsチームで基盤を担当している笹尾といいます。
freeeに転職して2週間が経ちました。初めて学ぶ仕組みも多く、日々勉強中です。

まえがき

とあるコンペで、GCS→BigQueryへの日次転送の仕組みを作ることになりました。
やりたいことは単純なので、なるべくシンプルな作りにして手早く作りたい。
日次転送なのでcron的なものがあれば助かるのですが、GCPにはそのようなマネージドサービスが無く、GCEやGAEを使うしかありません。(*古い情報です)
その時はGCEを立てて、前処理からGCSに上げてBigQueryに転送するまでの処理をcronで実行するようにしました。

作ってから気づいた

GCPにはCloudSchedulerがあるやん・・・約1年前にリリースされてました

その発表には間に合いませんが、せっかくなので使ってみました。

全体構成

Functionsで、GCSにあるcsvをロードするpythonを実行します。
Cloud Scheduler → Cloud PubSub → Cloud Functions → (GCS→BQ)

Cloud Scheduler

午前2時に実行されるようにセット。
タイムゾーンの設定が大事です。

gcloud scheduler jobs create pubsub JOB --schedule=“0 2 * * *” \
--topic=(トピック名) --time-zone="Asia/Tokyo"

Cloud Schedulerによる起動先は、HTTPとpubsubがあります。
Cloud FunctionsのHTTPを直接叩けるようにしないのは、そうすると誰でも関数が実行できる状態になってしまうためです。

Cloud PubSub

トピックはコンソールから作りました。
[トピックの作成]→名前を入力して[作成]

Cloud Functions

カレントディレクトリにあるpythonコードと設定ファイルをデプロイします。

gcloud functions deploy (関数名) --trigger-resource (トピック名)\
--trigger-event google.pubsub.topic.publish --runtime=python37 \
--env-vars-file env.yaml --service-account=(サービスアカウント名) --source .

このとき、--source .は大事です。これが無いと、2回目以降のデプロイ時にソースの変更が反映されないことがあります。
env.yamlは環境変数の定義ファイルで、次のpythonに出てくる環境変数の値が書かれています。

python

・messageの取得
・Credentialの復号
・BigQueryへロード
の順にやっています。(messageのstart_dateは、過去分を手動で実行させたいとき用です)

def load_from_gcs(event, context):
    import os
    import datetime
    import json
    import base64

    from google.cloud import bigquery
    from google.cloud import storage
    from google.cloud import kms_v1
    from google.oauth2 import service_account

    # 環境変数から取得
    PROJECT_ID = os.environ['PROJECT_ID']
    LOCATION = os.environ['LOCATION']
    KEY_RING = os.environ['KEY_RING']
    KEY_NAME = os.environ['KEY_NAME']
    DATASET = os.environ['DATASET']
    TABLE = os.environ['TABLE']

    # messageを取得
    messages = json.loads(base64.b64decode(event['data']).decode())
    start_date_str = messages.get('start_date', datetime.datetime.now().strftime('%Y%m%d'))

    # 鍵でCredentialを復号
    kms_client = kms_v1.KeyManagementServiceClient()
    name = kms_client.crypto_key_path_path(PROJECT_ID,LOCATION,KEY_RING,KEY_NAME)
    res_kms = kms_client.decrypt(name, base64.b64decode(os.environ['CIPHERTEXT']))
    json_key = json.loads(res_kms.plaintext)

    scope = ["https://www.googleapis.com/auth/cloud-platform"]
    credentials = service_account.Credentials.from_service_account_info(json_key, scopes=scope)

    # BigQueryへロード
    job_conf = bigquery.LoadJobConfig()
    job_conf.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_conf.skip_leading_rows=1
    job_conf.source_format = bigquery.SourceFormat.CSV
    job_conf.allow_quoted_newlines = True

    bq_client = bigquery.Client(credentials=credentials)
    table_ref = bq_client.dataset('mydataset').table('mytable')

    uri = 'gs://GCSのファイルパス.csv'.format(start_date_str)

    load_job = bq_client.load_table_from_uri(uri, table_ref, job_config=job_conf)
    load_job.result()

感想

cron部分だけを取り出せるのは便利ですね。
ただドキュメント見ると、まれに処理が複数回実行されてしまうようで、何らかの対策が必要とのこと。
あと、CloudFunctionsのCredential部分に関しては、もっと良いやり方がありそう。

おまけ

本当にGCS→BigQueryの転送だけでいいなら、BigQueryDataTransferの方が簡単ぽい

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3