LoginSignup
4

More than 5 years have passed since last update.

posted at

updated at

gcloud-pythonを使って、GCSを経由せずにデータをBigQueryに投入する

ちょっとしたデータを、ローカルからGCS経由せずにBigQueryにデータをアップロードしようとするなら、普通はGoogle Cloud SDKのbqコマンドを使うと思います。

ただ、EC2のスポットインスタンスで動かしている日次バッチから自動でアップロードしたいとか考えた時に、Cloud SDKのセットアップ自動化と認証の設定が存外面倒な感じがしておりまして。

pythonのgcloudパッケージを使うと、少なくともセットアップ自動化・認証の設定は楽に行けたので、メモを残しておきます。

Google Cloud Python Client
https://github.com/GoogleCloudPlatform/gcloud-python

準備

サービスアカウントの設定

GCPのサービスアカウントを設定して、認証用のjsonを予め取得しておくことが必要。

詳細は「GCP サービスアカウント」等で検索していだければ。

gcloudパッケージのインストール

pipで一発で入ってくれるはず。

pip install gcloud

Code

以下を保存してpython -u upload.pyで実行。タブ区切り、一行目はheader、同じ名前のテーブルが既にあったら消すの設定になってますので、適宜変更していただければ。

upload.py
from gcloud import bigquery
from gcloud.bigquery import SchemaField

json_key = '/my-folder/my-project-xxxxxxxxxxxx.json'
project_name = 'my-project-1234'
dataset_name = 'my-dataset'


def run():
    schema = [
        SchemaField('Col1', 'INTEGER', mode='required'),
        SchemaField('Col2', 'BOOLEAN', mode='required'),
        SchemaField('Col3', 'STRING', mode='nullable'),
        SchemaField('Col3', 'FLOAT', mode='nullable'),
    ]
    input_file_path = '/my-folder/input.txt'
    header_rows_to_skip = 1
    table_name = 'MyTable'
    csv_delimiter = '\t'
    delete_existing_table = True
    upload_one_csv_to_bq(input_file_path, csv_delimiter, header_rows_to_skip, table_name, schema, delete_existing_table)


def upload_one_csv_to_bq(local_csv_file_path, delimiter, header_rows_to_skip, 
                         table_name, table_schema, delete_existing_table):
    client = bigquery.Client.from_service_account_json(json_key, project=project_name)
    dataset = client.dataset(dataset_name)
    table = dataset.table(name=table_name)
    if delete_existing_table:
        if table.exists():
            print 'Delete existing table: ' + table_name
            table.delete()
    table.schema = table_schema
    table.create()
    with open(local_csv_file_path, 'rb') as file_obj:
        table.upload_from_file(file_obj, 'CSV', field_delimiter=delimiter, skip_leading_rows=header_rows_to_skip, ignore_unknown_values=False)
    print 'Done'


###########
# Entry Point
###########
if __name__ == "__main__":
    run()

Amazon Linux 2016.03のpython2.7.10と、手元のwindowsに入っているpython 2.7.11では動きました。日本語含むデータでもUTF-8なら行けますね。

余談

最初は適当にぐぐって出てきた以下を見ながらやってたんですが(Inserting data (synchronous)の項)、Windowsでは動くのに、EC2に持ってくとtable.insert_data(rows) # API requestのあたりで「broken pipe」などと嫌なエラーが。

https://googlecloudplatform.github.io/gcloud-python/stable/bigquery-usage.html

で、いろいろ回り道した挙句table.upload_from_fileを使うという形に。まあ、問題が出たら諦めてコード読むことをお勧めします

https://github.com/GoogleCloudPlatform/gcloud-python/tree/master/gcloud/bigquery

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
4