Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

gcloud-pythonを使って、GCSを経由せずにデータをBigQueryに投入する

More than 3 years have passed since last update.

ちょっとしたデータを、ローカルからGCS経由せずにBigQueryにデータをアップロードしようとするなら、普通はGoogle Cloud SDKのbqコマンドを使うと思います。

ただ、EC2のスポットインスタンスで動かしている日次バッチから自動でアップロードしたいとか考えた時に、Cloud SDKのセットアップ自動化と認証の設定が存外面倒な感じがしておりまして。

pythonのgcloudパッケージを使うと、少なくともセットアップ自動化・認証の設定は楽に行けたので、メモを残しておきます。

Google Cloud Python Client
https://github.com/GoogleCloudPlatform/gcloud-python

準備

サービスアカウントの設定

GCPのサービスアカウントを設定して、認証用のjsonを予め取得しておくことが必要。

詳細は「GCP サービスアカウント」等で検索していだければ。

gcloudパッケージのインストール

pipで一発で入ってくれるはず。

pip install gcloud

Code

以下を保存してpython -u upload.pyで実行。タブ区切り、一行目はheader、同じ名前のテーブルが既にあったら消すの設定になってますので、適宜変更していただければ。

upload.py
from gcloud import bigquery
from gcloud.bigquery import SchemaField

json_key = '/my-folder/my-project-xxxxxxxxxxxx.json'
project_name = 'my-project-1234'
dataset_name = 'my-dataset'


def run():
    schema = [
        SchemaField('Col1', 'INTEGER', mode='required'),
        SchemaField('Col2', 'BOOLEAN', mode='required'),
        SchemaField('Col3', 'STRING', mode='nullable'),
        SchemaField('Col3', 'FLOAT', mode='nullable'),
    ]
    input_file_path = '/my-folder/input.txt'
    header_rows_to_skip = 1
    table_name = 'MyTable'
    csv_delimiter = '\t'
    delete_existing_table = True
    upload_one_csv_to_bq(input_file_path, csv_delimiter, header_rows_to_skip, table_name, schema, delete_existing_table)


def upload_one_csv_to_bq(local_csv_file_path, delimiter, header_rows_to_skip, 
                         table_name, table_schema, delete_existing_table):
    client = bigquery.Client.from_service_account_json(json_key, project=project_name)
    dataset = client.dataset(dataset_name)
    table = dataset.table(name=table_name)
    if delete_existing_table:
        if table.exists():
            print 'Delete existing table: ' + table_name
            table.delete()
    table.schema = table_schema
    table.create()
    with open(local_csv_file_path, 'rb') as file_obj:
        table.upload_from_file(file_obj, 'CSV', field_delimiter=delimiter, skip_leading_rows=header_rows_to_skip, ignore_unknown_values=False)
    print 'Done'


###########
# Entry Point
###########
if __name__ == "__main__":
    run()

Amazon Linux 2016.03のpython2.7.10と、手元のwindowsに入っているpython 2.7.11では動きました。日本語含むデータでもUTF-8なら行けますね。

余談

最初は適当にぐぐって出てきた以下を見ながらやってたんですが(Inserting data (synchronous)の項)、Windowsでは動くのに、EC2に持ってくとtable.insert_data(rows) # API requestのあたりで「broken pipe」などと嫌なエラーが。

https://googlecloudplatform.github.io/gcloud-python/stable/bigquery-usage.html

で、いろいろ回り道した挙句table.upload_from_fileを使うという形に。まあ、問題が出たら諦めてコード読むことをお勧めします

https://github.com/GoogleCloudPlatform/gcloud-python/tree/master/gcloud/bigquery

pyr_revs
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away