概要
PythonでDataflowを使とき、公式サンプルのwordcount.py を読み解くためのメモ。
具体的な処理の説明
1. データの読み込み
wordcount.py のL.95あたり
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(known_args.input)
中略
wordcount.py のL.119あたり
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(known_args.output)
1.1. ファイルIO
- ローカル実行
- ローカルのファイルの読み込み&書き出し
import apache_beam as beam
import os
def first_pipeline():
with beam.Pipeline('DirectRunner') as pipeline:
# ファイルを読んで、"input_data"変数に代入
input_data = pipeline | 'ReadMyFile' >> beam.io.ReadFromText('./data/sample.txt')
# input_dataの内容をファイルに書き出す
input_data | beam.io.WriteToText('./data/output.txt')
if __name__ == '__main__':
# working directoryを実行ファイルのある場所に変更
os.chdir(os.path.dirname(os.path.abspath(__file__)))
first_pipeline()
2. ParDo や UDF等の分散処理
処理を順次追加指定する。
# Count the occurrences of each word.
def count_ones(word_ones):
(word, ones) = word_ones
return (word, sum(ones))
counts = (
lines
| 'split' >>
(beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
2.2. ParDo
2.2.1. apache_beam.Map()
順番に処理する場合は、apache_beam.Map()を使う
import apache_beam as beam
import os
def run_pipeline():
with beam.Pipeline('DirectRunner') as pipeline:
# [Final Output PCollection] = ([Initial Input PCollection]
# | [First Transform]
# | [Second Transform]
# | [Third Transform])
input_data = (pipeline
| 'read_file' >> beam.io.ReadFromText('./data/sample.txt')
| 'add_hoge_string' >> beam.Map(lambda line: line + "hoge")
)
# input_dataの内容をファイルに書き出す
input_data | beam.io.WriteToText('./data/output.txt')
if __name__ == '__main__':
# working directoryを実行ファイルのある場所に変更
os.chdir(os.path.dirname(os.path.abspath(__file__)))
run_pipeline()
sample.txt の内容
Hello World
foo
bar
出力
Hello Worldhoge
foohoge
barhoge
各行の最後にhogeが追加されている。
注記)
apache_beam.Map()の代わりにapache_beam.ParDo()使うと、各文字ごとに改行される。
→ここでは深く立ち入らない。
2.2.2. ParDo
apache_beam.Map()に対して、lambda式で無名関数を作るほかに、
apache_beam.ParDo() に対して、classと関数を指定する方法もある。
transformの中での処理は、
apache_beam.DoFnを継承したクラスでprocess()をオーバーライドしたものを
apache_beam.ParDo()の引数で指定する。
下記のソースだと、AppendStringEachline クラスを指定している。
import apache_beam as beam
import os
class AppendStringEachline(beam.DoFn):
def process(self, element):
yield element + "hoge"
def run_pipeline():
with beam.Pipeline('DirectRunner') as pipeline:
from past.builtins import unicode
input_data = (
pipeline
# sample.txtを読み込んで
| 'read_file' >> beam.io.textio.ReadFromText('./data/sample.txt')
# 各行の最後に "hoge" を追加する
# | 'add_hoge_string' >> beam.transforms.core.Map(lambda line : line + "hoge") # つまり左記と同じ結果
| 'add_hoge_string' >> (beam.transforms.core.ParDo(AppendStringEachline()).with_output_types(unicode))
# .with_output_types(unicode) について:
# apache_beam.typehints.decorators.with_output_types(*return_type_hint, **kwargs)
# https://beam.apache.org/releases/pydoc/2.21.0/apache_beam.typehints.decorators.html#apache_beam.typehints.decorators.with_output_types
# 返り値の型をアノテーションしている。なくても動くけどあったほうが親切。
# from past.builtins import unicode
)
# input_dataの内容をファイルに書き出す
input_data | beam.io.textio.WriteToText('./data/output.txt')
if __name__ == '__main__':
# working directoryを実行ファイルのある場所に変更
os.chdir(os.path.dirname(os.path.abspath(__file__)))
run_pipeline()
コメントに記載したが、wordcount.py で、 ParDo()のあとにwith_output_types(unicode)) というdecoratorがある。
これは、 戻り値の定義された型ヒントを型チェックするdecorator 。
今回は、stringを返却するので、unicodeを指定している。手元のPython3.7だと、strでもOK。 あると親切だが、間違った型、例えばintを指定するとexceptionがでる。
指定できる型の詳細は、Python-Future参照 (Pythonで2系と3系が共存していた時期が長い事による負債の対策に見える)。
2.3. apache_beamを適用する前の挙動確認
wordcount.pyの下記のコードを読む前に、apache_beamなしの挙動を確認してみる。
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
apache_beamなしで書いてみたのが下記。
import にapache_beamが存在していないことに注意。
注)厳密には同じ処理でないところもあるが、だいたい同じ挙動。
import re
import os
from itertools import groupby
from operator import itemgetter
class WordExtractingDoFn():
def __init__(self):
pass
def split(self, element):
text_line = element.strip()
if not text_line:
self.empty_line_counter.inc(1)
words = re.findall(r'[\w\']+', text_line, re.UNICODE) # ' ←qiitaのシンタックスハイライトバグ対策
return words
@staticmethod
def add_counter_number(words):
return (words, 1)
def cope(self):
readfile = './data/sample.txt'
with open(readfile) as input_file:
wordlist = list()
# | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
for line in input_file:
words = self.split(line)
wordlist.extend(words)
# 'PairWithOne' >> beam.Map(lambda x: (x, 1))
wordlist = sorted(wordlist, key=itemgetter(0))
word_and_count_list = list()
for x in wordlist:
word_and_count_list.append(self.add_counter_number(x))
# | 'group' >> beam.GroupByKey()
result_list = list()
for (k, g) in groupby(word_and_count_list, key=itemgetter(0)):
# | 'count' >> beam.Map(count_ones)) # 厳密にはちょっと違うけど,結果は同じ
i = 0
for item in g:
i += 1
tuple1 = k, i
result_list.append(tuple1)
# output | 'write' >> WriteToText(known_args.output)
output = '\r\n'.join(map(lambda x: x[0] + ': ' + str(x[1]), result_list))
with open('./data/output.txt', mode='w') as f:
f.write(output)
pass
if __name__ == '__main__':
# working directoryを実行ファイルのある場所に変更
os.chdir(os.path.dirname(os.path.abspath(__file__)))
w = WordExtractingDoFn()
w.cope()
tuple, groupby, itemgetter の使い方を確認しておく。
特に、groupbyはSQLのgroup byとは違う挙動なので注意。
2.4. コマンドライン引数での指定以外内容が揃ったので、中身を確認
wordcount.pyより主だった部分を抜粋
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(known_args.input) # a
# Count the occurrences of each word.
def count_ones(word_ones):
(word, ones) = word_ones
return (word, sum(ones))
counts = (
lines
| 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode)) # b
| 'pair_with_one' >> beam.Map(lambda x: (x, 1)) # c
| 'group' >> beam.GroupByKey() # d
| 'count' >> beam.Map(count_ones)) # e
# Format the counts into a PCollection of strings.
def format_result(word_count):
(word, count) = word_count
return '%s: %d' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(known_args.output)
result = p.run()
result.wait_until_finish()
a ファイルの読み込み
説明ずみ
b WordExtractingDoFnクラスのprocess() メソッドを実行
process() メソッドは下記
for文の中はメトリクスの記録なので、一旦スルー
メインの処理は、(読み込みファイルから渡されたの)1行の文字列を単語に分割すること
→正規表現を使っている
element は、実質的に読み込みファイルの一行の文字列を受け取っている
def process(self, element):
text_line = element.strip()
if not text_line:
self.empty_line_counter.inc(1)
words = re.findall(r'[\w\']+', text_line, re.UNICODE)#'
for w in words:
self.words_counter.inc()
self.word_lengths_counter.inc(len(w))
self.word_lengths_dist.update(len(w))
return words
入力ファイル(./data/sample.txt)
Hello World
foo
bar
foo
Hello Japan
hoge
Hello World
foo
出力ファイル()
ただし、transformの処理は下記のみを実行
| 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
Hello
World
foo
bar
foo
Hello
Japan
hoge
Hello
World
foo
c 単語ごとに1をつけたtupleにする
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
入力ファイルは先程と同じ。
出力ファイル
('Hello', 1)
('World', 1)
('foo', 1)
('bar', 1)
('foo', 1)
('Hello', 1)
('Japan', 1)
('hoge', 1)
('Hello', 1)
('World', 1)
('foo', 1)
d 単語ごとに出現回数を集約
| 'group' >> beam.GroupByKey()
出力結果
('Hello', [1, 1, 1])
('World', [1, 1])
('foo', [1, 1, 1])
('bar', [1])
('Japan', [1])
('hoge', [1])
e 単語ごとに出現回数の合計を算出
count_ones メソッドの(並列)実行
| 'count' >> beam.Map(count_ones)
メソッドの定義に動作時の挙動例を追加
def count_ones(word_ones): # word_ones:tuple ('Hello',[1,1,1])
(word, ones) = word_ones # word: 'Hello', ones:[1,1,1]
return (word, sum(ones)) # tupleの合計を算出
2.5 コマンドライン引数関連
argparse パッケージ
https://docs.python.org/ja/3/howto/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument(
'--output',
dest='output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
書いてあるとおり、実行時に:
--input input_file_uri で、入力ファイルを指定。 デフォルト値は'gs://dataflow-samples/shakespeare/kinglear.txt' つまり、GCS上。 もちろんローカル指定も可能。
--output output_file_uri で、出力ファイルを指定。
コマンド例
python wordcount.py --input ./data/input_sample.txt --output ./data/output
--output YOUR_OUTPUT_FILE
で実行すると、 YOUR_OUTPUT_FILE-00000-of-00001 のような形式で出力されるので、
拡張子を指定するときは注意。
apache_beam本体の詳細なオプションについては、必要になったときに実例を見ながら
ドキュメントを見るので良さそう というか今以上に読み解けない。
- Apache Beamのドキュメント
- GCPのドキュメント
概念編
関連の資料
-
- 簡潔な用語集。日本語なので、最初に通読するには良いです。
-
Apache Beam (Dataflow) 実践入門【Python】
- すでにQiitaに記事があるので、公式ドキュメント(英語)以外だとこれを読むのが良さそう。
-
- 公式ドキュメント(英語のみ)。日本語情報は見当たらず。
以上から、1.を読む。2.を読む。2.でわからないことがあれば3.を(ヒィヒィ言いながら)読むという位置づけだと効率が良さそう。