BigQueryをpythonで使う。
BigQueryはGoogleの提供しているとても早いデータ分析環境だ。なんでもデータを何千台のマシンに分散して処理しているそうで、まさにGoogleにしかできそうもない。
縁がありBigQueryを評価する機会に恵まれた。触ってみたら結構早かった。pythonから触ってみる方法を試したので共有したい。
事前準備
- BigQuery単体で使えるようにしておく。
- Billingを設定しないと使えないかもしれない。
- BigQueryAPIのアクセスは有効にしておく。
2015-09-14 修正
下の方法を取らずとも、
というライブラリが出ているようです。
参考の為下の記述を残します。
環境整備
ubuntu13.10で試した。
pythonで使うには以下のようなライブラリを入れる。pip
は事前に入れておくこと。
sudo pip install --upgrade google-api-python-client
sudo pip install --upgrade oauth2client
sudo pip install httplib
bigqueryのOauth認証を生成して、p12の鍵を手に入れる。
Googleのデベロッパーコンソールの認証のところから、クライアント鍵を作る。p12という鍵ファイルができるのでそれを利用する。
pythonを利用してのアクセス。
こちらのstack over flowを参考にした。
同期(sync)と非同期(async)両方をメソッドを試した。試した環境は自分のニコニコ動画データセットの環境で試したのであまり一般性は無い。見たところ、日本語(utf-8)を用いても正常にマッチするようだ。
#!/usr/bin/python
# -*- coding:utf-8 -*-
import httplib2
import logging
import time
from apiclient.discovery import build
from oauth2client.client import SignedJwtAssertionCredentials
import pprint
# REPLACE WITH YOUR Project ID
PROJECT_NUMBER = 'XXXXXXXXXXXX'
# REPLACE WITH THE SERVICE ACCOUNT EMAIL FROM GOOGLE DEV CONSOLE
SERVICE_ACCOUNT_EMAIL ='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com'
KEYFILE='xxxxxxxxxxxxxxxx.p12'
class BQC(object):
def __init__(self):
f = file(KEYFILE, 'rb')
self.key = f.read()
f.close()
def _credential(self):
credentials = SignedJwtAssertionCredentials(
SERVICE_ACCOUNT_EMAIL,
self.key,
scope='https://www.googleapis.com/auth/bigquery')
http = httplib2.Http()
http = credentials.authorize(http)
service = build('bigquery', 'v2')
return http,service
def datalist(self,service,http):
datasets = service.datasets()
response = datasets.list(projectId=PROJECT_NUMBER).execute(http)
print('Dataset list:\n')
for dataset in response['datasets']:
print("%s\n" % dataset['id'])
def show_result(self,result):
pprint.pprint(result)
print 'Query Results:'
for row in result['rows']:
result_row = []
for field in row['f']:
if field['v']:
result_row.append(field['v'])
print ('\t').join(result_row)
def sync_method(self,service,http):
q='SELECT * FROM [nicodata_test.videoinfo] WHERE title like "%{}%" LIMIT 10;'.format(u'ボカロ'.encode('utf-8'))
print q
#q='SELECT TOP(title, 30) as title, COUNT(*) as revision_count FROM [publicdata:samples.wikipedia] WHERE wp_namespace = 0;'
query_request=service.jobs()
query_data={'query':q}
query_response = query_request.query(projectId=PROJECT_NUMBER,
body=query_data).execute(http)
def async_method(self,service,http):
q='SELECT * FROM [nicodata_test.videoinfo] WHERE title like "%{}%" LIMIT 90;'.format(u'ボカロ'.encode('utf-8'))
q='SELECT * FROM [nicodata_test.comment_data] WHERE comment like "%{}%" LIMIT 90;'.format(u'ボカロ'.encode('utf-8'))
print q
query_request=service.jobs()
query_data={'query':q}
query_data = {
'configuration': {
'query': {
'query': q,
}
}
}
ft=time.time()
insertResponse = query_request.insert(projectId=PROJECT_NUMBER,
body=query_data).execute(http)
print 'start query diff={}'.format(time.time()-ft)
while True:
print 'stat_get'
ft2=time.time()
status = query_request.get(projectId=PROJECT_NUMBER, jobId=insertResponse['jobReference']['jobId']).execute(http)
print 'end get diff={}'.format(time.time()-ft2)
currentStatus = status['status']['state']
if 'DONE' == currentStatus:
print 'sql done'
break
else:
print 'Waiting for the query to complete...'
print 'Current status: ' + currentStatus
print time.ctime()
time.sleep(0.1)
currentRow = 0
queryReply = query_request.getQueryResults(
projectId=PROJECT_NUMBER,
jobId=insertResponse['jobReference']['jobId'],
startIndex=currentRow).execute(http)
while(('rows' in queryReply) and currentRow < queryReply['totalRows']):
#self.show_result(queryReply)
currentRow += len(queryReply['rows'])
queryReply = query_request.getQueryResults(
projectId=PROJECT_NUMBER,
jobId=queryReply['jobReference']['jobId'],
startIndex=currentRow).execute(http)
print currentRow
def show(self):
http,service=self._credential()
#self.datalist(service,http)
#result=self.sync_method(service,http)
result=self.async_method(service,http)
#self.show_result(result)
def main():
bqc=BQC()
bqc.show()
if __name__=='__main__':main()