LoginSignup
0
0

More than 3 years have passed since last update.

csvを加工してBigQueryにAPPENDする

Posted at

csvを加工してBigQueryにAppendする方法のメモ。

残課題

BigQueryのテーブルで、DATE型の列があるときのpandas.DataFrameからの書き出しが上手くいかない。

まずはColaboratoryでテスト

Colaboratoryからはうまく動作することを確認

ColaboratoryでBigQueryとGSCへアクセスするための認証
from google.colab import auth
auth.authenticate_user()
project_id = 'xxx'
!gcloud config set project {project_id}
BigQueryテーブルの最新データを取得
from google.cloud import bigquery as bq
from google.cloud import storage
dataset = "aaa"
table = "bbb"

bq_client = bq.Client(project_id) #初期化
query = "SELECT max(dy_date) as max_date FROM `{0}.{1}.{2}`".format(project_id, dataset, table)
rows = bq_client.query(query).result()
for r in rows:
  max_date = r['max_date']
max_date
GCSからcsvを読み込み、pandasで加工
import pandas as pd
import datetime as dt

bucket_name = 'yyy'
file_read   = 'test.csv'

s_client = storage.Client(project_id) #初期化
s_bucket = s_client.get_bucket(bucket_name)
blob = storage.Blob(file_read, s_bucket)
data = blob.download_as_string()
df = pd.read_csv(BytesIO(data))

df['date'] = pd.to_datetime(df['date'], format='%Y%m%d') #'date'列を日付型に変更
df = df[df['date'].dt.date >max_date] #BigQueryテーブルの最新日付よりも後の日付のみに絞る
df['date'] = df['date'].dt.date #日付型に変更しておく(Object型になる)
pandas.DataframeをBigQueryのテーブルにAPPEND
bq_client = bq.Client(project=project_id) #初期化
table_ref = bq_client.dataset(dataset_name).table(table_name) #BigQueryのテーブルを指定

#BigQuery のJobConfigを作成
job_config = bq.job.LoadJobConfig()
job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND

#BigQueryのテーブルにデータをAPPEND
load_job = bq_client.load_table_from_dataframe(
    df
    ,table_ref
    ,job_config = job_config
    ,location   = 'US'
    ,project    = project_id
)  # API request

CloudFunctionsで実装

①BigQueryに書き出すためには、pyarrow, fastparquetが必要。pyarrowはバージョンの指定が必要。

main.pyの一部
import pyarrow
import fastparquet
reqirment.txt
google-cloud-storage==1.16.1
google-cloud-bigquery==1.15.0
pandas
pyarrow>=0.4
fastparquet

②BigQueryに書き出すためには、テーブルの列はすべてNULLABLEでないとならない。

③BigQueryのテーブルDATE型に指定しており、datetime.dateで型を変更したが、型の不一致でうまくいかなかった。
→一度GCSにcsvを出力し、csvからBigQueryへAPPENDする形に変更

DataframeをGCSのcsvに書き出す
s_client = storage.Client(project_id) #初期化
s_bucket = s_client.get_bucket(バケット名)
blob = storage.Blob(ファイル名.csv, s_bucket)
blob.upload_from_string(data=df.to_csv(sep=",", index=False), content_type='text/csv')
GCSからBigQueryへAPPEND
bq_client = bq.Client(project=project_id) #初期化
table_ref = bq_client.dataset(dataset_name).table(table_name)

#BigQuery のJobConfigを作成
job_config = bq.job.LoadJobConfig()
job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND
job_config.skip_leading_rows = 1
job_config.source_format = bq.SourceFormat.CSV
job_config.allow_jagged_rows = True
job_config.ignore_unknown_values = True

#BigQueryのテーブルにデータをAPPEND
load_job = bq_client.load_table_from_uri(
    uri_out  # uri_out = 'gs://バケット名/ファイル名.csv
    ,table_ref
    ,job_config = job_config
    ,location   = 'US'
    ,project    = project_id
)  # API request
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0