LoginSignup
44
47

More than 5 years have passed since last update.

BigQueryをpythonから利用する。

Last updated at Posted at 2014-10-07

BigQueryをpythonで使う。

BigQueryはGoogleの提供しているとても早いデータ分析環境だ。なんでもデータを何千台のマシンに分散して処理しているそうで、まさにGoogleにしかできそうもない。

縁がありBigQueryを評価する機会に恵まれた。触ってみたら結構早かった。pythonから触ってみる方法を試したので共有したい。

事前準備

  • BigQuery単体で使えるようにしておく。
  • Billingを設定しないと使えないかもしれない。
  • BigQueryAPIのアクセスは有効にしておく。

2015-09-14 修正

下の方法を取らずとも、

というライブラリが出ているようです。
参考の為下の記述を残します。

環境整備

ubuntu13.10で試した。

pythonで使うには以下のようなライブラリを入れる。pipは事前に入れておくこと。

sudo pip install --upgrade google-api-python-client
sudo pip install --upgrade oauth2client
sudo pip install httplib

bigqueryのOauth認証を生成して、p12の鍵を手に入れる。

Googleのデベロッパーコンソールの認証のところから、クライアント鍵を作る。p12という鍵ファイルができるのでそれを利用する。

WS000000.JPG

pythonを利用してのアクセス。

こちらのstack over flowを参考にした。

同期(sync)と非同期(async)両方をメソッドを試した。試した環境は自分のニコニコ動画データセットの環境で試したのであまり一般性は無い。見たところ、日本語(utf-8)を用いても正常にマッチするようだ。

#!/usr/bin/python
# -*- coding:utf-8 -*-
import httplib2
import logging
import time
from apiclient.discovery import build
from oauth2client.client import SignedJwtAssertionCredentials
import pprint

# REPLACE WITH YOUR Project ID
PROJECT_NUMBER = 'XXXXXXXXXXXX'
# REPLACE WITH THE SERVICE ACCOUNT EMAIL FROM GOOGLE DEV CONSOLE
SERVICE_ACCOUNT_EMAIL ='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com'
KEYFILE='xxxxxxxxxxxxxxxx.p12'


class BQC(object):
    def __init__(self):
        f = file(KEYFILE, 'rb')
        self.key = f.read()
        f.close()

    def _credential(self):
        credentials = SignedJwtAssertionCredentials(
            SERVICE_ACCOUNT_EMAIL,
            self.key,
            scope='https://www.googleapis.com/auth/bigquery')

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2')
        return http,service

    def datalist(self,service,http):
        datasets = service.datasets()
        response = datasets.list(projectId=PROJECT_NUMBER).execute(http)
        print('Dataset list:\n')
        for dataset in response['datasets']:
            print("%s\n" % dataset['id'])

    def show_result(self,result):
        pprint.pprint(result)
        print 'Query Results:'
        for row in result['rows']:
            result_row = []
            for field in row['f']:
                if field['v']:
                    result_row.append(field['v'])
            print ('\t').join(result_row)



    def sync_method(self,service,http):
        q='SELECT *  FROM [nicodata_test.videoinfo] WHERE title like "%{}%" LIMIT 10;'.format(u'ボカロ'.encode('utf-8'))
        print q
        #q='SELECT TOP(title, 30) as title, COUNT(*) as revision_count FROM [publicdata:samples.wikipedia] WHERE wp_namespace = 0;'
        query_request=service.jobs()
        query_data={'query':q}
        query_response = query_request.query(projectId=PROJECT_NUMBER,
                                             body=query_data).execute(http)
    def async_method(self,service,http):
        q='SELECT *  FROM [nicodata_test.videoinfo] WHERE title like "%{}%" LIMIT 90;'.format(u'ボカロ'.encode('utf-8'))
        q='SELECT *  FROM [nicodata_test.comment_data] WHERE comment like "%{}%" LIMIT 90;'.format(u'ボカロ'.encode('utf-8'))

        print q
        query_request=service.jobs()
        query_data={'query':q}
        query_data = {
            'configuration': {
                'query': {
                    'query': q,
                    }
                }
            }

        ft=time.time()
        insertResponse = query_request.insert(projectId=PROJECT_NUMBER,
                                         body=query_data).execute(http)
        print 'start query diff={}'.format(time.time()-ft)
        while True:
            print 'stat_get'
            ft2=time.time()
            status = query_request.get(projectId=PROJECT_NUMBER, jobId=insertResponse['jobReference']['jobId']).execute(http)
            print 'end get diff={}'.format(time.time()-ft2)
            currentStatus = status['status']['state']
            if 'DONE' == currentStatus:
                print 'sql done'
                break
            else:
                print 'Waiting for the query to complete...'
                print 'Current status: ' + currentStatus
                print time.ctime()
                time.sleep(0.1)
        currentRow = 0
        queryReply = query_request.getQueryResults(
            projectId=PROJECT_NUMBER,
            jobId=insertResponse['jobReference']['jobId'],
            startIndex=currentRow).execute(http)

        while(('rows' in queryReply) and currentRow < queryReply['totalRows']):
            #self.show_result(queryReply)
            currentRow += len(queryReply['rows'])
            queryReply = query_request.getQueryResults(
                projectId=PROJECT_NUMBER,
                jobId=queryReply['jobReference']['jobId'],
                startIndex=currentRow).execute(http)

        print currentRow
    def show(self):
        http,service=self._credential()
        #self.datalist(service,http)
        #result=self.sync_method(service,http)
        result=self.async_method(service,http)
        #self.show_result(result)
def main():
    bqc=BQC()
    bqc.show()

if __name__=='__main__':main()





44
47
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
44
47