LoginSignup
6
4

More than 5 years have passed since last update.

PythonでApache Beamを触ってみる

Last updated at Posted at 2017-08-17

Python 2.7でApache Beamを触ってみる

Apache BeamはGoogleが自身のクラウドサービスDataflowの中で使っているフレームワーク。
バッチ処理もストリーミング処理も同じ書き方ができるというのが売り。
今回はPythonでミニマムなプログラムを書いて実行してみた。

Apache Beamインストール前提

Apache Beamをインストール

  • Python 2.7の環境に入ってから下記を実行。
  • ちなみにPython 3.5だとcythonでエラーが出て終わります。
  • わたしの環境だと
pip install apache-beam

下記のようなプログラムを書いてみる

  • Python用のプログラミングガイドを斜め読みして3個くらいソースをコピペして下記を作ってみた。
  • Apache Beam Programming Guide
  • 処理内容
    1. シェークスピアの4行を入力とする
    2. 各行の文字数をlen関数でカウントする
    3. 文字数を標準出力に出して終わる(本来はDBか何かに書き出して終わるが今回は割愛)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# まずパイプラインを作る
p = beam.Pipeline(options=PipelineOptions())

# 1.配列をパイプラインの入力に設定(4行を入力とする)
lines = (p
       | beam.Create([
           'To be, or not to be: that is the question: ',
           'Whether \'tis nobler in the mind to suffer ',
           'The slings and arrows of outrageous fortune, ',
           'Or to take arms against a sea of troubles, ']))

# 2.変換処理として各行の文字列カウントを設定
word_lengths = lines | beam.Map(len)

# 3.最後に標準出力にカウント数を出力して終わる
class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    print(element)
p_end = word_lengths | beam.ParDo(ExtractWordsFn())
p.run()
<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>
43
42
45
43

まとめ

  • かんたんではあるが一応これでPythonからApache Beamのバッチ処理が実行できた。インストールから実装、実行までだいたい1時間くらい。
  • 今後ストリーミング処理を実行したい。またSpark Streamingなどをエンジンとして利用できるようなので、それも試したい。
  • 上記をWindows上のBash on windows+Jupyterから実行できてハッピーだった。

その他参考文献

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4