LoginSignup
11
11

More than 5 years have passed since last update.

Cloud Dataflow PythonSDKでBigQueryからデータを読み出し、加工してBigQueryとGCSへ書き出す

Posted at

DataflowのサンプルをベースにBigQueryからデータを読み出し、加工してBigQueryとGCSの両方へ書き出すことを試したときのメモ。

実行環境

GCP CloudShellを利用。
pipはCloud Shellではデフォルトでインストールされてるのでスキップ(upgradeは必要だったので実施)し、cloud-dataflow SDKをインストールする。

$ sudo pip install --upgrade pip
$ sudo pip install google-cloud-dataflow

/usr/local/lib/python2.7/dist-packages/apache_beam/examples配下にいくつかのサンプルコードがあり、その中からbigquery_tornadoes.pyをベースに試す。

実行

$ export PROJECT={PJ名}
$ export BUCKET={gs://バケット名}

このワークフローはmonthとtornadoフィールドをテーブルから取得し、各月のtornadoの数を出力する。
以下のコマンドを実行し、ワークフローを実行する。stagingとtempロケーションを指定する必要がある。

$ python -m test --project $PROJECT --job_name $PROJECT-test --runner BlockingDataflowPipelineRunner --staging_location $BUCKET/staging --temp_location $BUCKET/temp

正常に終了すると下図のようなワークフローをGCP WebConsleから確認することができる。

スクリーンショット 2016-09-28 8.40.20.png

スクリーンショット 2016-09-23 9.01.06.png

コードの確認

以下のパッケージをimportする。

#__future__モジュールはPython3系に実装されているPython2系と互換性の無い機能をPython2系で使用できるようにする。相対インポートではなく、絶対インポート優先にする。
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam ※apache_beam(dataflow)のインポート

コマンドライン引数として"--input"を定義し、デフォルトで'clouddataflow-readonly:samples.weather_stations'を読み込む。

def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      default='clouddataflow-readonly:samples.weather_stations',
                      help=('Input BigQuery table to process specified as: '
                            'PROJECT:DATASET.TABLE or DATASET.TABLE.'))

known_argsには--inputや--outputを指定したときのパラメターが入るが、今回は指定しないようにしたので何も入らない。piepeline_argsにはpipelineに関するパラメータ(input,output以外のパラメータ)が入る。

  known_args, pipeline_args = parser.parse_known_args(argv)

パイプラインを作成する。runnerはコマンドライン引数で設定するようにしている。もし直接設定するなら"beam.Pipeline('DirectPipelineRunner')みたいに書けばよい。pはapache_beam.pipeline.Pipelineオブジェクト。
DirectPipelineRunner以外に, ”BlockingDataflowPipelineRunner”、 ”DataflowPipelineRunner”が指定できる。

  p = beam.Pipeline(argv=pipeline_args)

BigQueryからデータを読み込んで、PCollectionにセットする。pipeオペレータ(|)を使用してtransformを適用する。記述方法は以下のとおり。

<PCollection_name> | <transform_name>

beam.io.BigQuerySourceの戻りは
PCollection[read.None]
beam.io.Readメソッドの戻りは
<Read(PTransform) label=[Read]>オブジェクト

rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))

count_tornadesをコールする。

counts = count_tornadoes(rows)

count_tornadesはmonthとtornadoをキーとした情報のPCollection型のinput_dataを入力とし、monthとtornado_countをキーとしたPCollectionを返す。
beam.FlatMapで各行をwordに分割。
beam.CombinePerKeyはPCollectionがkeyによってグループ化し、平均や合計などの演算を行う。
beam.Mapで各要素をPCollectionにMapする。

def count_tornadoes(input_data):
  return (input_data
          | 'months with tornatoes' >> beam.FlatMap(
              lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
          | 'monthly count' >> beam.CombinePerKey(sum)
          | 'format' >> beam.Map(
              lambda (k, v): {'month': k, 'tornado_count': v}))

outputはソースに直接埋めて、コマンドライン引数では受け取らないように変更。
BigQueryへの書き出しはbeam.io.BigQuerySinkを用いる。

counts | beam.io.Write(
      'writeBQ',
      beam.io.BigQuerySink(
          'PJ名:データセット名.テーブル名',
          schema='month:INTEGER, tornado_count:INTEGER',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

CloudStorageへの書き出しはbeam.io.TextFileSinkを利用。デフォルトでは複数のファイルが生成されるので、num_shards=1を指定して1ファイルのみ生成するようにする。

counts | 'writeGCS' >> beam.io.Write(beam.io.TextFileSink('gs://XXX/count.txt',num_shards=1))

参考URL
https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python
https://cloud.google.com/dataflow/model/pipelines
https://cloud.google.com/dataflow/model/pcollection
https://cloud.google.com/dataflow/service/dataflow-service-desc
https://cloud.google.com/dataflow/examples/wordcount-example
https://cloud.google.com/dataflow/examples/wordcount-example
https://cloud.google.com/dataflow/model/text-io
https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py

11
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
11