LoginSignup
4
6

More than 5 years have passed since last update.

PythonでBigQueryの差分更新をする方法

Last updated at Posted at 2018-12-11

この記事は ウェブクルー Advent Calendar 2018の12日目の記事です。
昨日は昨日は@wc-kobayashiTさんの「CSSアニメーションで動きのあるサイトに」でした。

今年はBigQueryの連携業務がメインだったので、連携基盤の構築方法をまとめていきたいと思います。

最初は、Embulkを使ってMercariさんのブログで書かれていた方法(数百GBのデータをMySQLからBigQueryへ同期する)でやっていたのですが、
一時的にテーブルを作成しないといけないのと、Embulkを使う人が社内で少ないので、Pythonのみで構築する方法を考えていました。
その時にpandas.ioのgbqが便利だったので、pandas.ioのgbqを使った構築方法を書いていきます。

BigQueryで差分更新をするやり方

BigQueryで差分更新するフローとしては次のようになります。

BigQueryにデータをappendする

BigQueryの分析関数で一意に抽出する

テーブルを削除してから、抽出したデータを用いて登録する

といった流れです。

分析関数でやる方法はMercariさんのブログで載っていた方法と同様です。
BigQueryの分析関数であるrow_number()を用います。
一度重複で登録したテーブルからキーとなるものでソートをかけて、row_numberが1となるものを抽出して、抽出したデータを再度もとのテーブルへとtruncateしてから再度登録します。

BigQueryへのデータ登録方法

BiqQueryへのデータ登録は基本的にファイルをloadするのがメインです。
BigQueryへのデータの読み込みの概要
CSVファイルなどを作って、登録する方法です。

今回は、ファイルを作成せずにそのままBigQueryへと流し込む方法として実装します。
DataFrameを作成して、そのデータを登録するといった方法です。
それが、pandas.ioのgbqを使った方法です。

pandas.ioのgbqを使ってデータ登録

gbqとBigQueryの環境構築はこちらに詳しく記載しているので、割愛します。
pandasとGoogle BigQueryを連携させる
私がOracleからgbqを使って差分更新するために使っているロジックをもとに詳細を説明していきたいと思います。

先に全貌から載せます。


import os 
import cx_Oracle
import pandas as pd
from pandas.io import gbq
from google.cloud import bigquery

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'credentials.json'


def db_connect(user, password):
    conn = cx_Oracle(user, password, 'ip/service_name')


def db_select(sql, conn):
    df = pd.read_sql(sql, conn)
    return df


def get_sql(start_date, end_date):
    sql = """
    select
      id,
      update_time,
      name,
      zip,
      pref,
      county
    from hoge.user_info
    where '""" + start_date + """' <= update_time and update_time < '""" + end_date + """'
    """


def insert_bigquery(df, dataset_name, table_name, project_id):
    gbq.to_gbq(df, dataset_name + "." + table_name, project_id, if_exists='append')


def merge_bigquery(project_id, dataset_name, table_name, partition_key, sort_key):
    client  = bigquery.Client(project_id)
    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset(dataset_name).table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = 'WRITE_TRUNCATE'
    sql = """
      select
        * except(rn)
      from (
        select
          *,
          row_number() over(partition by """ + partition_key + """ order by """ + sort_key + """ ) as rn
        from """ + dataset_name + """.""" + table_name + """
      )
      where rn = 1
    """
    query_job = client.query(
        sql,
        location="US"
        job_config=job_config,
    )
    return query_job.result()


def main():
    connection = db_connect('username', 'password')
    start_date = '2018-11-01 00:00:00'
    end_date = '2018-12-01 00:00:00'
    sql = get_sql(start_date, end_date)
    df = db_select(sql, connection)
    df['COUNTY'] = df['COUNTY'].astype('object')
    dataset_name = 'HOGE'
    table_name = 'TEST'
    project_id = 'abcd-1234'
    insert_bigquery(df, dataset_name, table_name, project_id)
    partition_key = 'id'
    sort_key = 'update_time desc'
    merge_bigquery(project_id, dataset_name, table_name, partition_key, sort_key)

Oracleからデータ取得

Oracleからのデータ取得にはcx_Oracleを使用しています。
インストールなどについては下記を参照していただくとよいです。
pythonからOracleに接続するための拡張モジュール、cx_Oracleの基本的な使い方

cur.execute()でも取得することはできるのですが、すぐにDataFrameに入れたい場合には、pd.read_sqlを使用するのがいいです。
カラム名もその時に保持してくれるのが便利ですね。

def db_select(sql, conn):
    df = pd.read_sql(sql, conn)
    return df

BigQueryの差分更新

厳密には差分更新といっていいのかわからないですが、一度古いのも含めてそのテーブルの中で新しい情報のみを抽出して再度生成しているような流れです。

ですので、まずはデータの重複などを気にせずにBigQueryへとappendします。
ここで、gbqを使います。gbqを使用するとDataFrameをそのまま渡すことができます。

def insert_bigquery(df, dataset_name, table_name, project_id):
    gbq.to_gbq(df, dataset_name + "." + table_name, project_id, if_exists='append')

そのあとに重複状態のテーブルから一意となるように抽出して、そのデータを再度テーブルに登録するという処理を
merge_bigqueryで行っております。


def merge_bigquery(project_id, dataset_name, table_name, partition_key, sort_key):
    client  = bigquery.Client(project_id)
    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset(dataset_name).table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = 'WRITE_TRUNCATE'
    sql = """
      select
        * except(rn)
      from (
        select
          *,
          row_number() over(partition by """ + partition_key + """ order by """ + sort_key + """ ) as rn
        from """ + dataset_name + """.""" + table_name + """
      )
      where rn = 1
    """
    query_job = client.query(
        sql,
        location="US"
        job_config=job_config,
    )
    return query_job.result()

row_number()で番号付けしたものをSELECTして、その後にexceptで不要なカラムであるrnを除外しています。

DataFrameを扱う上での注意点

例としては適切ではないかもしれないですが、main処理でdf['COUNTY'] = df['COUNTY'].astype('object')
と行っております。
DataFrameではカラムの型を自動的に推定してくれます。

そのため、
抽出した時点の内容のデータではintを期待してたがnullが入ってきて、float型に変換される。
Stringを期待していたが、float型に変更されてしまった。なんてことがあります。

なので、データの内容が設計上何が入ってくるか読めない場合にはあらかじめ設定しておくことによって、
データ登録失敗を回避することができます。

def main():
    # ~~~略~~~
    df = db_select(sql, connection)
    df['COUNTY'] = df['COUNTY'].astype('object')
    dataset_name = 'HOGE'
    table_name = 'TEST'
    project_id = 'abcd-1234'
    # ~~~略~~~

まとめ

以上、BigQueryの差分更新をを行うプログラムの解説をいたしました。
BigQueryをpandas.ioのgbqを使用して行うと、Pythonのみでの構築が可能となります。
embulkを使用するのも有効ですが、その環境での保守性も考えた構築が必要になることも大いにあるので、その参考となればと思います。

明日の記事は@DotaKobayashiさんです。よろしくお願いいたします。

4
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
6