元ネタ
Dataflow とは
こちらをご覧ください。
ParDo とは
ParDoは、並列にTranformするための汎用的な処理関数です。
ParDoは、データの塊であるPCollectionが入力された時にその中の要素一つ一つに対して、ユーザが実装した任意の処理をかけ、その処理結果を出力のPCollectionにします。
使い方は、こんな感じです。
class SplitWords(beam.DoFn):
def process(self, element):
return [element.split()]
with beam.Pipeline() as p:
len = (
p
| 'Create Data' >> beam.Create(['Cloud Dataflow is a distributed parallel processing service.',
'BigQuery is a powerful Data WareHouse.',
'Cloud Pub/Sub is a scalable messaging service.'])
| 'Split into words' >> beam.ParDo(SplitWords())
| 'Print' >> beam.Map(print)
)
ParDoでさせる処理は、beam.DoFn
を継承したクラスに実装する。
process
メソッドに実際の処理を書く。
引数に取っている element
がPCollectionのそれぞれのレコードになるので、ここではその文字列にsplit()を呼んでいる。
最後にreturnで処理結果を配列にして返す。
yield element.split()
で返してもよい
出力はこんな感じ。
['Cloud', 'Dataflow', 'is', 'a', 'distributed', 'parallel', 'processing', 'service.']
['BigQuery', 'is', 'a', 'powerful', 'Data', 'WareHouse.']
['Cloud', 'Pub/Sub', 'is', 'a', 'scalable', 'messaging', 'service.']
ParDo のメソッド
DoFn.setup()
DoFn
インスタンスが初期化される際に、インスタンスにつき一度呼ばれる。ワーカーにつき一回以上呼ばれる可能性があります。データベースやネットワークなどの接続処理をここでしておくと良い。
DoFn.start_bundle()
エレメントの塊に対して一度呼ばれる。最初のelement
でprocess
がコールされる前に呼ばれる。エレメントの塊の処理開始を追跡するのによい。
DoFn.process(element, *args, **kwargs)
element
ごとに呼ばれる。0以上のelement
を生み出す。ParDoの引数を通して、*args
と**kwargs
を引数として受け取れる。
DoFn.finish_bundle()
エレメントの塊に対して一度呼ばれる。最後のelement
でprocess
がコールされた後に呼ばれる。0以上のelement
を生み出す。データベースクエリ実行のような、塊の最後で行うバッチを実行するのに良いところ。
start_bundle と finish_bundle の使い方イメージ
例えば、start_bundle
でバッチを初期化しておき、process
でreturnないしはyield する代わりにそのバッチにelementを追加し、最後にfinish_bundle
でクエリを実行して結果を出力する、ってな使い方。
DoFn.teardown()
DoFn
インスタンスが終了する時に、インスタンスごとに一度呼ばれる。(ただ、ベストエフォートらしい。ワーカーがクラッシュしたら実行されないという意味。)である点に注意。データベースやネットワークのコネクションクローズなどに最適。
TimestampやWindow情報を使う
elementが持つ時間(event_timeと呼ばれるもの)やwindowの開始時間や終了時間などの情報を以下のようにして取得することができる。
class AnalyzeElement(beam.DoFn):
def process(self, elem,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam):
yield '\n'.join(['# timestamp',
'type(timestamp) -> ' + repr(type(timestamp)),
'timestamp.micros -> ' + repr(timestamp.micros),
'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
'',
'# window',
'type(window) -> ' + repr(type(window)),
'window.start -> {} ({})'.format(
window.start, window.start.to_utc_datetime()),
'window.end -> {} ({})'.format(
window.end, window.end.to_utc_datetime()),
'window.max_timestamp() -> {} ({})'.format(
window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
])