DataflowのサンプルをベースにBigQueryからデータを読み出し、加工してBigQueryとGCSの両方へ書き出すことを試したときのメモ。
#実行環境
GCP CloudShellを利用。
pipはCloud Shellではデフォルトでインストールされてるのでスキップ(upgradeは必要だったので実施)し、cloud-dataflow SDKをインストールする。
$ sudo pip install --upgrade pip
$ sudo pip install google-cloud-dataflow
/usr/local/lib/python2.7/dist-packages/apache_beam/examples配下にいくつかのサンプルコードがあり、その中からbigquery_tornadoes.pyをベースに試す。
#実行
$ export PROJECT={PJ名}
$ export BUCKET={gs://バケット名}
このワークフローはmonthとtornadoフィールドをテーブルから取得し、各月のtornadoの数を出力する。
以下のコマンドを実行し、ワークフローを実行する。stagingとtempロケーションを指定する必要がある。
$ python -m test --project $PROJECT --job_name $PROJECT-test --runner BlockingDataflowPipelineRunner --staging_location $BUCKET/staging --temp_location $BUCKET/temp
正常に終了すると下図のようなワークフローをGCP WebConsleから確認することができる。
#コードの確認
以下のパッケージをimportする。
#__future__モジュールはPython3系に実装されているPython2系と互換性の無い機能をPython2系で使用できるようにする。相対インポートではなく、絶対インポート優先にする。
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam ※apache_beam(dataflow)のインポート
コマンドライン引数として"--input"を定義し、デフォルトで'clouddataflow-readonly:samples.weather_stations'を読み込む。
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
default='clouddataflow-readonly:samples.weather_stations',
help=('Input BigQuery table to process specified as: '
'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
known_argsには--inputや--outputを指定したときのパラメターが入るが、今回は指定しないようにしたので何も入らない。piepeline_argsにはpipelineに関するパラメータ(input,output以外のパラメータ)が入る。
known_args, pipeline_args = parser.parse_known_args(argv)
パイプラインを作成する。runnerはコマンドライン引数で設定するようにしている。もし直接設定するなら"beam.Pipeline('DirectPipelineRunner')みたいに書けばよい。pはapache_beam.pipeline.Pipelineオブジェクト。
DirectPipelineRunner以外に, ”BlockingDataflowPipelineRunner”、 ”DataflowPipelineRunner”が指定できる。
p = beam.Pipeline(argv=pipeline_args)
BigQueryからデータを読み込んで、PCollectionにセットする。pipeオペレータ(|)を使用してtransformを適用する。記述方法は以下のとおり。
<PCollection_name> | <transform_name>
beam.io.BigQuerySourceの戻りは
PCollection[read.None]
beam.io.Readメソッドの戻りは
<Read(PTransform) label=[Read]>オブジェクト
rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
count_tornadesをコールする。
counts = count_tornadoes(rows)
count_tornadesはmonthとtornadoをキーとした情報のPCollection型のinput_dataを入力とし、monthとtornado_countをキーとしたPCollectionを返す。
beam.FlatMapで各行をwordに分割。
beam.CombinePerKeyはPCollectionがkeyによってグループ化し、平均や合計などの演算を行う。
beam.Mapで各要素をPCollectionにMapする。
def count_tornadoes(input_data):
return (input_data
| 'months with tornatoes' >> beam.FlatMap(
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
| 'monthly count' >> beam.CombinePerKey(sum)
| 'format' >> beam.Map(
lambda (k, v): {'month': k, 'tornado_count': v}))
outputはソースに直接埋めて、コマンドライン引数では受け取らないように変更。
BigQueryへの書き出しはbeam.io.BigQuerySinkを用いる。
counts | beam.io.Write(
'writeBQ',
beam.io.BigQuerySink(
'PJ名:データセット名.テーブル名',
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
CloudStorageへの書き出しはbeam.io.TextFileSinkを利用。デフォルトでは複数のファイルが生成されるので、num_shards=1を指定して1ファイルのみ生成するようにする。
counts | 'writeGCS' >> beam.io.Write(beam.io.TextFileSink('gs://XXX/count.txt',num_shards=1))
参考URL
https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python
https://cloud.google.com/dataflow/model/pipelines
https://cloud.google.com/dataflow/model/pcollection
https://cloud.google.com/dataflow/service/dataflow-service-desc
https://cloud.google.com/dataflow/examples/wordcount-example
https://cloud.google.com/dataflow/examples/wordcount-example
https://cloud.google.com/dataflow/model/text-io
https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py