この記事はfreeeデータに関わる人たち Advent Calendar 2019の3日目のエントリーです。
はじめまして。
Analyticsチームで基盤を担当している笹尾といいます。
freeeに転職して2週間が経ちました。初めて学ぶ仕組みも多く、日々勉強中です。
#まえがき
とあるコンペで、GCS→BigQueryへの日次転送の仕組みを作ることになりました。
やりたいことは単純なので、なるべくシンプルな作りにして手早く作りたい。
日次転送なのでcron的なものがあれば助かるのですが、GCPにはそのようなマネージドサービスが無く、GCEやGAEを使うしかありません。(*古い情報です)
その時はGCEを立てて、前処理からGCSに上げてBigQueryに転送するまでの処理をcronで実行するようにしました。
#作ってから気づいた
GCPにはCloudSchedulerがあるやん・・・約1年前にリリースされてました
その発表には間に合いませんが、せっかくなので使ってみました。
#全体構成
Functionsで、GCSにあるcsvをロードするpythonを実行します。
Cloud Scheduler → Cloud PubSub → Cloud Functions → (GCS→BQ)
#Cloud Scheduler
午前2時に実行されるようにセット。
タイムゾーンの設定が大事です。
gcloud scheduler jobs create pubsub JOB --schedule=“0 2 * * *” \
--topic=(トピック名) --time-zone="Asia/Tokyo"
Cloud Schedulerによる起動先は、HTTPとpubsubがあります。
Cloud FunctionsのHTTPを直接叩けるようにしないのは、そうすると誰でも関数が実行できる状態になってしまうためです。
#Cloud PubSub
トピックはコンソールから作りました。
[トピックの作成]→名前を入力して[作成]
#Cloud Functions
カレントディレクトリにあるpythonコードと設定ファイルをデプロイします。
gcloud functions deploy (関数名) --trigger-resource (トピック名)\
--trigger-event google.pubsub.topic.publish --runtime=python37 \
--env-vars-file env.yaml --service-account=(サービスアカウント名) --source .
このとき、__--source .__は大事です。これが無いと、2回目以降のデプロイ時にソースの変更が反映されないことがあります。
env.yamlは環境変数の定義ファイルで、次のpythonに出てくる環境変数の値が書かれています。
python
・messageの取得
・Credentialの復号
・BigQueryへロード
の順にやっています。(messageのstart_dateは、過去分を手動で実行させたいとき用です)
def load_from_gcs(event, context):
import os
import datetime
import json
import base64
from google.cloud import bigquery
from google.cloud import storage
from google.cloud import kms_v1
from google.oauth2 import service_account
# 環境変数から取得
PROJECT_ID = os.environ['PROJECT_ID']
LOCATION = os.environ['LOCATION']
KEY_RING = os.environ['KEY_RING']
KEY_NAME = os.environ['KEY_NAME']
DATASET = os.environ['DATASET']
TABLE = os.environ['TABLE']
# messageを取得
messages = json.loads(base64.b64decode(event['data']).decode())
start_date_str = messages.get('start_date', datetime.datetime.now().strftime('%Y%m%d'))
# 鍵でCredentialを復号
kms_client = kms_v1.KeyManagementServiceClient()
name = kms_client.crypto_key_path_path(PROJECT_ID,LOCATION,KEY_RING,KEY_NAME)
res_kms = kms_client.decrypt(name, base64.b64decode(os.environ['CIPHERTEXT']))
json_key = json.loads(res_kms.plaintext)
scope = ["https://www.googleapis.com/auth/cloud-platform"]
credentials = service_account.Credentials.from_service_account_info(json_key, scopes=scope)
# BigQueryへロード
job_conf = bigquery.LoadJobConfig()
job_conf.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_conf.skip_leading_rows=1
job_conf.source_format = bigquery.SourceFormat.CSV
job_conf.allow_quoted_newlines = True
bq_client = bigquery.Client(credentials=credentials)
table_ref = bq_client.dataset('mydataset').table('mytable')
uri = 'gs://GCSのファイルパス.csv'.format(start_date_str)
load_job = bq_client.load_table_from_uri(uri, table_ref, job_config=job_conf)
load_job.result()
#感想
cron部分だけを取り出せるのは便利ですね。
ただドキュメント見ると、まれに処理が複数回実行されてしまうようで、何らかの対策が必要とのこと。
あと、CloudFunctionsのCredential部分に関しては、もっと良いやり方がありそう。
#おまけ
本当にGCS→BigQueryの転送だけでいいなら、BigQueryDataTransferの方が簡単ぽい