LoginSignup
11
8

More than 3 years have passed since last update.

PythonでGoogle Cloud Storage(GCS)、BigQueryへpandas DataFrameをcsv保存するclass

Last updated at Posted at 2019-10-12

やること

PythonでデータをGCSへ保存する、BigQueryへ保存&BigQueryからデータを取得するクラスを使って、
データを保存したり、取得したりする。

目的

pandas dataframeの形から、そのままGCSへ直接フリーキックする日本語の記事がなかったため、
記事を書いた。
オブジェクト指向がなんやらとか、クラスの使い方とかようわからんが、
このクラスを継承し、よく遊んでるので、参考になるやもしれぬ。

迷える子羊達よ。神の加護をあらんことを。

コード実行の前提条件

  1. 自ら機会を作り出し、機会によって自らを変えている
  2. この世で一匹光通信の営業マンである
  3. GCP credential.jsonファイルをダウンロードしている

ということでざっくり説明しますが、細かいところは自分で調べてちょ。

クラス

sample_class.py
from datetime import datetime
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd
import os
import re

# gcp credentailファイルの読み込み
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/USERNAME/GOOGLE_CREDENTIAL.json"

class UpDownLoad:
    # BQクライアント初期化
    bq_client = bigquery.Client()

    # ファイルタイプ一覧の辞書
    content_dict = {
        "csv": "text/csv",
        "txt": "text/plain",
        "jpeg|jpg": "image/jpeg",
        "png": "image/png",
        "pdf": "application/pdf",
        "zip": "application/zip"
    }

    def data2gcs(self, data, bucket_name, folder_name, file_name):
        try:
            # GCSクライアント初期化
            client = storage.Client()

            # バケットオブジェクト取得
            bucket = client.get_bucket(bucket_name)

            # 保存先フォルダとファイル名作成
            blob = bucket.blob(os.path.join(folder_name, file_name))

            # コンテントタイプ判定
            for k, v in self.content_dict.items():
                if re.search(rf"\.{k}$", file_name):
                    content_type = v
            # 保存
            if content_type == 'text/csv':
                blob.upload_from_string(data=data.to_csv(sep=",", index=False), content_type=content_type)
            else:
                blob.upload_from_string(data=data, content_type=content_type)
            return "success"
        except Exception as e:
            return str(e)

    def make_bq(self, df, table_name):
        try:
            # _TABLE_SUFFIX作成のためのdate
            date = datetime.today().strftime("%Y%m%d")
            dataset, new_table = table_name.split(".")
            dataset_ref = self.bq_client.dataset(dataset)
            table_ref = dataset_ref.table(f'{new_table}_{date}')
            self.bq_client.load_table_from_dataframe(df.astype("str"), table_ref).result()
            return "success"
        except Exception as e:
            return str(e)

    def read_bq(self, query):
        return self.bq_client.query(query).to_dataframe()

pandas dataframeをGCSへCSVとして保存するコード

gcs.py
# 初期化
load = UpDownLoad()

# データフレーム作成
data = pd.DataFrame([i for i in range(100)], columns=["test_col"])
date = datetime.today().strftime("%Y%m%d%H%M%S")
bucket_name = "qiita_test"
folder_name = "test"
file_name = f"{date}_{folder_name}.csv"

# データをGCSへアップロードする
res = load.data2gcs(data, bucket_name, folder_name, file_name)
print(res)

pandas dataframeをBigQueryへ保存&取得するコード

bq.py
# 初期化
load = UpDownLoad()

# データをBigQueryへアップロードする
table_name = "test.qiita_test"
res_bq = load.make_bq(data, table_name)
print(res_bq)

# BigQueryへアップロードしたデータをデータフレームとして取得
project_id = "自分のPROJECT_ID"
bq_tb = f"`{project_id}.{table_name}_*`"
query = f"select * from {bq_tb}"
bq_df = load.read_bq(query)

注意点

data2gcsに関して、csv以外のデータを保存する際は、引数としてdataを与える前に、
事前に加工しとくべし。
こんな感じ。jpgとかはImageとか使いなや。

read.py
with open("./txt.txt") as f:
    data = f.read()

GCSへ保存する際には、事前にbucketを作成しておく必要がある。
また、BigQueryへ保存するもデータセットは先に作成しておく必要がある(うろ覚え)。

参考URL

GoogleのPython clientドキュメント見てたが、探すのがちと面倒なので、記載なし。

11
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
8