8
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SENSYAdvent Calendar 2017

Day 5

GAE/SEからGCSにおいたファイルをBigQueryに読み込む

Last updated at Posted at 2017-12-07

SENSY株式会社のwasnotです。

Adventカレンダーにも水増し記事を投与していきます。
前回まで、Datastore->GCS、GCS変更->GAEときたので、この続きを書こうと思います。

元々からの話をすると、
なるべくGAE/SEのアプリを中心に、Datastore->Bigqueryを自動化したいという要求がありました。

手順を考えると、

この記事ではSEのAPIを叩くと、所定の場所のDatastoreバックアップをBigqueryに読ませる、というものを作ります。

BigQueryにDatastore backupを読ませる

BigQueryには色々なAPIがありますが、あまり使ったことないので、今回はloadのAPIだけ使います。
ドキュメントはこちらですね。
Loading Data From Cloud Datastore Backups

ロードの仕方はWeb UIやCLI, REST APIがあります。

CLIの例だと以下のようなコマンドを叩くことが多いと思います。

> bq load --source_format=DATASTORE_BACKUP --replace \
  [PROJECT_ID]:[DATASET].[TABLE] \
  gs://[BUCKET_NAME]/20171121-0615/all_namespaces/kind_User/all_namespaces_kind_User.export_metadata
Waiting on bqjob_r72af5ceb2736ff19_0000015fdd73417a_1 ... (0s) Current status: DONE

bucketのPATHは前回までの記事で作った日付prefixをつけています。

GAE上のPythonアプリから実行するにはこれに当たるREST APIを叩けば良さそうですね。

REST APIのリファレンスJobsのInsertのページとTry API toolを使ってみると、わかりますが、
REST APIのbodyは以下のようにすれば上のコマンドと同じものになりそうです。

body
{
  "configuration": {
    "load": {
      "sourceUris": [
        "gs://[BUCKET]/20171121-0615/all_namespaces/kind_User/all_namespaces_kind_User.export_metadata"
      ],
      "sourceFormat": "DATASTORE_BACKUP",
      "destinationTable": {
        "projectId": "[PROJECT_ID]",
        "datasetId": "[DATASET]",
        "tableId": "[TABLE]"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    }
  }
}

"writeDisposition": "WRITE_TRUNCATE"--replace指定と同じ意味だと思われます。
defaultのWRITE_APPENDDATASTORE_BACKUPの読み込みでは使えないのでTRUNCATEを指定しておくといいでしょう。

GAE/SE pythonのアプリからのロード

上のようなREST APIへのリクエストをAppEngine Standard Enviroment上のpythonアプリから行ってみます。
権限については、IAMでApp Engine default service accountにbigquery向けのものが付与されているとします。

色々な方法があると思いますが、いくつか書いてみます。

URLFetch

Datastoreのbackup APIを叩く時に、チュートリアル(エクスポートのスケジューリング)ではAppEngine/SE上で使えるURLFetch APIを使った例が載っていました。

REST APIの形式もほぼ同じなのでまずはURLFetchを使ったパターンが外部パッケージを入れなくて済むので簡単そうです。

import httplib
import json
import logging
import webapp2

from google.appengine.api import app_identity
from google.appengine.api import urlfetch

BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"

class ImportBackup(webapp2.RequestHandler):

    def get(self):
        access_token, _ = app_identity.get_access_token('https://www.googleapis.com/auth/bigquery')
        app_id = app_identity.get_application_id()
        bucket = 'my-bucket'
        
        kind = 'User'
        target_dir = '20171121-0615'
        dataset = 'Datastore'
        
        request = {
            "configuration": {
                "load": {
                    "sourceUris": [
                        BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)
                    ],
                    "sourceFormat": "DATASTORE_BACKUP",
                    "destinationTable": {
                        "datasetId": 'Datastore',
                        "projectId": app_id,
                        "tableId": kind
                    },
                    "writeDisposition": "WRITE_TRUNCATE"
                }
            }
        }
        url = 'https://www.googleapis.com/bigquery/v2/projects/{}/jobs'.format(app_id)
        
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        try:
            result = urlfetch.fetch(
                url=url,
                payload=json.dumps(body),
                method=urlfetch.POST,
                deadline=60,
                headers=headers)
            if result.status_code == httplib.OK:
                logging.info(result.content)
            elif result.status_code >= 500:
                logging.error(result.content)
            else:
                logging.warning(result.content)
            return result.status_code
        except urlfetch.Error:
            logging.exception('Failed to initiate export.')
            return httplib.INTERNAL_SERVER_ERROR

メリット

  • AppEngineの場合はaccess_tokenが手軽に取得できるから、普通のREST APIリクエストとしてかける
  • 外部パッケージが不要なので動作確認が最低限で済む。

デメリット

  • リクエスト内容を自分でちゃんと書かないとダメ→ミス・バグが起こりやすい
  • 記述量は多め

google-api-python-client

GoogleのREST API向けにpython用のクライアントライブラリが用意されているので、こちらを使う方法もいくつか見かけました。
google-api-python-client

ただ、このライブラリはメンテナンスモードになってしまっているので、あまりお勧めはしません。
あと、あまり記述量が減ったりはしない気がします。

import logging

import httplib2
import webapp2
from google.appengine.api import app_identity
from googleapiclient import discovery, errors
from oauth2client.client import GoogleCredentials

BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"


class ImportBackup(webapp2.RequestHandler):
    def get(self):
        credentials = GoogleCredentials.get_application_default()
        app_id = app_identity.get_application_id()
        bucket = 'my-bucket'

        kind = 'User'
        target_dir = '20171121-0615'
        dataset = 'Datastore'

        service = discovery.build('bigquery', 'v2', credentials=credentials)
        request = {
            "configuration": {
                "load": {
                    "sourceUris": [
                        BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)
                    ],
                    "sourceFormat": "DATASTORE_BACKUP",
                    "destinationTable": {
                        "datasetId": dataset,
                        "projectId": app_id,
                        "tableId": kind
                    },
                    "writeDisposition": "WRITE_TRUNCATE"
                }
            }
        }
        try:
            service.jobs().insert(projectId=app_id, body=request).execute()
            self.response.status_int = 200
        except errors.HttpError as e:
            logging.error(e.content)
            self.response.status_int = e.resp.status
        except httplib2.HttpLib2Error:
            logging.exception('Failed to initiate export.')
            self.response.status_int = 500

ちなみにgoogleのpubsubサンプルである、cloud-pubsub-samples-pythonでもこの方法を見かけたので、動作の信頼性はたかそうです。

メリット

  • REST APIの書き方をラップしてくれる
  • 公式ラッパーなので信頼性が高い?

デメリット

  • 外部パッケージのインストールが必要
  • メンテナンスモードのライブラリで不安
  • リクエスト内容は記述する必要がある

外部パッケージのインストール方法は以下を参考にしてください
Using third-party libraries

BigQuery-Python

google-api-python-clientがイケてないということで、このclientをさらにラップしたライブラリを出しています。
BigQuery-Python

12月からSENSYに入社してくれたitkrさんの記事が参考になります!
PythonからBigQueryを操作するときは BigQuery-Python が便利だった

import logging

import webapp2
from bigquery import get_client, JOB_SOURCE_FORMAT_DATASTORE_BACKUP, JOB_WRITE_TRUNCATE, BIGQUERY_SCOPE
from bigquery.errors import JobInsertException
from google.appengine.api import app_identity
from oauth2client.client import GoogleCredentials

BASE_SOURCE_URI = "gs://{bucket}/{target_dir}/all_namespaces/kind_{kind}/all_namespaces_kind_{kind}.export_metadata"


class ImportBackup(webapp2.RequestHandler):
    def get(self):
        credentials = GoogleCredentials.get_application_default()
        if credentials.create_scoped_required():
            credentials = credentials.create_scoped(BIGQUERY_SCOPE)
        app_id = app_identity.get_application_id()
        bucket = 'my-bucket'

        kind = 'User'
        target_dir = '20171206-1800'
        dataset = 'Datastore'

        client = get_client(app_id, credentials=credentials)
        try:
            client.import_data_from_uris(
                source_uris=[BASE_SOURCE_URI.format(bucket=bucket, target_dir=target_dir, kind=kind)],
                write_disposition=JOB_WRITE_TRUNCATE,
                dataset=dataset,
                table=kind,
                source_format=JOB_SOURCE_FORMAT_DATASTORE_BACKUP
            )
            self.response.status_int = 200
        except JobInsertException as e:
            logging.error(e.message)
            self.response.status_int = 500

メリット

  • 簡単にかける

デメリット

  • 外部パッケージのインストールが必要
  • 非公式ラッパー

外部パッケージもgoogle-api-python-clientに追加でいくつか程度なので、あまり変わらないです。
REST APIの仕様変更に追従するのが難しいかとも思いましたが、Breakingな変更だったら結局書き直しが発生するので、使いやすいラッパーを使うメリットは大きいかなと思います。

どれがいい?

三つも方法を書きましたが、結局REST APIのリクエストなので特にどれでもいいですね、、
AppEngine/SEからでも意外とスムーズにリクエストができたので、いい感じですね。

現在プロジェクトでは、google-api-clientは使いづらいし、
BigQueryをあまり使わないのに追加でライブラリをインストールするのも抵抗があり、
URLFetchでの実装をしていますが、やはりおすすめはBigQuery-Pythonでしょうか。

好みや要件で選ぶといいと思います。

まとめ

今回は3連続で記事を書いて、DatastoreからBigQueryへのインポートを自動化してみました。
きっかけはds2bqというgoでのサンプルに感化されてpythonでもやってみよう!ということでした。
python版サンプルは、もう少し整理して、公開しようと思っています。

また、今回の記事は違いますが、前の二つはどうせならBeta機能を試そう!ということで、まだベータが外れないうちに試しています。
既存の方法よりも洗練されていて使いやすいなぁ、という印象です。

注意点

今回のBigQueryへのロード依頼ですが、
GCSの変更をPub/Sub経由でAppEngineに通知する」で受け取ったHandlerで直にREST APIにアクセスすると、
対象のTableやkindが複数あるケースなどで結構時間がかかってしまうので、

  • pubsubのsubscriptionの時間制限を伸ばす(デフォルト10秒->30秒など?)
  • 今回のBigQueryへのリクエストをTaskQueue内で行う

などの工夫をした方が良いと思います。

8
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?