LoginSignup
3
4

More than 3 years have passed since last update.

Dataflowメモ ParDo編

Last updated at Posted at 2020-09-01

元ネタ

Dataflow とは

こちらをご覧ください。

ParDo とは

ParDoは、並列にTranformするための汎用的な処理関数です。

ParDoは、データの塊であるPCollectionが入力された時にその中の要素一つ一つに対して、ユーザが実装した任意の処理をかけ、その処理結果を出力のPCollectionにします。

使い方は、こんな感じです。

ParDo1.py
class SplitWords(beam.DoFn):

    def process(self, element):
        return [element.split()]

with beam.Pipeline() as p:
    len = (
        p
        | 'Create Data' >> beam.Create(['Cloud Dataflow is a distributed parallel processing service.',
                                       'BigQuery is a powerful Data WareHouse.',
                                       'Cloud Pub/Sub is a scalable messaging service.'])
        | 'Split into words' >> beam.ParDo(SplitWords())
        | 'Print' >> beam.Map(print)
        )

ParDoでさせる処理は、beam.DoFnを継承したクラスに実装する。
processメソッドに実際の処理を書く。
引数に取っている element がPCollectionのそれぞれのレコードになるので、ここではその文字列にsplit()を呼んでいる。
最後にreturnで処理結果を配列にして返す。

yield element.split()

で返してもよい

出力はこんな感じ。

['Cloud', 'Dataflow', 'is', 'a', 'distributed', 'parallel', 'processing', 'service.']
['BigQuery', 'is', 'a', 'powerful', 'Data', 'WareHouse.']
['Cloud', 'Pub/Sub', 'is', 'a', 'scalable', 'messaging', 'service.']

ParDo のメソッド

DoFn.setup()

DoFn インスタンスが初期化される際に、インスタンスにつき一度呼ばれる。ワーカーにつき一回以上呼ばれる可能性があります。データベースやネットワークなどの接続処理をここでしておくと良い。

DoFn.start_bundle()

エレメントの塊に対して一度呼ばれる。最初のelementprocessがコールされる前に呼ばれる。エレメントの塊の処理開始を追跡するのによい。

DoFn.process(element, *args, **kwargs)

element ごとに呼ばれる。0以上のelementを生み出す。ParDoの引数を通して、*args**kwargsを引数として受け取れる。

DoFn.finish_bundle()

エレメントの塊に対して一度呼ばれる。最後のelementprocessがコールされた後に呼ばれる。0以上のelementを生み出す。データベースクエリ実行のような、塊の最後で行うバッチを実行するのに良いところ。

start_bundle と finish_bundle の使い方イメージ

例えば、start_bundle でバッチを初期化しておき、process でreturnないしはyield する代わりにそのバッチにelementを追加し、最後にfinish_bundle でクエリを実行して結果を出力する、ってな使い方。

DoFn.teardown()

DoFnインスタンスが終了する時に、インスタンスごとに一度呼ばれる。(ただ、ベストエフォートらしい。ワーカーがクラッシュしたら実行されないという意味。)である点に注意。データベースやネットワークのコネクションクローズなどに最適。

TimestampやWindow情報を使う

elementが持つ時間(event_timeと呼ばれるもの)やwindowの開始時間や終了時間などの情報を以下のようにして取得することができる。

class AnalyzeElement(beam.DoFn):
  def process(self, elem, 
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam):
    yield '\n'.join(['# timestamp',
                     'type(timestamp) -> ' + repr(type(timestamp)),
                     'timestamp.micros -> ' + repr(timestamp.micros),
                     'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
                     'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
                     '',
                     '# window',
                     'type(window) -> ' + repr(type(window)),
                     'window.start -> {} ({})'.format(
                         window.start, window.start.to_utc_datetime()),
                     'window.end -> {} ({})'.format(
                         window.end, window.end.to_utc_datetime()),
                     'window.max_timestamp() -> {} ({})'.format(
                         window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4