・Storage→Cloud Functions→Dataflow→BigQuery
Cloud Functionsから直接BigQueryに入れることも出来るが、Cloud Functionsは今の所540秒以上かかると強制終了されてしまう。
また、データを加工したい場合とかにはDataflowに投げる方が良いと思う。
ここの記事を参考にするが、情報が少なくてどうやればいいのかがわからなくて、調べながらやる。
https://www.case-k.jp/entry/2019/10/08/233411
途中途中で確認しているのは、なかなかうまく行かなかったため、一つずつ確認しながら作っていたため*
#最初にバケットを作る
作業用のバケットと、トリガ用のinputバケットは分ける
(同じにするとテンプレートファイルが作成されるたびにキックされてしまうため)
$PROJECT/templateファイル
$PROJECT/tmpファイル
$PROJECT/stagingファイル
$PROJECT/dataファイル
$PROJECT-input/inputファイル
regional、us-central1で作成をする。
regionは、DataflowもStorageもすべてus-central1で揃える
#BigQueryのデータセットを作る
bq mk lake
#dataflowを作る
参考元のgithubを参考にして、テンプレートファイルを作成する
作成したプログラムはこちら
Pythonあまり良くわかってなくって、サンプルをできるだけ流用しているため、不要なものもあるかもしれない。
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# プロジェクトID
PROJECTID = '' #PROJECT_IDを入れてください
# オプション設定
class MyOptions(PipelineOptions):
PROJECTID = '' #PROJECT_IDを入れてください
@classmethod
def _add_argparse_args(cls, parser):
# 実行時に指定するパラメータ
parser.add_value_provider_argument('--input',
help='InputFile for the pipeline',
default='gs://{}/output/sample.csv'.format(PROJECTID))
parser.add_value_provider_argument('--output',
help='Output Bigquery for the pipeline',
default='lake.usa_names')
# オプション設定
myoptions = MyOptions()
options = beam.options.pipeline_options.PipelineOptions(options=myoptions)
# GCP関連オプション
gcloud_options = options.view_as(
beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'jobgcstogbq'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/temp'.format(PROJECTID)
# テンプレート配置
#gcloud_options.template_location = 'gs://{}/template/dataflow_gcs_to_bq'.format(PROJECTID)
# 標準オプション(実行環境を設定)
std_options = options.view_as(
beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'
table_spec = 'lake.table_gcs_to_gbq'
table_schema= 'word:STRING,word_count:INTEGER'
class Split(beam.DoFn):
def process(self, element):
word, word_count = element.split(",")
return [{
'word': word,
'word_count': int(word_count),
}]
p = beam.Pipeline(options=options)
(p | 'read from gcs' >> beam.io.ReadFromText(myoptions.input, skip_header_lines=1)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(lambda s: f'{PROJECTID}:{myoptions.output.get()}', schema=table_schema)
)
p.run()
入力ファイルはこんな感じで作る
word,count
aaa,1
bbb,2
ccc,65535
ddd,256
このファイルを gs://$PROJECT-input/
に置く
gsutil cp sample.csv gs://$PROJECT-input
#CloudDataflowで実行してみる
※python環境が設定されていない場合はこちらを参考にして構築する
Dataflowで実行して、動くことを確認する
python dataflow_gcs_to_bq.py --project=$PROJECT --region=us-central1 --
runner=DataflowRunner --staging_location=gs://$PROJECT/staging --temp_location gs://$PROJECT/temp --input gs://$PROJECT-input/sample.csv --save_main_session
これでDataflowは動くが、WARNINGが出る。回避方法がわからず…知ってる人がいましたらコメントしていただければ幸いです。
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'options': <__main__.MyOptions object at 0x7f2d09d19e48>}
WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'options': <__main__.MyOptions object at 0x7f2d09d19e48>}
#Dataflowにカスタムテンプレートを作成する
参考元
https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
metadataファイルを作る
{
"name": "CustomTemplateGCStoBQ",
"description": "hogehoge",
"parameters": [{
"name": "input",
"label": "Input Cloud Storage File(s)",
"help_text": "Path of the file pattern glob to read from.",
"regexes": ["^gs:\/\/[^\n\r]+$"]
},
{
"name": "output",
"label": "Output Bigquery Prefix",
"help_text": "table prefix. ex: lake.usa_name",
"regexes": ["[^\n\r]+$"]
}]
}
templateを入れるCloudStoregeの場所に保存する。
gsutil cp dataflow_gcs_to_gcs_metadata gs://$PROJECT/template
ステージングを作る
Googleのドキュメントだとregionが指定されていない。気になったので追加する。
python dataflow_gcs_to_bq.py --runner DataflowRunner --project $PROJECT --region=us-central1 --staging_location gs://$PROJECT/staging --t
emp_location gs://$PROJECT/temp --template_location gs://$PROJECT/template/dataflow_gcs_to_bq
ここを参考にして、GCPコンソールから実行してみて、実行できることを確認する。
https://cloud.google.com/dataflow/docs/guides/templates/running-templates#custom-templates
#Cloud Functionsを作成する
トリガーはStorageにファイルが置かれた場合にする。
イベントタイプファイナライズ / 作成
バケット $PROJECT-input
関数は失敗しても自動的に再試行しない
プログラムはここを参考に作るが、credentialsがないのとzoneが指定されていないのが気になるので、追加する。
https://github.com/case-k-git/gcp/blob/master/functions/functions_dataflow_job_start.py
完成したソースはこちら
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
def _dataflow_job_start(data, context):
# read from gcs
PROJECTID = "" #PROJECT IDを入れてください
file_name = data['name']
job = 'job from cloud functions'
template = "gs://{}/template/dataflow_gcs_to_bq".format(PROJECTID)
parameters = {
'input': "gs://{}-input/{}".format(PROJECTID, file_name),
'output': "lake.table_gcs_to_gbq"
}
credentials = GoogleCredentials.get_application_default()
service = build("dataflow","v1b3",credentials=credentials, cache_discovery=False)
#templates = service.projects().templates()
request = service.projects().templates().launch(
projectId=PROJECTID,
gcsPath=template,
body={
'jobName': job,
'parameters': parameters,
'environment': {
'tempLocation': 'gs://{}/temp'.format(PROJECTID),
'zone': 'us-central1-a'
}
}
)
return request.execute()
google-api-python-client==1.7.8
oauth2client==4.1.3
endpointは _dataflow_job_start
を指定する。
#サービスアカウントを設定する
実行するサービスアカウントにはdataflow.adminを指定する。指定しないと権限エラーで実行できない。
#ファイルを置いて、実行できることを確認する。
gsutil cp sample.csv gs://$PROJECT-input
#※うまく行かない場合
##JSONをどう指定すればいいのかわからない場合
以下のURLからAPIで試してみることが出来ます。
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch?hl=ja
##以下のエラーが出る場合
ImportError: file_cache is unavailable when using oauth2client >= 4.0.0 or google-auth
このエラーが出ても動くのですが、修正するには以下の記事を参考にしてください。
(ちなみに対応済)
https://qiita.com/kai_kou/items/4b754c61ac225daa0f7d
#課題1
実は、outputが使われていない。本当は引数でもらった内容でやりたいがうまくいかない。
分かる人がいましたらコメントをしていただけたらと思います。
解決しました! @hidecheck さん、コメントありがとうございます。
#課題2
連続してファイルが置くようなシステムの場合、終わるまでalreadyが出て実行できないので、実行するjob名はユニークする必要がある。
#課題3
CloudStorageから必ずイベントが発火することを前提としているが、もしかすると、発火しない場合があるかもしれない。回避するにはschedulerで何分かおきに巡回的にみたりとしないといけないが、それでは最初からスケジューラーでやれば良くて、本末転倒な気もする。
#所感
CloudStorageにファイルが置かれたらDataflowを実行するのはよくあるケースな気がするが、この方法での記事が少ない。なぜだろう?もしかすると、他の方法が良いのかもしれない。