3
1

More than 1 year has passed since last update.

Lambda で BigQuery のデータを ZIP 化し S3 へアップロードする【無料枠】

Last updated at Posted at 2021-11-11

Google Cloud Platform (GCP) BigQuery から Amazon S3 へ転送する記事が少ない印象を受けたので、備忘録として初投稿します。
どなたかのお役に立てば幸いです。

1. やりたいこと

  1. AWS Lambda で BigQuery のデータを DataFrame で取得する
    • GCP との認証方法は以下を使用する
      • サービスアカウントキー + AWS SSM を使用する方法
      • Workload Identity 連携する方法
  2. データを CSV 化 & ZIP 圧縮し、 S3 へアップロードする

2. 前提

3. BigQuery

GCP BigQuery は Google 社が提供するデータ分析プラットフォームです。

今回はできるだけ無料枠の範囲で行いたいので、一度 BigQuery の無料枠について確認します。

3.1. 無料枠について

BigQuery では使用したクエリでスキャンされるデータ量と、ストレージの容量に応じて課金されます。(公式ドキュメント)

  • 例: 東京 (asia-northeast1) の料金
項目 料金 無料枠
クエリ (オンデマンド) $6.00 per TB 毎月 1 TB まで
BigQuery アクティブストレージ $0.023 per GB 毎月 10 GB まで

想定外の課金を防ぐために、事前に Dry run することで処理されるデータ量を確認することができます。(コードは後述)
ですが、どんなに小さいデータ量のクエリにしても、テーブル毎に最低 10 MB が課金対象となり無料枠を消費するので、注意が必要です。

また今回は関係ありませんが、よく一緒に使用される Google Cloud Storage (GCS) には無料枠対象リージョンが指定されています。
そのため GCS と一緒に無料枠で試したい場合には、BigQuery のリージョンにも注意が必要です。


今回は小さいテーブル 1 つで検証するので、相当連打しなければ問題なさそうです。
では実際にリソースを作成していきます。

4. 手順

  1. 事前確認 (省略可)
  2. サービスアカウントの作成
  3. GCP との認証設定
  4. モジュールを Lambda Layer に登録する
  5. Dry run でクエリの処理バイト数を確認する (疎通確認)
  6. 実際にデータを取得して S3 へアップロードする

コマンドやソースコード内の < > は適宜ご自身の環境の内容に置き換えて下さい。

4.1. 事前確認 (省略可)

gcloud のプロジェクトが目的のプロジェクトであることを確認します。

$ gcloud config list
:
Your active configuration is: [default]
$ gcloud config configurations list

NAME     IS_ACTIVE  ACCOUNT                  PROJECT          COMPUTE_DEFAULT_ZONE  COMPUTE_DEFAULT_REGION
default  True       xxxxxx                 <使用するPJ>

4.2. サービスアカウントの作成

Bigquery 用のサービスアカウント bq-test を作成し、ロールを設定します。
コンソールではIAM と管理 > サービスアカウント で作成、 IAM と管理 > IAM で確認&編集が出来ます。

$ gcloud iam service-accounts create bq-test \
    --description="Bigquery to S3" \
    --display-name="bq-test"

$ gcloud projects add-iam-policy-binding <PROJECT_ID> \
    --member="serviceAccount:bq-test@<PROJECT_ID>.iam.gserviceaccount.com" \
    --role="roles/bigquery.admin"
 [1] EXPRESSION=resource.type == "cloudresourcemanager.googleapis.com/Project", TITLE=test
 [2] None
 [3] Specify a new condition
The policy contains bindings with conditions, so specifying a condition is required when adding a
binding. Please specify a condition.: 1

今回ロールは大雑把ですがBigquery管理者(roles/bigquery.admin)とし、条件は1で行いました。
※ 各ロールについては公式ドキュメントを参照

これで Bigquery の操作が可能なサービスアカウントが作成されました。

4.3. GCP との認証設定

GCP API でデータにアクセスするための認証方法はいくつかあります。
今回は、以下2つの方法を記載します。

4.3.1. 方法1: サービスアカウントキー + AWS SSM パラメータストア

  1. サービスアカウントキーを作成する

    $ gcloud iam service-accounts keys create <KEY_FILE_PATH.json> \
        --iam-account=bq-test@<PROJECT_ID>.iam.gserviceaccount.com
    
  2. ダウンロードされたサービスアカウントキーをパラメータストアへ登録する
    SecureString で、値は鍵の中身を設定します。

  3. AWS Lambda で認証情報を設定する

    環境変数に GOOGLE_APPLICATION_CREDENTIALS を設定します。
    値はそのまま鍵の中身とすることは出来ないため、Lambda 上でファイルを作成し、パスを設定する必要があります。

    import boto3
    import json
    import os
    
    # パラメータストアから認証情報を取得
    def get_param():
        ssm = boto3.client('ssm', '<AWS_REGION>')
        response = ssm.get_parameters(
            Names=['<PARAMETER_PATH>'],
            WithDecryption=True
        )
        return response['Parameters'][0]['Value']
    
    # Google 認証ファイルを作成 + 環境変数に設定
    def set_gcp_auth(path):
        file = open(path, 'w')
        json.dump(json.loads(get_param()), file, indent=2)
        file.close()
    
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path
    
    def lambda_handler(event, context):
        path = '/tmp/credentials.json'
        try:
            set_gcp_auth(path)
        finally:
            # 一時ファイルを削除
            if os.path.isfile(path):
                os.remove(path)
    

これでサービスアカウントキーの場合の認証設定は完了です。

4.3.2. 方法2: Workload Identity 連携

この方法では、 GCP のサービスアカウントと AWS 等のロールを紐付け、サービスアカウントの権限を借用して認証します。(公式ブログ)

公式ドキュメントの手順に従い作成していきます。

  1. 事前確認
    GCP で以下 API が有効になっていることを確認します。

    • Identity and Access Management (IAM) API
    • Cloud Resource Manager API
    • IAM Service Account Credentials API
    • Security Token Service API
  2. AWS Lambda IAM ロールを作成する
    後の設定で使用するため、先にIAM ロールを作成します。

    【注意】: 検証時ここで作成したロールの名前が長すぎたため、 Lambda 実行の際にエラーとなりました。
    ロール名は短めがおすすめです。

  3. Workload Identity プールを作成する

    $ gcloud iam workload-identity-pools create "<POOL_NAME>" \
        --location="global" \
        --display-name="<DISPLAY_NAME>"
    
  4. Workload Identity プールで、サービスアカウントと Lambda の IAM ロールを紐付ける

    1. プールに AWS アカウント ID を プロバイダとして追加する

      $ gcloud iam workload-identity-pools providers create-aws "<PROVIDER_NAME>" \
          --location="global" \
          --workload-identity-pool="<POOL_NAME>" \
          --account-id="<AWS_ACCOUNT_ID>" \
          --display-name="<DISPLAY_NAME>"
      
    2. 権限を借用するサービスアカウント bq-test に、プリンシパル(権限借用を許可する ID)を設定する
      権限はワークロード ID ユーザー(roles/iam.workloadIdentityUser) を設定します。
      また、今回は作成した IAM ロールを持つすべての ID を許可します。

      $ gcloud iam service-accounts add-iam-policy-binding "bq-test@<PROJECT_ID>.iam.gserviceaccount.com" \
          --role="roles/iam.workloadIdentityUser" \
          --member="principalSet://iam.googleapis.com/projects/<PROJECT_NUMBER>/locations/global/workloadIdentityPools/<POOL_ID>/attribute.aws_role/arn:aws:sts::<AWS_ACCOUNT_ID>:assumed-role/<AWS_ROLE_NAME>"
      
    3. サービスアカウントの認証情報ファイルをダウンロードする
      キー情報のないファイルがダウンロードされます。

      $ gcloud iam workload-identity-pools create-cred-config "projects/<PROJECT_NUMBER>/locations/global/workloadIdentityPools/<POOL_ID>/providers/<PROVIDER_NAME>" \
          --service-account="bq-test@<PROJECT_ID>.iam.gserviceaccount.com" \
          --output-file="<FILE_PATH.json>" \
          --aws
      
  5. AWS Lambda で認証情報を設定する
    Lambda にソースコードと共に認証ファイルを配置し、環境変数に以下を設定します。

    • GOOGLE_APPLICATION_CREDENTIALS = <FILE_PATH>
    • GOOGLE_CLOUD_PROJECT = <PROJECT_ID>

これで Workload Identity 連携での認証設定は完了です。

4.4. モジュールを Lambda Layer に登録する

認証設定が完了したら実際に Lambda を作成していきます。

今回は 取得したデータを CSV 化するため、 google-cloud-bigquery に加え、 pandas も使用します。
以下コマンドで作成した ZIP ファイルを Lambda Layer に登録します。

【注意】: 今回は Lambda で実行するため、 pandas は AmazonLinux 環境かつ実行環境と同じ Python versionでインストールしないとエラーとなります。
今回は AWS Cloud 9 で行いました。(方法はこちら)

$ pip install google-cloud-bigquery pandas-gbq 'google-cloud-bigquery[pandas]' -t ./python/
$ zip -r google-cloud-bigquery.zip python

$ pip install protobuf --upgrade -t ./python/
$ zip -r google-cloud-bigquery.zip ./python/google/protobuf

4.5. Dry run でクエリの処理バイト数を確認する (疎通確認)

公式ドキュメントを参照し、実際に GCP にアクセスができるか Dry run で試します。
ライブラリの詳細はこちらをご参照ください。

from google.cloud import bigquery

# Dry run でクエリのデータ処理バイト数を確認
def get_total_bytes(client, sql):
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    query_job = client.query(sql, job_config=job_config)

    total_bytes = query_job.total_bytes_processed
    print('This query will process {} bytes.'.format(total_bytes))
    return total_bytes

def lambda_handler(event, context):
    # サービスアカウントキーの方法の認証設定は省略
    client = bigquery.Client()
    sql = 'SELECT < COLUMN > FROM `<PROJECT_ID>.<DATASET>.<TABLE>`;'
    total_bytes = get_total_bytes(client, sql)

4.6. 実際にデータを取得して S3 へアップロードする

認証が通り無事 Dry run が出来たら、今度は実際にデータを取得して S3 へアップロードしてみます。

クエリ実行部分は公式ドキュメントを参照しました。
DataFrame でデータを取得したら、pandas の to_csv() で compression='gzip' を指定すれば簡単に ZIP 化できます。
※ Dry run は念の為記載しています。

import sys
import boto3
import json
import os
from google.cloud import bigquery
import pandas as pd

# 関数は省略

def lambda_handler(event, context):
    path = '/tmp/credentials.json'
    result_path = '/tmp/bq_data.csv.gz'
    try:
        set_gcp_auth(path) # Workload Identity 連携の場合は不要

        client = bigquery.Client()
        sql = 'SELECT <COLUMN> FROM `<PROJECT_ID>.<DATASET>.<TABLE>`;'

        # Dry run で 100 bytes より大きい場合は処理を終了する
        total_bytes = get_total_bytes(client, sql)
        if total_bytes > 100:
            sys.exit()

        # SELECT を実行し DataFrame のデータを CSV 化 + ZIP 化する
        df = client.query(sql).to_dataframe()
        df.to_csv(result_path, sep=',', header=True, index=False, compression='gzip')

        # S3 へアップロード
        s3 = boto3.resource('s3')
        bucket = s3.Bucket('<BUCKET_NAME>')
        bucket.upload_file(result_path, 'bq_data.csv.gz')
    finally:
        # 一時ファイルを削除
        if os.path.isfile(path):
            os.remove(path)
        if os.path.isfile(result_path):
            os.remove(result_path)

これで BigQuery のデータを S3 に転送することができました。
お疲れ様でした!

5. 参考サイト一覧

ありがとうございました。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1