Google Cloud Platform (GCP) BigQuery から Amazon S3 へ転送する記事が少ない印象を受けたので、備忘録として初投稿します。
どなたかのお役に立てば幸いです。
1. やりたいこと
- AWS Lambda で BigQuery のデータを DataFrame で取得する
- GCP との認証方法は以下を使用する
- サービスアカウントキー + AWS SSM を使用する方法
- Workload Identity 連携する方法
- GCP との認証方法は以下を使用する
- データを CSV 化 & ZIP 圧縮し、 S3 へアップロードする
2. 前提
- GCP のプロジェクト、AWS のアカウント等が作成済みであること
- Google Cloud SDK (gcloud) が使用できること
インストールは、こちらのGoogle Cloud SDK(gcloud) を Homebrew 経由で Mac にインストールする方法を参考にしました。 - BigQuery のデータセットとテーブルは作成済みであること
テストデータ作成は、こちらの記事を参考にしました。非常にわかりやすいです。
3. BigQuery
GCP BigQuery は Google 社が提供するデータ分析プラットフォームです。
今回はできるだけ無料枠の範囲で行いたいので、一度 BigQuery の無料枠について確認します。
3.1. 無料枠について
BigQuery では使用したクエリでスキャンされるデータ量と、ストレージの容量に応じて課金されます。(公式ドキュメント)
- 例: 東京 (asia-northeast1) の料金
項目 | 料金 | 無料枠 |
---|---|---|
クエリ (オンデマンド) | $6.00 per TB | 毎月 1 TB まで |
BigQuery アクティブストレージ | $0.023 per GB | 毎月 10 GB まで |
想定外の課金を防ぐために、事前に Dry run することで処理されるデータ量を確認することができます。(コードは後述)
ですが、どんなに小さいデータ量のクエリにしても、テーブル毎に最低 10 MB が課金対象となり無料枠を消費するので、注意が必要です。
また今回は関係ありませんが、よく一緒に使用される Google Cloud Storage (GCS) には無料枠対象リージョンが指定されています。
そのため GCS と一緒に無料枠で試したい場合には、BigQuery のリージョンにも注意が必要です。
今回は小さいテーブル 1 つで検証するので、相当連打しなければ問題なさそうです。
では実際にリソースを作成していきます。
4. 手順
- 事前確認 (省略可)
- サービスアカウントの作成
- GCP との認証設定
- モジュールを Lambda Layer に登録する
- Dry run でクエリの処理バイト数を確認する (疎通確認)
- 実際にデータを取得して S3 へアップロードする
※ コマンドやソースコード内の < >
は適宜ご自身の環境の内容に置き換えて下さい。
4.1. 事前確認 (省略可)
gcloud のプロジェクトが目的のプロジェクトであることを確認します。
$ gcloud config list
:
Your active configuration is: [default]
$ gcloud config configurations list
NAME IS_ACTIVE ACCOUNT PROJECT COMPUTE_DEFAULT_ZONE COMPUTE_DEFAULT_REGION
default True xxxxxx <使用するPJ>
4.2. サービスアカウントの作成
Bigquery 用のサービスアカウント bq-test
を作成し、ロールを設定します。
コンソールではIAM と管理 > サービスアカウント
で作成、 IAM と管理 > IAM
で確認&編集が出来ます。
$ gcloud iam service-accounts create bq-test \
--description="Bigquery to S3" \
--display-name="bq-test"
$ gcloud projects add-iam-policy-binding <PROJECT_ID> \
--member="serviceAccount:bq-test@<PROJECT_ID>.iam.gserviceaccount.com" \
--role="roles/bigquery.admin"
[1] EXPRESSION=resource.type == "cloudresourcemanager.googleapis.com/Project", TITLE=test
[2] None
[3] Specify a new condition
The policy contains bindings with conditions, so specifying a condition is required when adding a
binding. Please specify a condition.: 1
今回ロールは大雑把ですがBigquery管理者(roles/bigquery.admin)
とし、条件は1
で行いました。
※ 各ロールについては公式ドキュメントを参照
これで Bigquery の操作が可能なサービスアカウントが作成されました。
4.3. GCP との認証設定
GCP API でデータにアクセスするための認証方法はいくつかあります。
今回は、以下2つの方法を記載します。
4.3.1. 方法1: サービスアカウントキー + AWS SSM パラメータストア
-
サービスアカウントキーを作成する
$ gcloud iam service-accounts keys create <KEY_FILE_PATH.json> \ --iam-account=bq-test@<PROJECT_ID>.iam.gserviceaccount.com
ダウンロードされたサービスアカウントキーをパラメータストアへ登録する
SecureString で、値は鍵の中身を設定します。-
AWS Lambda で認証情報を設定する
環境変数に
GOOGLE_APPLICATION_CREDENTIALS
を設定します。
値はそのまま鍵の中身とすることは出来ないため、Lambda 上でファイルを作成し、パスを設定する必要があります。import boto3 import json import os # パラメータストアから認証情報を取得 def get_param(): ssm = boto3.client('ssm', '<AWS_REGION>') response = ssm.get_parameters( Names=['<PARAMETER_PATH>'], WithDecryption=True ) return response['Parameters'][0]['Value'] # Google 認証ファイルを作成 + 環境変数に設定 def set_gcp_auth(path): file = open(path, 'w') json.dump(json.loads(get_param()), file, indent=2) file.close() os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path def lambda_handler(event, context): path = '/tmp/credentials.json' try: set_gcp_auth(path) finally: # 一時ファイルを削除 if os.path.isfile(path): os.remove(path)
これでサービスアカウントキーの場合の認証設定は完了です。
4.3.2. 方法2: Workload Identity 連携
この方法では、 GCP のサービスアカウントと AWS 等のロールを紐付け、サービスアカウントの権限を借用して認証します。(公式ブログ)
公式ドキュメントの手順に従い作成していきます。
-
事前確認
GCP で以下 API が有効になっていることを確認します。- Identity and Access Management (IAM) API
- Cloud Resource Manager API
- IAM Service Account Credentials API
- Security Token Service API
-
AWS Lambda IAM ロールを作成する
後の設定で使用するため、先にIAM ロールを作成します。【注意】: 検証時ここで作成したロールの名前が長すぎたため、 Lambda 実行の際にエラーとなりました。
ロール名は短めがおすすめです。 -
Workload Identity プールを作成する
$ gcloud iam workload-identity-pools create "<POOL_NAME>" \ --location="global" \ --display-name="<DISPLAY_NAME>"
-
Workload Identity プールで、サービスアカウントと Lambda の IAM ロールを紐付ける
-
プールに AWS アカウント ID を プロバイダとして追加する
$ gcloud iam workload-identity-pools providers create-aws "<PROVIDER_NAME>" \ --location="global" \ --workload-identity-pool="<POOL_NAME>" \ --account-id="<AWS_ACCOUNT_ID>" \ --display-name="<DISPLAY_NAME>"
-
権限を借用するサービスアカウント
bq-test
に、プリンシパル(権限借用を許可する ID)を設定する
権限はワークロード ID ユーザー(roles/iam.workloadIdentityUser) を設定します。
また、今回は作成した IAM ロールを持つすべての ID を許可します。$ gcloud iam service-accounts add-iam-policy-binding "bq-test@<PROJECT_ID>.iam.gserviceaccount.com" \ --role="roles/iam.workloadIdentityUser" \ --member="principalSet://iam.googleapis.com/projects/<PROJECT_NUMBER>/locations/global/workloadIdentityPools/<POOL_ID>/attribute.aws_role/arn:aws:sts::<AWS_ACCOUNT_ID>:assumed-role/<AWS_ROLE_NAME>"
-
サービスアカウントの認証情報ファイルをダウンロードする
キー情報のないファイルがダウンロードされます。$ gcloud iam workload-identity-pools create-cred-config "projects/<PROJECT_NUMBER>/locations/global/workloadIdentityPools/<POOL_ID>/providers/<PROVIDER_NAME>" \ --service-account="bq-test@<PROJECT_ID>.iam.gserviceaccount.com" \ --output-file="<FILE_PATH.json>" \ --aws
-
-
AWS Lambda で認証情報を設定する
Lambda にソースコードと共に認証ファイルを配置し、環境変数に以下を設定します。-
GOOGLE_APPLICATION_CREDENTIALS
=<FILE_PATH>
-
GOOGLE_CLOUD_PROJECT
=<PROJECT_ID>
-
これで Workload Identity 連携での認証設定は完了です。
4.4. モジュールを Lambda Layer に登録する
認証設定が完了したら実際に Lambda を作成していきます。
今回は 取得したデータを CSV 化するため、 google-cloud-bigquery に加え、 pandas も使用します。
以下コマンドで作成した ZIP ファイルを Lambda Layer に登録します。
【注意】: 今回は Lambda で実行するため、 pandas は AmazonLinux 環境かつ実行環境と同じ Python versionでインストールしないとエラーとなります。
今回は AWS Cloud 9 で行いました。(方法はこちら)
$ pip install google-cloud-bigquery pandas-gbq 'google-cloud-bigquery[pandas]' -t ./python/
$ zip -r google-cloud-bigquery.zip python
$ pip install protobuf --upgrade -t ./python/
$ zip -r google-cloud-bigquery.zip ./python/google/protobuf
4.5. Dry run でクエリの処理バイト数を確認する (疎通確認)
公式ドキュメントを参照し、実際に GCP にアクセスができるか Dry run で試します。
ライブラリの詳細はこちらをご参照ください。
from google.cloud import bigquery
# Dry run でクエリのデータ処理バイト数を確認
def get_total_bytes(client, sql):
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
query_job = client.query(sql, job_config=job_config)
total_bytes = query_job.total_bytes_processed
print('This query will process {} bytes.'.format(total_bytes))
return total_bytes
def lambda_handler(event, context):
# サービスアカウントキーの方法の認証設定は省略
client = bigquery.Client()
sql = 'SELECT < COLUMN > FROM `<PROJECT_ID>.<DATASET>.<TABLE>`;'
total_bytes = get_total_bytes(client, sql)
4.6. 実際にデータを取得して S3 へアップロードする
認証が通り無事 Dry run が出来たら、今度は実際にデータを取得して S3 へアップロードしてみます。
クエリ実行部分は公式ドキュメントを参照しました。
DataFrame でデータを取得したら、pandas の to_csv() で compression='gzip'
を指定すれば簡単に ZIP 化できます。
※ Dry run は念の為記載しています。
import sys
import boto3
import json
import os
from google.cloud import bigquery
import pandas as pd
# 関数は省略
def lambda_handler(event, context):
path = '/tmp/credentials.json'
result_path = '/tmp/bq_data.csv.gz'
try:
set_gcp_auth(path) # Workload Identity 連携の場合は不要
client = bigquery.Client()
sql = 'SELECT <COLUMN> FROM `<PROJECT_ID>.<DATASET>.<TABLE>`;'
# Dry run で 100 bytes より大きい場合は処理を終了する
total_bytes = get_total_bytes(client, sql)
if total_bytes > 100:
sys.exit()
# SELECT を実行し DataFrame のデータを CSV 化 + ZIP 化する
df = client.query(sql).to_dataframe()
df.to_csv(result_path, sep=',', header=True, index=False, compression='gzip')
# S3 へアップロード
s3 = boto3.resource('s3')
bucket = s3.Bucket('<BUCKET_NAME>')
bucket.upload_file(result_path, 'bq_data.csv.gz')
finally:
# 一時ファイルを削除
if os.path.isfile(path):
os.remove(path)
if os.path.isfile(result_path):
os.remove(result_path)
これで BigQuery のデータを S3 に転送することができました。
お疲れ様でした!
5. 参考サイト一覧
-
GCP 公式ドキュメント
-
その他記事
ありがとうございました。