14
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Google Cloud Platform(2)Advent Calendar 2016

Day 4

Cloud DataflowでBigQueryから取得したデータを加工してS3に出力する

Last updated at Posted at 2016-12-03

はじめに

この記事は、Google Cloud Platform(2) Advent Calendar 2016 4日目の記事です。
DataflowからS3へファイルを出力する方法について紹介します。

概要

Cloud DataflowではGoogle Cloud StorageやBigQuery、Cloud PubSubなどに対してI/Oを行うライブラリが提供されているが、S3などAWS系のI/Oは現在のところ提供されていない。
そこで今回はS3へ出力するCustom Sink(S3Sink)の作成を行い、Dataflowを使ってBigQueryから取得したデータを加工して、その結果をS3へ出力するパイプラインを作成した。

今回作成するパイプラインの概要は下記の表のとおりである。

デザイン 備考
入力データ BigQuery (マスタテーブルとマッピングテーブルの2つ)
出力データ S3
ファイルフォーマット csv
変換 idの中でマッピングテーブルに該当するidを対応する値に変換する
開発環境 Dataflow Python SDK
スクリーンショット 2016-12-01 21.30.45.png

S3Sinkの作成

S3Sinkは以下のような感じで作成した。

  • FileSinkを継承 
  • initialize_write: GCSの一時ディレクトリを設定
  • write_record: FileSinkのwrite_recordをコール
  • write_encode_record: レコードを書き込み、改行を入れる
  • open_writer:GCSにファイルを出力
  • finalize_writeでGCSからファイルを取得し、一つのファイルにマージする。botoを使用してS3にマージしたファイルをアップロード

open_writerでS3に書き込んでいたが処理速度があまりに遅かったため、一度GCSに書き込んでからマージして、finalize_writeでアップロードするようにした。

カスタムシンクを利用してDataflowを実行するのに必要な準備

以下のようなフォルダ構造でs3sink.pyを配置し、"python setup.py sdist"を実行して、s3sink.pyをパッケージングする。dataflowを実行するとき--extra_packageオプションでパッケージファイルを指定する。

s3_custom_lib
    |-dist
    |-s3package
         |-__init__.py
         |-s3sink.py
    |-setup.py

Dataflow

1.BigQueryに対してクエリをかけて、その結果をPCollectionで受け取る

rows = p | 'readBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=custom_options.query))

2.Mappging用のテーブルに対してもクエリをかけてデータを取得する

mapping = p | 'readBigQuery(MappingTable)' >> beam.io.Read(beam.io.BigQuerySource(query=custom_options.mappingquery))

3.CoGroupByKeyで2つのテーブルをJoinする。BigQueryでクエリをかけた結果はdictなので、tupleに変換してからCoGroupByKeyにかける。

join_data = {'rows': convert_tuple_main(rows), 'mapping': convert_tuple_mapping(mapping)} | beam.CoGroupByKey()

4.ParDoでIDを変換する。
この段階では各要素は以下のような形式になっている。

{"id xxx",{{"row":""},{"mapping":"mapped_id xxx"}}}

mapped_idにある文字列が入ってるIDだけ変換する処理をDoFnで記載する。

result = join_data | 'Replace ID' >> beam.ParDo(IdReplaceFn())

5.処理結果をS3にアップロードする

awscredentialfile = gcsio.GcsIO().open(filename=segment_options.awscredential)
        awscredential = awscredentialfile.read()
        result | 'writeS3' >> beam.io.Write(s3sink.S3Sink(awscredential, custom_options))

6.パイプラインを実行する

python run_dataflow --project {PJ_NAME} --job_name {JOB_NAME} --runner BlockingDataflowPipelineRunner --staging_location gs://xxx/stagin g --temp_location gs://xxx/temp --extra_package s3_custom_lib/dist/s3package-1.0.tar.gz

実行するとGCPのWebコンソールで以下のような図が出力されることを確認できる。

スクリーンショット 2016-12-01 21.32.56.png

360万件のデータを処理するのに16分程度要した。処理中3vcpuしか使用されてなかったので、もう少し検討が必要。

#最後に
Dataflow のPython SDKはβ版なので現段階ではストリーミング処理機能が実装されてないなど、Prodocutionで利用するには推奨されてませんので、注意が必要です。
明日12/5はsoundTrickerによるGAEに関する発表です。

14
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?