3
0

More than 3 years have passed since last update.

Cloud DataflowでBigQueryにストリーミングインサートする時のちょっとした注意

Last updated at Posted at 2020-08-28

問題

Dataflowで、Build-in I/Oのライブラリを使って、BigQueryにストリーミングインサートでデータを挿入しようとすると思わぬエラーに見舞われた。

こういうコード

pubsub2bigquery.py
import argparse
import logging
import json

from past.builtins import unicode

import apache_beam as beam

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class JSONStringToDictFn(beam.DoFn):
    def process(self, element):
        items = json.loads(element)

        yield items

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
      '--input_subscription',
      required=True,
      help=(
          'Input PubSub subscription '
          '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    parser.add_argument(
      '--output_dataset',
      required=True,
      help=(
          'Output BigQuery dataset '
          '"<PROJECT>.<DATASET>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:

        subscription = known_args.input_subscription
        rides = (
            p
            | 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
            | 'ToDict' >> beam.ParDo(JSONStringToDictFn())
        )

        (bigquery_project, dataset) = known_args.output_dataset.split('.')
        rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
                                                             dataset=dataset,
                                                             project=bigquery_project)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

こういうプログラムを以下のように実行するとエラーが発生。

python pubsub2bigquery.py \
--project <PROJECT ID> \
--region='us-central1' \
--runner DataflowRunner \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
--input_subscription <SUBSCRIPTION> \
--output_dataset <DATASET ID> \
--temp_location=<GCP PATH for temp> \
--staging_location=<GCP PATH for staging>

手元の環境では、自動スケーリングの以下のオプションを抜くと、つまりワーカー1で実行するとエラーは発生せず。

--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \

エラー

全文は抜粋しませんが、以下のような内容を含んだエラーがログに出ます。

"/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse http_response, method_config=method_config, request=request) RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing https://bigquery.googleapis.com/bigquery/v2/projects/shua-gcp-book/datasets/nyc_taxi_trip/tables/realtime_rides?alt=json: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 28 Aug 2020 03:40:42 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 403, "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors", "errors": [ { "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors", "domain": "usageLimits", "reason": "rateLimitExceeded" } ], "status": "PERMISSION_DENIED" } } > [while running 'generatedPtransform-121843']

下の方に "code": 403 とか "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors" とか "reason": "rateLimitExceeded" とかあるので、APIコールの上限に引っかかったようです。

うーん、ストリーミングインサートで書き込みすぎか。。。
と思ったのですが、公式ドキュメントの以下の通り、クォータはないはず。

Screen Shot 2020-08-28 at 14.34.05.png

もうちょっとよくログをみてみると。。。

"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1126, in process bigquery_tools.parse_table_reference(destination), schema) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1112, in _create_table_if_needed additional_create_parameters=self.additional_bq_parameters)
(省略)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 534, in get_table response = self.client.tables.Get(request)

_create_table_if_needed? get_table? どうやらテーブル情報の取得次のAPI操作引っかかっている模様。BigQueryに書き込みするところの apache_beam.io.gcp.bigquery.WriteToBigQuerycreate_disposition パラメータがあってこれがデフォルトで CREATE_IF_NEEDED になっているようで、おそらくストリーミングインサート実行時にテーブルの存在確認のために参照している模様。

テーブルは基本あるし、こんな動きは不要なのでこのパラメータを以下のように CREATE_NEVER にしてしまう。

fixed.py
        rides | 'Write rides to BigQuery' >> WriteToBigQuery('realtime_rides',
                                                             dataset=dataset,
                                                             project=bigquery_project,
                                                             create_disposition=BigQueryDisposition.CREATE_NEVER)

importも忘れずに。

fixed.py
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

これでエラーは出なくなりました!みなさまお気をつけください。

参考

Apache Beamのコミュニティにもバグチケットがきられていました。

ちなみに、Stackoverflowにも同じ悩みの方がいました。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0