12
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Storageにファイルを入れるとDataflow起動しBigQueryにデータを入れる方法(Python)

Last updated at Posted at 2021-03-02

・Storage→Cloud Functions→Dataflow→BigQuery

Cloud Functionsから直接BigQueryに入れることも出来るが、Cloud Functionsは今の所540秒以上かかると強制終了されてしまう。
また、データを加工したい場合とかにはDataflowに投げる方が良いと思う。

ここの記事を参考にするが、情報が少なくてどうやればいいのかがわからなくて、調べながらやる。

https://www.case-k.jp/entry/2019/10/08/233411

途中途中で確認しているのは、なかなかうまく行かなかったため、一つずつ確認しながら作っていたため*

#最初にバケットを作る

作業用のバケットと、トリガ用のinputバケットは分ける
(同じにするとテンプレートファイルが作成されるたびにキックされてしまうため)

$PROJECT/templateファイル
$PROJECT/tmpファイル
$PROJECT/stagingファイル
$PROJECT/dataファイル
$PROJECT-input/inputファイル

regional、us-central1で作成をする。
regionは、DataflowもStorageもすべてus-central1で揃える

#BigQueryのデータセットを作る

bq mk lake

#dataflowを作る

参考元のgithubを参考にして、テンプレートファイルを作成する

case-k-git/gcp

作成したプログラムはこちら

Pythonあまり良くわかってなくって、サンプルをできるだけ流用しているため、不要なものもあるかもしれない。

dataflow_gcs_to_bq.py

#!/usr/bin/env python
# coding: utf-8

# In[ ]:


# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# プロジェクトID
PROJECTID = ''  #PROJECT_IDを入れてください

# オプション設定
class MyOptions(PipelineOptions):
    PROJECTID = ''  #PROJECT_IDを入れてください
    @classmethod  
    def _add_argparse_args(cls, parser):        
        # 実行時に指定するパラメータ
        parser.add_value_provider_argument('--input',
                            help='InputFile for the pipeline',
                            default='gs://{}/output/sample.csv'.format(PROJECTID))
        parser.add_value_provider_argument('--output',
                            help='Output Bigquery for the pipeline',
                            default='lake.usa_names')
            
# オプション設定
myoptions = MyOptions()
options = beam.options.pipeline_options.PipelineOptions(options=myoptions)

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'jobgcstogbq'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/temp'.format(PROJECTID)

# テンプレート配置
#gcloud_options.template_location = 'gs://{}/template/dataflow_gcs_to_bq'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

table_spec = 'lake.table_gcs_to_gbq'
table_schema= 'word:STRING,word_count:INTEGER'

class Split(beam.DoFn):

    def process(self, element):
        word, word_count = element.split(",")

        return [{
            'word': word,
            'word_count': int(word_count),
        }]

p = beam.Pipeline(options=options)
(p | 'read from gcs' >> beam.io.ReadFromText(myoptions.input, skip_header_lines=1)
  | 'ParseCSV' >> beam.ParDo(Split())
  | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(lambda s: f'{PROJECTID}:{myoptions.output.get()}', schema=table_schema)
 )
p.run()

入力ファイルはこんな感じで作る

sample.csv
word,count
aaa,1
bbb,2
ccc,65535
ddd,256

このファイルを gs://$PROJECT-input/ に置く

gsutil cp sample.csv gs://$PROJECT-input

#CloudDataflowで実行してみる

※python環境が設定されていない場合はこちらを参考にして構築する

Python環境の設定

Dataflowで実行して、動くことを確認する

python dataflow_gcs_to_bq.py --project=$PROJECT --region=us-central1 --
runner=DataflowRunner --staging_location=gs://$PROJECT/staging --temp_location gs://$PROJECT/temp --input gs://$PROJECT-input/sample.csv --save_main_session

これでDataflowは動くが、WARNINGが出る。回避方法がわからず…知ってる人がいましたらコメントしていただければ幸いです。


WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'options': <__main__.MyOptions object at 0x7f2d09d19e48>}
WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'options': <__main__.MyOptions object at 0x7f2d09d19e48>}

#Dataflowにカスタムテンプレートを作成する

参考元

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

metadataファイルを作る

dataflow_gcs_to_gcs_metadata
{
  "name": "CustomTemplateGCStoBQ",
  "description": "hogehoge",
  "parameters": [{
    "name": "input",
    "label": "Input Cloud Storage File(s)",
    "help_text": "Path of the file pattern glob to read from.",
    "regexes": ["^gs:\/\/[^\n\r]+$"]
  },
  {
    "name": "output",
    "label": "Output Bigquery  Prefix",
    "help_text": "table prefix. ex: lake.usa_name",
    "regexes": ["[^\n\r]+$"]
  }]
}

templateを入れるCloudStoregeの場所に保存する。

gsutil cp dataflow_gcs_to_gcs_metadata gs://$PROJECT/template

ステージングを作る

Googleのドキュメントだとregionが指定されていない。気になったので追加する。

python dataflow_gcs_to_bq.py --runner DataflowRunner --project $PROJECT --region=us-central1 --staging_location gs://$PROJECT/staging --t
emp_location gs://$PROJECT/temp --template_location gs://$PROJECT/template/dataflow_gcs_to_bq

ここを参考にして、GCPコンソールから実行してみて、実行できることを確認する。

https://cloud.google.com/dataflow/docs/guides/templates/running-templates#custom-templates

#Cloud Functionsを作成する

トリガーはStorageにファイルが置かれた場合にする。

イベントタイプファイナライズ / 作成
バケット $PROJECT-input
関数は失敗しても自動的に再試行しない

プログラムはここを参考に作るが、credentialsがないのとzoneが指定されていないのが気になるので、追加する。

https://github.com/case-k-git/gcp/blob/master/functions/functions_dataflow_job_start.py

完成したソースはこちら

main.py

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def _dataflow_job_start(data, context):
    # read from gcs
    PROJECTID = "" #PROJECT IDを入れてください
    file_name = data['name']

    job = 'job from cloud functions'
    template = "gs://{}/template/dataflow_gcs_to_bq".format(PROJECTID)
    parameters = {
         'input': "gs://{}-input/{}".format(PROJECTID, file_name),
         'output': "lake.table_gcs_to_gbq"
     }

    
    credentials = GoogleCredentials.get_application_default()
    service  = build("dataflow","v1b3",credentials=credentials, cache_discovery=False)
    #templates = service.projects().templates()
        
    request = service.projects().templates().launch(
        projectId=PROJECTID,
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
            'environment': {
                'tempLocation': 'gs://{}/temp'.format(PROJECTID),
                'zone': 'us-central1-a'
            }
        }
    )
    return request.execute()
requirements.txt
google-api-python-client==1.7.8
oauth2client==4.1.3

endpointは _dataflow_job_start を指定する。

#サービスアカウントを設定する
実行するサービスアカウントにはdataflow.adminを指定する。指定しないと権限エラーで実行できない。

#ファイルを置いて、実行できることを確認する。


gsutil cp sample.csv gs://$PROJECT-input

#※うまく行かない場合

##JSONをどう指定すればいいのかわからない場合

以下のURLからAPIで試してみることが出来ます。

https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch?hl=ja

##以下のエラーが出る場合

ImportError: file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth

このエラーが出ても動くのですが、修正するには以下の記事を参考にしてください。
(ちなみに対応済)
https://qiita.com/kai_kou/items/4b754c61ac225daa0f7d

#課題1
実は、outputが使われていない。本当は引数でもらった内容でやりたいがうまくいかない。
分かる人がいましたらコメントをしていただけたらと思います。

解決しました! @hidecheck さん、コメントありがとうございます。

#課題2
連続してファイルが置くようなシステムの場合、終わるまでalreadyが出て実行できないので、実行するjob名はユニークする必要がある。

#課題3
CloudStorageから必ずイベントが発火することを前提としているが、もしかすると、発火しない場合があるかもしれない。回避するにはschedulerで何分かおきに巡回的にみたりとしないといけないが、それでは最初からスケジューラーでやれば良くて、本末転倒な気もする。

#所感

CloudStorageにファイルが置かれたらDataflowを実行するのはよくあるケースな気がするが、この方法での記事が少ない。なぜだろう?もしかすると、他の方法が良いのかもしれない。

12
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?