はじめに
この記事では、Apache Beam Python SDK で提供されている Transform についてまとめています。簡単に呼び出すことが可能な Transform を一通り知っておくことで、より迅速に実装の方針を立てることができるかと思います。
要素ごとの処理 | Element-wise
ParDo - DoFn の実行
PCollection の各要素に対して、ユーザー定義の処理(DoFn)を実行します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class ComputeWordLength(beam.DoFn):
    """DoFn that emits the length of every input element.

    The inherited ``DoFn`` constructor is sufficient, so no ``__init__``
    override is defined here.
    """

    def process(self, element):
        # Exactly one output per input element: its length.
        yield len(element)
class TestParDo(TestCase):
    """ParDo: run a user-defined DoFn on every element."""

    def test_par_do(self):
        words = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        word_lengths = [5, 3, 7, 7, 5]
        with TestPipeline() as pipeline:
            created = pipeline | beam.Create(words)
            lengths = created | beam.ParDo(ComputeWordLength())
            assert_that(lengths, equal_to(word_lengths))
Filter - 要素のフィルタリング
PCollection の要素をフィルタリングします。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFilter(TestCase):
    """Filter: keep only the elements for which a predicate holds."""

    def test_filter(self):
        def starts_with_a(text):
            return text.startswith('A')

        letters = ['A', 'B', 'C']
        with TestPipeline() as pipeline:
            kept = (
                pipeline
                | beam.Create(letters)
                | beam.Filter(starts_with_a)
            )
            assert_that(kept, equal_to(['A']))
Map - 要素に関数を適用
PCollection の各要素に関数を適用します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestMap(TestCase):
    """Map: apply a one-to-one function to every element."""

    def test_map(self):
        names = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        with TestPipeline() as pipeline:
            lengths = pipeline | beam.Create(names) | beam.Map(len)
            assert_that(lengths, equal_to([5, 3, 7, 7, 5]))
FlatMap - 要素に関数を適用(反復可能)
PCollection の各要素に、イテラブル(0 個以上の要素)を返す関数を適用し、その結果を平坦化して1つの PCollection として出力します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFlatMap(TestCase):
    """FlatMap: apply a function that returns an iterable, then flatten."""

    def test_flat_map(self):
        batches = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]
        with TestPipeline() as pipeline:
            lengths = (
                pipeline
                | beam.Create(batches)
                | beam.FlatMap(lambda batch: (len(name) for name in batch))
            )
            assert_that(lengths, equal_to([5, 3, 7, 7, 5]))
ToString - 要素を文字列に変換
PCollection の各要素を文字列に変換します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToString(TestCase):
    """ToString: convert each element into a string."""

    def _assert_to_string(self, inputs, to_string, expected):
        # Shared pipeline: Create -> the ToString variant under test -> check.
        with TestPipeline() as pipeline:
            converted = pipeline | beam.Create(inputs) | to_string
            assert_that(converted, equal_to(expected))

    def test_to_string_kvs(self):
        """Join each (key, value) pair into one comma-separated string."""
        self._assert_to_string(
            inputs=[('A', 'B'), ('C', 'D')],
            to_string=beam.ToString.Kvs(),
            expected=['A,B', 'C,D'],
        )

    def test_to_string_element(self):
        """Convert every element with its plain string representation."""
        self._assert_to_string(
            inputs=['A', ['A', 'B'], ['C', 'D', 'E']],
            to_string=beam.ToString.Element(),
            expected=["A", "['A', 'B']", "['C', 'D', 'E']"],
        )

    def test_to_string_iterables(self):
        """Join the items of each iterable element with commas."""
        self._assert_to_string(
            inputs=[['A', 'B'], ['C', 'D', 'E']],
            to_string=beam.ToString.Iterables(),
            expected=['A,B', 'C,D,E'],
        )
Keys - 要素から Key を抽出
PCollection の各要素(Key と Value のペア)から Key を抽出します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestKeys(TestCase):
    """Keys: extract the key from every (key, value) pair."""

    def test_keys(self):
        weekdays = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                    'Thursday', 'Friday', 'Saturday']
        numbered_days = list(enumerate(weekdays))
        with TestPipeline() as pipeline:
            keys = pipeline | beam.Create(numbered_days) | beam.Keys()
            assert_that(keys, equal_to(list(range(7))))
Values - 要素から Value を抽出
PCollection の各要素(Key と Value のペア)から Value を抽出します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestValues(TestCase):
    """Values: extract the value from every (key, value) pair."""

    def test_values(self):
        weekdays = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                    'Thursday', 'Friday', 'Saturday']
        numbered_days = list(enumerate(weekdays))
        with TestPipeline() as pipeline:
            values = pipeline | beam.Create(numbered_days) | beam.Values()
            assert_that(values, equal_to(weekdays))
KvSwap - 要素の Key と Value を交換
PCollection の各要素(Key と Value のペア)の Key と Value の値を入れ替えます。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestKvSwap(TestCase):
    """KvSwap: turn every (key, value) pair into (value, key)."""

    def test_kv_swap(self):
        weekdays = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                    'Thursday', 'Friday', 'Saturday']
        numbered_days = list(enumerate(weekdays))
        swapped_days = [(day, number) for number, day in numbered_days]
        with TestPipeline() as pipeline:
            swapped = pipeline | beam.Create(numbered_days) | beam.KvSwap()
            assert_that(swapped, equal_to(swapped_days))
集約処理 | Aggregation
GroupByKey - 要素を Key で集約
PCollection の要素(Key と Value のペア)を Key によって集約します。集約後の Value はイテラブルとして渡され、その並び順は保証されません。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestGroupByKey(TestCase):
    """GroupByKey: collect every value that shares the same key."""

    def test_group_by_key(self):
        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]
        # Each group is compared as a sorted list (see the Map step below).
        expected = [('cat', ['mike', 'tama']), ('dog', ['pochi'])]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey()
                      # GroupByKey yields (key, Iterable[value]) with no
                      # ordering guarantee inside a group, so normalise each
                      # group to a sorted list to make the assertion stable.
                      | beam.Map(lambda kv: (kv[0], sorted(kv[1]))))
            assert_that(actual, equal_to(expected))
CoGroupByKey - 要素を Key で集約(複数の PCollection)
複数の PCollection の要素(Key と Value のペア)を Key によって集約します。各 PCollection 由来の Value の並び順は保証されません。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCoGroupByKey(TestCase):
    """CoGroupByKey: join several keyed PCollections on their keys."""

    def test_co_group_by_key(self):
        # One entry per key: (key, (values from pcol1, values from pcol2)),
        # each value list sorted (see the Map step below).
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], []))
        ]
        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]
        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)
            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey()
                      # The grouped values for each input carry no ordering
                      # guarantee, so sort every per-input group into a list
                      # before comparing.
                      | beam.Map(lambda kv: (
                          kv[0], tuple(sorted(values) for values in kv[1]))))
            assert_that(actual, equal_to(expected))
CombineGlobally - 要素の結合
PCollection のすべての要素を、指定した関数(この例では sum)で1つの値に集約します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCombineGlobally(TestCase):
    """CombineGlobally: reduce the whole PCollection to a single value."""

    def test_combine_globally(self):
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            total = pipeline | beam.Create(numbers) | beam.CombineGlobally(sum)
            assert_that(total, equal_to([55]))
ToList - 要素を1つのリストに格納
PCollection のすべての要素を1つのリストに格納します。リスト内の要素の順序は保証されません。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToList(TestCase):
    """ToList: gather every element of the PCollection into one list."""

    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList()
                      # ToList does not promise any element order inside the
                      # resulting list, so sort it before comparing.
                      | beam.Map(sorted))
            assert_that(actual, equal_to(expected))
ToDict - 要素を1つの辞書型に格納
PCollection のすべての要素(Key と Value のペア)を1つの辞書に格納します。Key が重複する場合、どの Value が残るかは保証されません。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToDict(TestCase):
    """ToDict: gather every (key, value) pair into a single dict."""

    def test_to_dict(self):
        from apache_beam.testing.util import BeamAssertException

        inputs = [('A', 1), ('A', 2), ('B', 1)]

        def matches_expected_dict(actual):
            # ToDict emits exactly one dict for the whole PCollection.
            if len(actual) != 1:
                raise BeamAssertException(
                    'expected a single dict, got %r' % (actual,))
            result = actual[0]
            # 'A' appears twice and either of its values may be kept, so
            # accept both rather than pinning one runner-specific choice.
            if (set(result) != {'A', 'B'}
                    or result['B'] != 1
                    or result['A'] not in (1, 2)):
                raise BeamAssertException('unexpected dict: %r' % (result,))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())
            assert_that(actual, matches_expected_dict)
Count - 要素数のカウント
PCollection の要素数を数えます。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCount(TestCase):
    """Count.Globally: count the elements of the whole PCollection."""

    def test_count(self):
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            count = (
                pipeline
                | beam.Create(numbers)
                | beam.combiners.Count.Globally()
            )
            assert_that(count, equal_to([10]))
Distinct - 要素の重複排除
PCollection の要素から重複を排除します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestDistinct(TestCase):
    """Distinct: drop duplicate elements."""

    def test_distinct(self):
        with_duplicates = [1, 1, 2, 3]
        with TestPipeline() as pipeline:
            unique = pipeline | beam.Create(with_duplicates) | beam.Distinct()
            assert_that(unique, equal_to([1, 2, 3]))
Mean - 要素の平均の算出
PCollection のすべての要素の平均を算出します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestMean(TestCase):
    """Mean.Globally: average every element of the PCollection."""

    def test_mean(self):
        numbers = list(range(1, 11))
        with TestPipeline() as pipeline:
            average = (
                pipeline
                | beam.Create(numbers)
                | beam.combiners.Mean.Globally()
            )
            assert_that(average, equal_to([5.5]))
Sample - 要素からランダムに抽出
PCollection のすべての要素から、指定した件数をランダムに抽出します(非復元抽出)。抽出結果は実行ごとに異なります。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestSample(TestCase):
    """Sample.FixedSizeGlobally: draw a fixed-size random sample."""

    def test_sample(self):
        from apache_beam.testing.util import BeamAssertException

        sample_size = 3
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        def is_valid_sample(actual):
            # The sampled values change on every run, so check properties of
            # the result instead of specific values: exactly one list holding
            # `sample_size` distinct elements, all taken from `inputs`.
            if len(actual) != 1:
                raise BeamAssertException(
                    'expected a single sample, got %r' % (actual,))
            sample = actual[0]
            if (len(sample) != sample_size
                    or len(set(sample)) != sample_size
                    or not set(sample) <= set(inputs)):
                raise BeamAssertException('invalid sample: %r' % (sample,))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(sample_size))
            assert_that(actual, is_valid_sample)
Top - 要素から最大(または最小)値の抽出
PCollection のすべての要素から最大(または最小)のものを数件抽出します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestTop(TestCase):
    """Top: pick the n largest or n smallest elements."""

    def test_top_largest(self):
        with TestPipeline() as pipeline:
            top_three = (
                pipeline
                | beam.Create(list(range(1, 11)))
                | beam.combiners.Top.Largest(3)
            )
            assert_that(top_three, equal_to([[10, 9, 8]]))

    def test_top_smallest(self):
        with TestPipeline() as pipeline:
            bottom_three = (
                pipeline
                | beam.Create(list(range(1, 11)))
                | beam.combiners.Top.Smallest(3)
            )
            assert_that(bottom_three, equal_to([[1, 2, 3]]))
その他の処理 | Others
Flatten - PCollection の結合
複数の PCollection を単一の PCollection に結合します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFlatten(TestCase):
    """Flatten: merge several PCollections into a single one."""

    def test_flatten(self):
        first_half = [1, 2, 3, 4, 5]
        second_half = [6, 7, 8, 9, 10]
        with TestPipeline() as pipeline:
            parts = (
                pipeline | 'create pcol1' >> beam.Create(first_half),
                pipeline | 'create pcol2' >> beam.Create(second_half),
            )
            merged = parts | beam.Flatten()
            assert_that(merged, equal_to(first_half + second_half))
Reshuffle - 要素の再分配
PCollection の要素をワーカー間で再分配します。
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestReshuffle(TestCase):
    """Reshuffle: redistribute elements without changing their contents."""

    def test_reshuffle(self):
        letters = ['A', 'B', 'C']
        with TestPipeline() as pipeline:
            redistributed = pipeline | beam.Create(letters) | beam.Reshuffle()
            # Only placement changes, so the same elements come out.
            assert_that(redistributed, equal_to(letters))
まとめ
Apache Beam Python SDK では、豊富な Transform が提供されています(Java と比べると少ないですが)。新たな機能が提供されたら随時更新していきたいと思います。
Apache Beam の Transform についてパッと思い出したい時などに参照していただけると幸いです!