SENSYDay 5

GAE/SEからGCSにおいたファイルをBigQueryに読み込む

More than 1 year has passed since last update.

SENSY株式会社のwasnotです。

Adventカレンダーにも水増し記事を投与していきます。

前回まで、Datastore->GCS、GCS変更->GAEときたので、この続きを書こうと思います。

元々からの話をすると、

なるべくGAE/SEのアプリを中心に、Datastore->Bigqueryを自動化したいという要求がありました。

手順を考えると、

この記事ではSEのAPIを叩くと、所定の場所のDatastoreバックアップをBigqueryに読ませる、というものを作ります。


BigQueryにDatastore backupを読ませる

BigQueryには色々なAPIがありますが、あまり使ったことないので、今回はloadのAPIだけ使います。

ドキュメントはこちらですね。

Loading Data From Cloud Datastore Backups

ロードの仕方はWeb UIやCLI, REST APIがあります。

CLIの例だと以下のようなコマンドを叩くことが多いと思います。

> bq load --source_format=DATASTORE_BACKUP --replace \

[PROJECT_ID]:[DATASET].[TABLE] \
gs://[BUCKET_NAME]/20171121-0615/all_namespaces/kind_User/all_namespaces_kind_User.export_metadata
Waiting on bqjob_r72af5ceb2736ff19_0000015fdd73417a_1 ... (0s) Current status: DONE

bucketのPATHは前回までの記事で作った日付prefixをつけています。

GAE上のPythonアプリから実行するにはこれに当たるREST APIを叩けば良さそうですね。

REST APIのリファレンスJobsのInsertのページとTry API toolを使ってみると、わかりますが、

REST APIのbodyは以下のようにすれば上のコマンドと同じものになりそうです。


body

{

"configuration": {
"load": {
"sourceUris": [
"gs://[BUCKET]/20171121-0615/all_namespaces/kind_User/all_namespaces_kind_User.export_metadata"
],
"sourceFormat": "DATASTORE_BACKUP",
"destinationTable": {
"projectId": "[PROJECT_ID]",
"datasetId": "[DATASET]",
"tableId": "[TABLE]"
},
"writeDisposition": "WRITE_TRUNCATE"
}
}
}

"writeDisposition": "WRITE_TRUNCATE"--replace指定と同じ意味だと思われます。

defaultのWRITE_APPENDDATASTORE_BACKUPの読み込みでは使えないのでTRUNCATEを指定しておくといいでしょう。


GAE/SE pythonのアプリからのロード

上のようなREST APIへのリクエストをAppEngine Standard Enviroment上のpythonアプリから行ってみます。

権限については、IAMでApp Engine default service accountにbigquery向けのものが付与されているとします。

色々な方法があると思いますが、いくつか書いてみます。


URLFetch

Datastoreのbackup APIを叩く時に、チュートリアル(エクスポートのスケジューリング)ではAppEngine/SE上で使えるURLFetch APIを使った例が載っていました。

REST APIの形式もほぼ同じなのでまずはURLFetchを使ったパターンが外部パッケージを入れなくて済むので簡単そうです。

import httplib

import json
import logging
import webapp2

from google.appengine.api import app_identity
from google.appengine.api import urlfetch

BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"

class ImportBackup(webapp2.RequestHandler):

def get(self):
access_token, _ = app_identity.get_access_token('https://www.googleapis.com/auth/bigquery')
app_id = app_identity.get_application_id()
bucket = 'my-bucket'

kind = 'User'
target_dir = '20171121-0615'
dataset = 'Datastore'

request = {
"configuration": {
"load": {
"sourceUris": [
BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)
],
"sourceFormat": "DATASTORE_BACKUP",
"destinationTable": {
"datasetId": 'Datastore',
"projectId": app_id,
"tableId": kind
},
"writeDisposition": "WRITE_TRUNCATE"
}
}
}
url = 'https://www.googleapis.com/bigquery/v2/projects/{}/jobs'.format(app_id)

headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + access_token
}
try:
result = urlfetch.fetch(
url=url,
payload=json.dumps(body),
method=urlfetch.POST,
deadline=60,
headers=headers)
if result.status_code == httplib.OK:
logging.info(result.content)
elif result.status_code >= 500:
logging.error(result.content)
else:
logging.warning(result.content)
return result.status_code
except urlfetch.Error:
logging.exception('Failed to initiate export.')
return httplib.INTERNAL_SERVER_ERROR


メリット


  • AppEngineの場合はaccess_tokenが手軽に取得できるから、普通のREST APIリクエストとしてかける

  • 外部パッケージが不要なので動作確認が最低限で済む。


デメリット


  • リクエスト内容を自分でちゃんと書かないとダメ→ミス・バグが起こりやすい

  • 記述量は多め


google-api-python-client

GoogleのREST API向けにpython用のクライアントライブラリが用意されているので、こちらを使う方法もいくつか見かけました。

google-api-python-client

ただ、このライブラリはメンテナンスモードになってしまっているので、あまりお勧めはしません。

あと、あまり記述量が減ったりはしない気がします。

import logging

import httplib2
import webapp2
from google.appengine.api import app_identity
from googleapiclient import discovery, errors
from oauth2client.client import GoogleCredentials

BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"

class ImportBackup(webapp2.RequestHandler):
def get(self):
credentials = GoogleCredentials.get_application_default()
app_id = app_identity.get_application_id()
bucket = 'my-bucket'

kind = 'User'
target_dir = '20171121-0615'
dataset = 'Datastore'

service = discovery.build('bigquery', 'v2', credentials=credentials)
request = {
"configuration": {
"load": {
"sourceUris": [
BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)
],
"sourceFormat": "DATASTORE_BACKUP",
"destinationTable": {
"datasetId": dataset,
"projectId": app_id,
"tableId": kind
},
"writeDisposition": "WRITE_TRUNCATE"
}
}
}
try:
service.jobs().insert(projectId=app_id, body=request).execute()
self.response.status_int = 200
except errors.HttpError as e:
logging.error(e.content)
self.response.status_int = e.resp.status
except httplib2.HttpLib2Error:
logging.exception('Failed to initiate export.')
self.response.status_int = 500

ちなみにgoogleのpubsubサンプルである、cloud-pubsub-samples-pythonでもこの方法を見かけたので、動作の信頼性はたかそうです。


メリット


  • REST APIの書き方をラップしてくれる

  • 公式ラッパーなので信頼性が高い?


デメリット


  • 外部パッケージのインストールが必要

  • メンテナンスモードのライブラリで不安

  • リクエスト内容は記述する必要がある

外部パッケージのインストール方法は以下を参考にしてください

Using third-party libraries


BigQuery-Python

google-api-python-clientがイケてないということで、このclientをさらにラップしたライブラリを出しています。

BigQuery-Python

12月からSENSYに入社してくれたitkrさんの記事が参考になります!

PythonからBigQueryを操作するときは BigQuery-Python が便利だった

import logging

import webapp2
from bigquery import get_client, JOB_SOURCE_FORMAT_DATASTORE_BACKUP, JOB_WRITE_TRUNCATE, BIGQUERY_SCOPE
from bigquery.errors import JobInsertException
from google.appengine.api import app_identity
from oauth2client.client import GoogleCredentials

BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"

class ImportBackup(webapp2.RequestHandler):
def get(self):
credentials = GoogleCredentials.get_application_default()
if credentials.create_scoped_required():
credentials = credentials.create_scoped(BIGQUERY_SCOPE)
app_id = app_identity.get_application_id()
bucket = 'my-bucket'

kind = 'User'
target_dir = '20171206-1800'
dataset = 'Datastore'

client = get_client(app_id, credentials=credentials)
try:
client.import_data_from_uris(
source_uris=[BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)],
write_disposition=JOB_WRITE_TRUNCATE,
dataset=dataset,
table=kind,
source_format=JOB_SOURCE_FORMAT_DATASTORE_BACKUP
)
self.response.status_int = 200
except JobInsertException as e:
logging.error(e.message)
self.response.status_int = 500


メリット


  • 簡単にかける


デメリット


  • 外部パッケージのインストールが必要

  • 非公式ラッパー

外部パッケージもgoogle-api-python-clientに追加でいくつか程度なので、あまり変わらないです。

REST APIの仕様変更に追従するのが難しいかとも思いましたが、Breakingな変更だったら結局書き直しが発生するので、使いやすいラッパーを使うメリットは大きいかなと思います。


どれがいい?

三つも方法を書きましたが、結局REST APIのリクエストなので特にどれでもいいですね、、

AppEngine/SEからでも意外とスムーズにリクエストができたので、いい感じですね。

現在プロジェクトでは、google-api-clientは使いづらいし、

BigQueryをあまり使わないのに追加でライブラリをインストールするのも抵抗があり、

URLFetchでの実装をしていますが、やはりおすすめはBigQuery-Pythonでしょうか。

好みや要件で選ぶといいと思います。


まとめ

今回は3連続で記事を書いて、DatastoreからBigQueryへのインポートを自動化してみました。

きっかけはds2bqというgoでのサンプルに感化されてpythonでもやってみよう!ということでした。

python版サンプルは、もう少し整理して、公開しようと思っています。

また、今回の記事は違いますが、前の二つはどうせならBeta機能を試そう!ということで、まだベータが外れないうちに試しています。

既存の方法よりも洗練されていて使いやすいなぁ、という印象です。


注意点

今回のBigQueryへのロード依頼ですが、

GCSの変更をPub/Sub経由でAppEngineに通知する」で受け取ったHandlerで直にREST APIにアクセスすると、

対象のTableやkindが複数あるケースなどで結構時間がかかってしまうので、


  • pubsubのsubscriptionの時間制限を伸ばす(デフォルト10秒->30秒など?)

  • 今回のBigQueryへのリクエストをTaskQueue内で行う

などの工夫をした方が良いと思います。