LoginSignup
4
1

More than 3 years have passed since last update.

Dataflow入門 wordcount以前

Last updated at Posted at 2020-06-03

概要

PythonでDataflowを使とき、公式サンプルのwordcount.py を読み解くためのメモ。

具体的な処理の説明

1. データの読み込み

wordcount.py のL.95あたり

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(known_args.input)

中略

wordcount.py のL.119あたり

  # Write the output using a "Write" transform that has side effects.
  # pylint: disable=expression-not-assigned
  output | 'write' >> WriteToText(known_args.output)

1.1. ファイルIO

  • ローカル実行
  • ローカルのファイルの読み込み&書き出し
import apache_beam as beam
import os


def first_pipeline():
    with beam.Pipeline('DirectRunner') as pipeline:
        # ファイルを読んで、"input_data"変数に代入
        input_data = pipeline | 'ReadMyFile' >> beam.io.ReadFromText('./data/sample.txt')
        # input_dataの内容をファイルに書き出す
        input_data | beam.io.WriteToText('./data/output.txt')


if __name__ == '__main__':
    # working directoryを実行ファイルのある場所に変更
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    first_pipeline()

2. ParDo や UDF等の分散処理

処理を順次追加指定する。

    # Count the occurrences of each word.
    def count_ones(word_ones):
        (word, ones) = word_ones
        return (word, sum(ones))

    counts = (
            lines
            | 'split' >>
            (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

2.2. ParDo

2.2.1. apache_beam.Map()

順番に処理する場合は、apache_beam.Map()を使う

import apache_beam as beam
import os

def run_pipeline():
    with beam.Pipeline('DirectRunner') as pipeline:

        # [Final Output PCollection] = ([Initial Input PCollection]
        #                               | [First Transform]
        #                               | [Second Transform]
        #                               | [Third Transform])
        input_data = (pipeline
        | 'read_file' >> beam.io.ReadFromText('./data/sample.txt')
        | 'add_hoge_string' >> beam.Map(lambda line: line + "hoge")
        )

        # input_dataの内容をファイルに書き出す
        input_data | beam.io.WriteToText('./data/output.txt')


if __name__ == '__main__':
    # working directoryを実行ファイルのある場所に変更
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    run_pipeline()

sample.txt の内容

Hello World
foo
bar

出力

Hello Worldhoge
foohoge
barhoge

各行の最後にhogeが追加されている。

注記)
apache_beam.Map()の代わりにapache_beam.ParDo()使うと、各文字ごとに改行される。

→ここでは深く立ち入らない。

2.2.2. ParDo

apache_beam.Map()に対して、lambda式で無名関数を作るほかに、
apache_beam.ParDo() に対して、classと関数を指定する方法もある。

transformの中での処理は、
apache_beam.DoFnを継承したクラスでprocess()をオーバーライドしたものを
apache_beam.ParDo()の引数で指定する。
下記のソースだと、AppendStringEachline クラスを指定している。

import apache_beam as beam
import os

class AppendStringEachline(beam.DoFn):

    def process(self, element):
        yield element + "hoge"


def run_pipeline():
    with beam.Pipeline('DirectRunner') as pipeline:

        from past.builtins import unicode
        input_data = (
                pipeline
                # sample.txtを読み込んで
                | 'read_file' >> beam.io.textio.ReadFromText('./data/sample.txt')
                # 各行の最後に "hoge" を追加する
                # | 'add_hoge_string' >> beam.transforms.core.Map(lambda line : line + "hoge") # つまり左記と同じ結果
                | 'add_hoge_string' >> (beam.transforms.core.ParDo(AppendStringEachline()).with_output_types(unicode))
                # .with_output_types(unicode) について:
                # apache_beam.typehints.decorators.with_output_types(*return_type_hint, **kwargs)
                # https://beam.apache.org/releases/pydoc/2.21.0/apache_beam.typehints.decorators.html#apache_beam.typehints.decorators.with_output_types
                # 返り値の型をアノテーションしている。なくても動くけどあったほうが親切。
                # from past.builtins import unicode
        )

        # input_dataの内容をファイルに書き出す
        input_data | beam.io.textio.WriteToText('./data/output.txt')


if __name__ == '__main__':
    # working directoryを実行ファイルのある場所に変更
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    run_pipeline()

コメントに記載したが、wordcount.py で、 ParDo()のあとにwith_output_types(unicode)) というdecoratorがある。
これは、 戻り値の定義された型ヒントを型チェックするdecorator
今回は、stringを返却するので、unicodeを指定している。手元のPython3.7だと、strでもOK。 あると親切だが、間違った型、例えばintを指定するとexceptionがでる。

指定できる型の詳細は、Python-Future参照 (Pythonで2系と3系が共存していた時期が長い事による負債の対策に見える)。

2.3. apache_beamを適用する前の挙動確認

wordcount.pyの下記のコードを読む前に、apache_beamなしの挙動を確認してみる。

            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

apache_beamなしで書いてみたのが下記。
import にapache_beamが存在していないことに注意。
注)厳密には同じ処理でないところもあるが、だいたい同じ挙動。

import re
import os
from itertools import groupby
from operator import itemgetter


class WordExtractingDoFn():

    def __init__(self):
        pass

    def split(self, element):
        text_line = element.strip()
        if not text_line:
            self.empty_line_counter.inc(1)
        words = re.findall(r'[\w\']+', text_line, re.UNICODE) # ' ←qiitaのシンタックスハイライトバグ対策
        return words

    @staticmethod
    def add_counter_number(words):
        return (words, 1)

    def cope(self):
        readfile = './data/sample.txt'
        with open(readfile) as input_file:
            wordlist = list()

            # | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
            for line in input_file:
                words = self.split(line)
                wordlist.extend(words)

            # 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            wordlist = sorted(wordlist, key=itemgetter(0))
            word_and_count_list = list()
            for x in wordlist:
                word_and_count_list.append(self.add_counter_number(x))

            # | 'group' >> beam.GroupByKey()
            result_list = list()
            for (k, g) in groupby(word_and_count_list, key=itemgetter(0)):
                #  | 'count' >> beam.Map(count_ones)) # 厳密にはちょっと違うけど,結果は同じ
                i = 0
                for item in g:
                    i += 1
                tuple1 = k, i
                result_list.append(tuple1)

            # output | 'write' >> WriteToText(known_args.output)
            output = '\r\n'.join(map(lambda x: x[0] + ': ' + str(x[1]), result_list))
            with open('./data/output.txt', mode='w') as f:
                f.write(output)

        pass


if __name__ == '__main__':
    # working directoryを実行ファイルのある場所に変更
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    w = WordExtractingDoFn()
    w.cope()

tuple, groupby, itemgetter の使い方を確認しておく。
特に、groupbyはSQLのgroup byとは違う挙動なので注意。

2.4. コマンドライン引数での指定以外内容が揃ったので、中身を確認

wordcount.pyより主だった部分を抜粋

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(known_args.input) # a

    # Count the occurrences of each word.
    def count_ones(word_ones):
        (word, ones) = word_ones
        return (word, sum(ones))

    counts = (
            lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode)) # b
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) # c
            | 'group' >> beam.GroupByKey() # d
            | 'count' >> beam.Map(count_ones)) # e

    # Format the counts into a PCollection of strings.
    def format_result(word_count):
        (word, count) = word_count
        return '%s: %d' % (word, count)

    output = counts | 'format' >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | 'write' >> WriteToText(known_args.output)

    result = p.run()
    result.wait_until_finish()

a ファイルの読み込み

説明ずみ

b WordExtractingDoFnクラスのprocess() メソッドを実行

process() メソッドは下記
for文の中はメトリクスの記録なので、一旦スルー
メインの処理は、(読み込みファイルから渡されたの)1行の文字列を単語に分割すること
→正規表現を使っている
element は、実質的に読み込みファイルの一行の文字列を受け取っている

    def process(self, element):
        text_line = element.strip()
        if not text_line:
            self.empty_line_counter.inc(1)
        words = re.findall(r'[\w\']+', text_line, re.UNICODE)#' 
        for w in words:
            self.words_counter.inc()
            self.word_lengths_counter.inc(len(w))
            self.word_lengths_dist.update(len(w))
        return words

入力ファイル(./data/sample.txt)

Hello World
foo
bar
foo
Hello Japan
hoge
Hello World
foo

出力ファイル()
ただし、transformの処理は下記のみを実行

| 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))

Hello
World
foo
bar
foo
Hello
Japan
hoge
Hello
World
foo

c 単語ごとに1をつけたtupleにする

| 'pair_with_one' >> beam.Map(lambda x: (x, 1))

入力ファイルは先程と同じ。

出力ファイル

('Hello', 1)
('World', 1)
('foo', 1)
('bar', 1)
('foo', 1)
('Hello', 1)
('Japan', 1)
('hoge', 1)
('Hello', 1)
('World', 1)
('foo', 1)

d 単語ごとに出現回数を集約

| 'group' >> beam.GroupByKey()

出力結果

('Hello', [1, 1, 1])
('World', [1, 1])
('foo', [1, 1, 1])
('bar', [1])
('Japan', [1])
('hoge', [1])

e 単語ごとに出現回数の合計を算出

count_ones メソッドの(並列)実行

| 'count' >> beam.Map(count_ones)

メソッドの定義に動作時の挙動例を追加

def count_ones(word_ones): # word_ones:tuple  ('Hello',[1,1,1])
    (word, ones) = word_ones # word: 'Hello', ones:[1,1,1]
    return (word, sum(ones))  # tupleの合計を算出

2.5 コマンドライン引数関連

argparse パッケージ

https://docs.python.org/ja/3/howto/argparse.html

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

書いてあるとおり、実行時に:

--input input_file_uri で、入力ファイルを指定。 デフォルト値は'gs://dataflow-samples/shakespeare/kinglear.txt' つまり、GCS上。 もちろんローカル指定も可能。

--output output_file_uri で、出力ファイルを指定。

コマンド例

python wordcount.py --input ./data/input_sample.txt --output ./data/output

--output YOUR_OUTPUT_FILE
で実行すると、 YOUR_OUTPUT_FILE-00000-of-00001 のような形式で出力されるので、
拡張子を指定するときは注意。

apache_beam本体の詳細なオプションについては、必要になったときに実例を見ながら
ドキュメントを見るので良さそう というか今以上に読み解けない
- Apache Beamのドキュメント
- https://beam.apache.org/get-started/wordcount-example/
- GCPのドキュメント
- https://cloud.google.com/dataflow/docs/guides/specifying-exec-params?hl=ja
- https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline?hl=ja

概念編

関連の資料
1. Apache Beam のプログラミング モデル
- 簡潔な用語集。日本語なので、最初に通読するには良いです。

  1. Apache Beam (Dataflow) 実践入門【Python】
    • すでにQiitaに記事があるので、公式ドキュメント(英語)以外だとこれを読むのが良さそう。
  2. Apache Beam公式Document
    • 公式ドキュメント(英語のみ)。日本語情報は見当たらず。

以上から、1.を読む。2.を読む。2.でわからないことがあれば3.を(ヒィヒィ言いながら)読むという位置づけだと効率が良さそう。

4
1
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1