Python
python2.7
gcp
apache-beam

PythonでApache Beamを触ってみる

Python 2.7でApache Beamを触ってみる

Apache BeamはGoogleが自身のクラウドサービスDataflowの中で使っているフレームワーク。
バッチ処理もストリーミング処理も同じ書き方ができるというのが売り。
今回はPythonでミニマムなプログラムを書いて実行してみた。

Apache Beamインストール前提

Apache Beamをインストール

  • Python 2.7の環境に入ってから下記を実行。
  • ちなみにPython 3.5だとcythonでエラーが出て終わります。
  • わたしの環境だと
pip install apache-beam

下記のようなプログラムを書いてみる

  • Python用のプログラミングガイドを斜め読みして3個くらいソースをコピペして下記を作ってみた。
  • Apache Beam Programming Guide
  • 処理内容
    1. シェークスピアの4行を入力とする
    2. 各行の文字数をlen関数でカウントする
    3. 文字数を標準出力に出して終わる(本来はDBか何かに書き出して終わるが今回は割愛)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# まずパイプラインを作る
p = beam.Pipeline(options=PipelineOptions())

# 1.配列をパイプラインの入力に設定(4行を入力とする)
lines = (p
       | beam.Create([
           'To be, or not to be: that is the question: ',
           'Whether \'tis nobler in the mind to suffer ',
           'The slings and arrows of outrageous fortune, ',
           'Or to take arms against a sea of troubles, ']))

# 2.変換処理として各行の文字列カウントを設定
word_lengths = lines | beam.Map(len)

# 3.最後に標準出力にカウント数を出力して終わる
class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    print(element)
p_end = word_lengths | beam.ParDo(ExtractWordsFn())
p.run()
<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>
43
42
45
43

まとめ

  • かんたんではあるが一応これでPythonからApache Beamのバッチ処理が実行できた。インストールから実装、実行までだいたい1時間くらい。
  • 今後ストリーミング処理を実行したい。またSpark Streamingなどをエンジンとして利用できるようなので、それも試したい。
  • 上記をWindows上のBash on windows+Jupyterから実行できてハッピーだった。

その他参考文献