Apache BeamのJobをCloud Dataflowにデプロイするために、以下のようなコマンドを使うことができます。
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner DataflowRunner \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/
出典:
なお、ここで使っている apache_beam.examples.wordcount
の実体は以下のファイルに書かれています。
上記のPythonコマンドを実行すると、JOBが完了するまでブロッキングされ、完了のタイミングでshellに戻ってきます。
これは直ぐに終わるバッチJOBであれば特に問題ない挙動です。
ですが、ストリーミングJOBを動かす場合は問題になることが多いです。
ストリーミングJOBはUnboundデータを扱うために、「全てのデータを処理し終わる」という概念がありません。
そのため、JOBの完了待ちをすると、無限ループに陥ってしまいます。
解決策
JOBの完了待ちを行っている処理は、DataflowPipelineResultクラスのwait_until_finishメソッドなので、このメソッドにモンキーパッチを当てます。
if __name__ == '__main__':
DataflowPipelineResult.wait_until_finish = lambda duration=None: PipelineState.DONE
run()
これで、JOBが生成されたら即座にshellに戻すことができます。