はじめに
この記事は、Google Cloud Platform(2) Advent Calendar 2016 4日目の記事です。
DataflowからS3へファイルを出力する方法について紹介します。
概要
Cloud DataflowではGoogle Cloud StorageやBigQuery、Cloud PubSubなどに対してI/Oを行うライブラリが提供されているが、S3などAWS系のI/Oは現在のところ提供されていない。
そこで今回はS3へ出力するCustom Sink(S3Sink)の作成を行い、Dataflowを使ってBigQueryから取得したデータを加工して、その結果をS3へ出力するパイプラインを作成した。
今回作成するパイプラインの概要は下記の表のとおりである。
デザイン | 備考 |
---|---|
入力データ | BigQuery (マスタテーブルとマッピングテーブルの2つ) |
出力データ | S3 |
ファイルフォーマット | csv |
変換 | idの中でマッピングテーブルに該当するidを対応する値に変換する |
開発環境 | Dataflow Python SDK |
S3Sinkの作成
S3Sinkは以下のような感じで作成した。
- FileSinkを継承
- initialize_write: GCSの一時ディレクトリを設定
- write_record: FileSinkのwrite_recordをコール
- write_encode_record: レコードを書き込み、改行を入れる
- open_writer:GCSにファイルを出力
- finalize_writeでGCSからファイルを取得し、一つのファイルにマージする。botoを使用してS3にマージしたファイルをアップロード
open_writerでS3に書き込んでいたが処理速度があまりに遅かったため、一度GCSに書き込んでからマージして、finalize_writeでアップロードするようにした。
カスタムシンクを利用してDataflowを実行するのに必要な準備
以下のようなフォルダ構造でs3sink.pyを配置し、"python setup.py sdist"を実行して、s3sink.pyをパッケージングする。dataflowを実行するとき--extra_packageオプションでパッケージファイルを指定する。
s3_custom_lib
|-dist
|-s3package
|-__init__.py
|-s3sink.py
|-setup.py
Dataflow
1.BigQueryに対してクエリをかけて、その結果をPCollectionで受け取る
rows = p | 'readBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=custom_options.query))
2.Mappging用のテーブルに対してもクエリをかけてデータを取得する
mapping = p | 'readBigQuery(MappingTable)' >> beam.io.Read(beam.io.BigQuerySource(query=custom_options.mappingquery))
3.CoGroupByKeyで2つのテーブルをJoinする。BigQueryでクエリをかけた結果はdictなので、tupleに変換してからCoGroupByKeyにかける。
join_data = {'rows': convert_tuple_main(rows), 'mapping': convert_tuple_mapping(mapping)} | beam.CoGroupByKey()
4.ParDoでIDを変換する。
この段階では各要素は以下のような形式になっている。
{"id xxx",{{"row":""},{"mapping":"mapped_id xxx"}}}
mapped_idにある文字列が入ってるIDだけ変換する処理をDoFnで記載する。
result = join_data | 'Replace ID' >> beam.ParDo(IdReplaceFn())
5.処理結果をS3にアップロードする
awscredentialfile = gcsio.GcsIO().open(filename=segment_options.awscredential)
awscredential = awscredentialfile.read()
result | 'writeS3' >> beam.io.Write(s3sink.S3Sink(awscredential, custom_options))
6.パイプラインを実行する
python run_dataflow --project {PJ_NAME} --job_name {JOB_NAME} --runner BlockingDataflowPipelineRunner --staging_location gs://xxx/stagin g --temp_location gs://xxx/temp --extra_package s3_custom_lib/dist/s3package-1.0.tar.gz
実行するとGCPのWebコンソールで以下のような図が出力されることを確認できる。
360万件のデータを処理するのに16分程度要した。処理中3vcpuしか使用されてなかったので、もう少し検討が必要。
#最後に
Dataflow のPython SDKはβ版なので現段階ではストリーミング処理機能が実装されてないなど、Prodocutionで利用するには推奨されてませんので、注意が必要です。
明日12/5はsoundTrickerによるGAEに関する発表です。