LoginSignup
2
1

More than 1 year has passed since last update.

CloudStorageを経由してBigqueryに転送する方法

Posted at

Motive

巨大なテーブルをBigqueryに転送する方法 ではひとつのテーブルに約2億レコードあって約100GBある巨大なデータをBigqueryに移行する方法を書いています。 現在は移行作業は完了し、次のタスクとして日次で追加されているMySQLのデータをBigqueryに移行するスクリプトを作成していました。24時間分と前回のデータよりはかなり少ないので、サーバ自体のメモリや容量などので考慮すべき部分はないのでここで紹介する方法はよく使うパターンではないかと思います。また、前回ではデータの経由地点としてS3を使っていましたが、ここではGCPのCloud Storageを使っています。同じGCP内であるためか空のテーブルの作成や転送の構成を設定する必要がないので比較的簡単に転送は出来ました。ここではGCPのサービスのみでBIgqueryに転送する方法を紹介します。

Preparetion

必要なパッケージは下記の通りです。

pip install -r requirements.txt
pyarrow
google-auth
google-api-python-client
google-auth-httplib2
google-auth-oauthlib
google-cloud-bigquery
google-cloud-storage

Method

フローは3つのステップ。

  1. MySQLからparquet形式のファイルを出力
  2. Cloud Storageのbucketに転送
  3. Bigqueryに転送

定期バッチ.drawio.png

Development

全体のコードを掲載します。 process の後半を見ると前述の3つのステップ output_parquet cs_upload fill_lake を処理していることがわかると思います。 また、 cs_upload fill_lakeではBigqueryAPI、CloudStorageAPIを有効にしたGCPの認証ファイルを読み込ませる必要があります。認証ファイル作成方法については、Google APIを利用するためのサービスアカウントの設定(認証)を参照。

認証ファイル読み込み

        credentials = service_account.Credentials.from_service_account_file(
                './featurepoints_credential.json',
                scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

コード全体

from google.oauth2 import service_account
from google.cloud import bigquery
from google.cloud import storage

import mysql.connector
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import argparse
import datetime
import os
import time

# gloval valiable
DB_USER="--user name--"
DB_PASSWORD="--password--"
DB_NAME="--database name--"
BUCKET_NAME = "-- the name of bucket in the cloud storage --"

if __name__ == '__main__':
    process()

def process():

    parser = argparse.ArgumentParser()
    parser.add_argument('table_name', help='テーブル名')
    parser.add_argument('--prod', action='store_true', help='本番環境指定')
    parser.add_argument('--begin', help='where from')
    parser.add_argument('--end', help='where to')
    args = parser.parse_args()

    target_date = datetime.datetime.now() - datetime.timedelta(days=1)

    try:
        begin_date = target_date.strftime('%Y-%m-%d 00:00:00')
        end_date = target_date.strftime('%Y-%m-%d 23:59:59')

        if args.begin is not None and args.end is not None:
            begin_date = datetime.datetime.strptime(args.begin, '%Y-%m-%d')
            end_date = datetime.datetime.strptime(args.end, '%Y-%m-%d')
            begin_date = begin_date.strftime('%Y-%m-%d 00:00:00')
            end_date = end_date.strftime('%Y-%m-%d 23:59:59')

    except ValueError as e:
        print(e)

    print("from:{0} to:{1}".format(begin_date, end_date))

    try:
        #MySQL から parquet形式のファイルをexport
        output_parquet(args.table_name, begin_date, end_date)

        #parquetファイルをcloud strageにupload
        cs_upload(args.table_name)

        #cloud strageファイルをbigqueryにexport
        dataset_name = "development"
        if args.prod:
            dataset_name = "TWITTER"
        fill_lake(args.table_name, dataset_name)

    except Exception as e :
        print ("不明なエラーです")
        print (e)

def output_parquet(table_name, begin_date, end_date):
    """
        MySQL から parquet形式のファイルをexport
    """
    conn = mysql.connector.connect(user=DB_USER, database=DB_NAME,
                               password=DB_PASSWORD,
                               host="localhost",
                               port=3306)
    cursor = conn.cursor()

    query = "SELECT * FROM ONE_WEEK_{0} WHERE '{1}' <= created_at AND created_at <= '{2}'".format(table_name, begin_date, end_date)

    pd_frame = pd.read_sql(query, con=conn)
    conn.close()

    table = pa.Table.from_pandas(pd_frame)
    pq.write_table(table, './src/{}_segment.parquet'.format(table_name))

def cs_upload(table_name):
    """
        parquetファイルをcloud strageにupload
    """
    credentials = service_account.Credentials.from_service_account_file(
        './featurepoints_credential.json',
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

    storage_client = storage.Client(
        credentials=credentials,
        project=credentials.project_id,
    )

    bucket = storage_client.get_bucket(BUCKET_NAME)

    source_file_name = './src/{}_segment.parquet'.format(table_name)
    blob = bucket.blob(os.path.basename(source_file_name))
    blob.upload_from_filename(source_file_name)

def fill_lake(table_name, dataset_name):
    """
        cloud strageファイルをbigqueryにexport
    """
    credentials = service_account.Credentials.from_service_account_file(
        './featurepoints_credential.json',
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
    client = bigquery.Client(
        credentials=credentials,
        project=credentials.project_id,
    )

    dataset_ref = client.dataset(dataset_name)

    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.PARQUET

    bucket_uri = "gs://{0}/{1}_segment.parquet".format(BUCKET_NAME, table_name)

    load_job = client.load_table_from_uri(
                bucket_uri,
                dataset_ref.table(table_name),
                job_config=job_config
    )
    print("Starting job {}".format(load_job.job_id))

    load_job.result()
    print("Job finished.")

    destination_table = client.get_table(dataset_ref.table(table_name))
    print("Loaded {} rows.".format(destination_table.num_rows))

PostScript

日次の定期実行のスクリプトも作成し、Biqueryへの移行はひと通り終わりました。
しかし、データ移行するときにEC2のストレージを大きくしたのですが、縮小することができないみたいです。
調べると新しい手頃なサイズのストレージを一つ用意してデータを移管して、古いストレージを削除する方法だとできるみたいですが、いつやろうか:tired_face:
サーバ止めてるだけでも結構料金がかかるみたいです。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1