Python 2.7でApache Beamを触ってみる
Apache BeamはGoogleが自身のクラウドサービスDataflowの中で使っているフレームワーク。
バッチ処理もストリーミング処理も同じ書き方ができるというのが売り。
今回はPythonでミニマムなプログラムを書いて実行してみた。
Apache Beamインストール前提
- 前提はPython 2.7。
- Python 3系しかない人やもともとPythonを使っていない人は、Pythonの環境を用意します。
- 私はPython 3系のconda環境だけだったので、conda installでpython2.7環境を作りました。参考:Python2, Python3 を切り替えて jupyter notebook を使う
- Virtualenvを用いたインストール方法はBeam本家ドキュメント通り。 Apache Beam Python SDK Quickstart
- Google のGCPを使うならGCPドキュメント。Python を使用したクイックスタート
- Python 3系しかない人やもともとPythonを使っていない人は、Pythonの環境を用意します。
Apache Beamをインストール
- Python 2.7の環境に入ってから下記を実行。
- ちなみにPython 3.5だとcythonでエラーが出て終わります。
- わたしの環境だと
pip install apache-beam
下記のようなプログラムを書いてみる
- Python用のプログラミングガイドを斜め読みして3個くらいソースをコピペして下記を作ってみた。
- Apache Beam Programming Guide
- 処理内容
- シェークスピアの4行を入力とする
- 各行の文字数をlen関数でカウントする
- 文字数を標準出力に出して終わる(本来はDBか何かに書き出して終わるが今回は割愛)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# まずパイプラインを作る
p = beam.Pipeline(options=PipelineOptions())
# 1.配列をパイプラインの入力に設定(4行を入力とする)
lines = (p
| beam.Create([
'To be, or not to be: that is the question: ',
'Whether \'tis nobler in the mind to suffer ',
'The slings and arrows of outrageous fortune, ',
'Or to take arms against a sea of troubles, ']))
# 2.変換処理として各行の文字列カウントを設定
word_lengths = lines | beam.Map(len)
# 3.最後に標準出力にカウント数を出力して終わる
class ExtractWordsFn(beam.DoFn):
def process(self, element):
print(element)
p_end = word_lengths | beam.ParDo(ExtractWordsFn())
p.run()
- Jupyterで実行した結果
<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>
43
42
45
43
まとめ
- かんたんではあるが一応これでPythonからApache Beamのバッチ処理が実行できた。インストールから実装、実行までだいたい1時間くらい。
- 今後ストリーミング処理を実行したい。またSpark Streamingなどをエンジンとして利用できるようなので、それも試したい。
- 上記をWindows上のBash on windows+Jupyterから実行できてハッピーだった。