SENSY株式会社のwasnotです。
Adventカレンダーにも水増し記事を投与していきます。
前回まで、Datastore->GCS、GCS変更->GAEときたので、この続きを書こうと思います。
元々からの話をすると、
なるべくGAE/SEのアプリを中心に、Datastore->Bigqueryを自動化したいという要求がありました。
手順を考えると、
- まず、GAEアプリからイベントを発行して、Datastoreの中身をGCSにExportします。
- 次に、GCSにDatastore backupが保存し終わったイベントをPub/Sub経由でGAEアプリに通知します。
- 最後に、受け取った通知を元に、BigQueryのDatastoreバックアップ読み込みジョブを開始します。
- この記事
この記事ではSEのAPIを叩くと、所定の場所のDatastoreバックアップをBigqueryに読ませる、というものを作ります。
BigQueryにDatastore backupを読ませる
BigQueryには色々なAPIがありますが、あまり使ったことないので、今回はloadのAPIだけ使います。
ドキュメントはこちらですね。
Loading Data From Cloud Datastore Backups
ロードの仕方はWeb UIやCLI, REST APIがあります。
CLIの例だと以下のようなコマンドを叩くことが多いと思います。
> bq load --source_format=DATASTORE_BACKUP --replace \
[PROJECT_ID]:[DATASET].[TABLE] \
gs://[BUCKET_NAME]/20171121-0615/all_namespaces/kind_User/all_namespaces_kind_User.export_metadata
Waiting on bqjob_r72af5ceb2736ff19_0000015fdd73417a_1 ... (0s) Current status: DONE
bucketのPATHは前回までの記事で作った日付prefixをつけています。
GAE上のPythonアプリから実行するにはこれに当たるREST APIを叩けば良さそうですね。
REST APIのリファレンスやJobsのInsertのページとTry API toolを使ってみると、わかりますが、
REST APIのbodyは以下のようにすれば上のコマンドと同じものになりそうです。
{
"configuration": {
"load": {
"sourceUris": [
"gs://[BUCKET]/20171121-0615/all_namespaces/kind_User/all_namespaces_kind_User.export_metadata"
],
"sourceFormat": "DATASTORE_BACKUP",
"destinationTable": {
"projectId": "[PROJECT_ID]",
"datasetId": "[DATASET]",
"tableId": "[TABLE]"
},
"writeDisposition": "WRITE_TRUNCATE"
}
}
}
"writeDisposition": "WRITE_TRUNCATE"
が--replace
指定と同じ意味だと思われます。
defaultのWRITE_APPEND
はDATASTORE_BACKUP
の読み込みでは使えないのでTRUNCATE
を指定しておくといいでしょう。
GAE/SE pythonのアプリからのロード
上のようなREST APIへのリクエストをAppEngine Standard Enviroment上のpythonアプリから行ってみます。
権限については、IAMでApp Engine default service account
にbigquery向けのものが付与されているとします。
色々な方法があると思いますが、いくつか書いてみます。
URLFetch
Datastoreのbackup APIを叩く時に、チュートリアル(エクスポートのスケジューリング)ではAppEngine/SE上で使えるURLFetch APIを使った例が載っていました。
REST APIの形式もほぼ同じなのでまずはURLFetchを使ったパターンが外部パッケージを入れなくて済むので簡単そうです。
import httplib
import json
import logging
import webapp2
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"
class ImportBackup(webapp2.RequestHandler):
def get(self):
access_token, _ = app_identity.get_access_token('https://www.googleapis.com/auth/bigquery')
app_id = app_identity.get_application_id()
bucket = 'my-bucket'
kind = 'User'
target_dir = '20171121-0615'
dataset = 'Datastore'
request = {
"configuration": {
"load": {
"sourceUris": [
BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)
],
"sourceFormat": "DATASTORE_BACKUP",
"destinationTable": {
"datasetId": 'Datastore',
"projectId": app_id,
"tableId": kind
},
"writeDisposition": "WRITE_TRUNCATE"
}
}
}
url = 'https://www.googleapis.com/bigquery/v2/projects/{}/jobs'.format(app_id)
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + access_token
}
try:
result = urlfetch.fetch(
url=url,
payload=json.dumps(body),
method=urlfetch.POST,
deadline=60,
headers=headers)
if result.status_code == httplib.OK:
logging.info(result.content)
elif result.status_code >= 500:
logging.error(result.content)
else:
logging.warning(result.content)
return result.status_code
except urlfetch.Error:
logging.exception('Failed to initiate export.')
return httplib.INTERNAL_SERVER_ERROR
メリット
- AppEngineの場合はaccess_tokenが手軽に取得できるから、普通のREST APIリクエストとしてかける
- 外部パッケージが不要なので動作確認が最低限で済む。
デメリット
- リクエスト内容を自分でちゃんと書かないとダメ→ミス・バグが起こりやすい
- 記述量は多め
google-api-python-client
GoogleのREST API向けにpython用のクライアントライブラリが用意されているので、こちらを使う方法もいくつか見かけました。
google-api-python-client
- GAE/Python→BigQueryへのデータ投入、Spreadsheetからのクエリ実行&集計
- [GoogleCloudPlatform] API Client Libraryを用いてGoogle Cloud APIを利用する
- BigQueryをpythonから利用する。
ただ、このライブラリはメンテナンスモードになってしまっているので、あまりお勧めはしません。
あと、あまり記述量が減ったりはしない気がします。
import logging
import httplib2
import webapp2
from google.appengine.api import app_identity
from googleapiclient import discovery, errors
from oauth2client.client import GoogleCredentials
BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"
class ImportBackup(webapp2.RequestHandler):
def get(self):
credentials = GoogleCredentials.get_application_default()
app_id = app_identity.get_application_id()
bucket = 'my-bucket'
kind = 'User'
target_dir = '20171121-0615'
dataset = 'Datastore'
service = discovery.build('bigquery', 'v2', credentials=credentials)
request = {
"configuration": {
"load": {
"sourceUris": [
BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)
],
"sourceFormat": "DATASTORE_BACKUP",
"destinationTable": {
"datasetId": dataset,
"projectId": app_id,
"tableId": kind
},
"writeDisposition": "WRITE_TRUNCATE"
}
}
}
try:
service.jobs().insert(projectId=app_id, body=request).execute()
self.response.status_int = 200
except errors.HttpError as e:
logging.error(e.content)
self.response.status_int = e.resp.status
except httplib2.HttpLib2Error:
logging.exception('Failed to initiate export.')
self.response.status_int = 500
ちなみにgoogleのpubsubサンプルである、cloud-pubsub-samples-pythonでもこの方法を見かけたので、動作の信頼性はたかそうです。
メリット
- REST APIの書き方をラップしてくれる
- 公式ラッパーなので信頼性が高い?
デメリット
- 外部パッケージのインストールが必要
- メンテナンスモードのライブラリで不安
- リクエスト内容は記述する必要がある
外部パッケージのインストール方法は以下を参考にしてください
Using third-party libraries
BigQuery-Python
google-api-python-clientがイケてないということで、このclientをさらにラップしたライブラリを出しています。
BigQuery-Python
12月からSENSYに入社してくれたitkrさんの記事が参考になります!
PythonからBigQueryを操作するときは BigQuery-Python が便利だった
import logging
import webapp2
from bigquery import get_client, JOB_SOURCE_FORMAT_DATASTORE_BACKUP, JOB_WRITE_TRUNCATE, BIGQUERY_SCOPE
from bigquery.errors import JobInsertException
from google.appengine.api import app_identity
from oauth2client.client import GoogleCredentials
BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"
class ImportBackup(webapp2.RequestHandler):
def get(self):
credentials = GoogleCredentials.get_application_default()
if credentials.create_scoped_required():
credentials = credentials.create_scoped(BIGQUERY_SCOPE)
app_id = app_identity.get_application_id()
bucket = 'my-bucket'
kind = 'User'
target_dir = '20171206-1800'
dataset = 'Datastore'
client = get_client(app_id, credentials=credentials)
try:
client.import_data_from_uris(
source_uris=[BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)],
write_disposition=JOB_WRITE_TRUNCATE,
dataset=dataset,
table=kind,
source_format=JOB_SOURCE_FORMAT_DATASTORE_BACKUP
)
self.response.status_int = 200
except JobInsertException as e:
logging.error(e.message)
self.response.status_int = 500
メリット
- 簡単にかける
デメリット
- 外部パッケージのインストールが必要
- 非公式ラッパー
外部パッケージもgoogle-api-python-clientに追加でいくつか程度なので、あまり変わらないです。
REST APIの仕様変更に追従するのが難しいかとも思いましたが、Breakingな変更だったら結局書き直しが発生するので、使いやすいラッパーを使うメリットは大きいかなと思います。
どれがいい?
三つも方法を書きましたが、結局REST APIのリクエストなので特にどれでもいいですね、、
AppEngine/SEからでも意外とスムーズにリクエストができたので、いい感じですね。
現在プロジェクトでは、google-api-clientは使いづらいし、
BigQueryをあまり使わないのに追加でライブラリをインストールするのも抵抗があり、
URLFetchでの実装をしていますが、やはりおすすめはBigQuery-Pythonでしょうか。
好みや要件で選ぶといいと思います。
まとめ
今回は3連続で記事を書いて、DatastoreからBigQueryへのインポートを自動化してみました。
きっかけはds2bqというgoでのサンプルに感化されてpythonでもやってみよう!ということでした。
python版サンプルは、もう少し整理して、公開しようと思っています。
また、今回の記事は違いますが、前の二つはどうせならBeta機能を試そう!ということで、まだベータが外れないうちに試しています。
既存の方法よりも洗練されていて使いやすいなぁ、という印象です。
注意点
今回のBigQueryへのロード依頼ですが、
「GCSの変更をPub/Sub経由でAppEngineに通知する」で受け取ったHandlerで直にREST APIにアクセスすると、
対象のTableやkindが複数あるケースなどで結構時間がかかってしまうので、
- pubsubのsubscriptionの時間制限を伸ばす(デフォルト10秒->30秒など?)
- 今回のBigQueryへのリクエストをTaskQueue内で行う
などの工夫をした方が良いと思います。