2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ZOZOAdvent Calendar 2021

Day 11

DataflowのストリーミングJOBをデプロイするときの無限ループの回避

Last updated at Posted at 2021-12-10

Apache BeamのJobをCloud Dataflowにデプロイするために、以下のようなコマンドを使うことができます。

python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://<your-gcs-bucket>/counts \
                                         --runner DataflowRunner \
                                         --project your-gcp-project \
                                         --region your-gcp-region \
                                         --temp_location gs://<your-gcs-bucket>/tmp/

出典:

なお、ここで使っている apache_beam.examples.wordcount の実体は以下のファイルに書かれています。

上記のPythonコマンドを実行すると、JOBが完了するまでブロッキングされ、完了のタイミングでshellに戻ってきます。
これは直ぐに終わるバッチJOBであれば特に問題ない挙動です。
ですが、ストリーミングJOBを動かす場合は問題になることが多いです。
ストリーミングJOBはUnboundデータを扱うために、「全てのデータを処理し終わる」という概念がありません。
そのため、JOBの完了待ちをすると、無限ループに陥ってしまいます。

解決策

JOBの完了待ちを行っている処理は、DataflowPipelineResultクラスのwait_until_finishメソッドなので、このメソッドにモンキーパッチを当てます。

if __name__ == '__main__':
  DataflowPipelineResult.wait_until_finish = lambda duration=None: PipelineState.DONE
  run()

これで、JOBが生成されたら即座にshellに戻すことができます。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?