44
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Beam チートシート【Python】

Last updated at Posted at 2019-12-31

はじめに

この記事では、Apache Beam Python SDK で提供されている Transform についてまとめています。簡単に呼び出すことが可能な Transform を一通り知っておくことで、より迅速に実装の方針を立てることができるかと思います。

要素ごとの処理 | Element-wise

ParDo - DoFn の実行

PCollection の各要素を考慮し、何らかの処理(DoFn)を実行します。

test_par_do.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class ComputeWordLength(beam.DoFn):

    def __init__(self):
        super(ComputeWordLength, self).__init__()

    def process(self, element):
        yield len(element)


class TestParDo(TestCase):

    def test_par_do(self):
        expected = [5, 3, 7, 7, 5]

        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))

Filter - 要素のフィルタリング

PCollection の要素をフィルタリングします。

test_filter.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFilter(TestCase):

    def test_filter(self):
        expected = ['A']

        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Filter(lambda element: element.startswith('A')))

            assert_that(actual, equal_to(expected))

Map - 要素に関数を適用

PCollection の各要素に関数を適用します。

test_map.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMap(TestCase):

    def test_map(self):
        expected = [5, 3, 7, 7, 5]

        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Map(lambda element: len(element)))

            assert_that(actual, equal_to(expected))

FlatMap - 要素に関数を適用(反復可能)

PCollection の各要素に関数を適用します。

test_flat_map.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatMap(TestCase):

    def test_flat_map(self):
        expected = [5, 3, 7, 7, 5]

        inputs = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.FlatMap(lambda element: [len(e) for e in element]))

            assert_that(actual, equal_to(expected))

ToString - 要素を文字列に変換

PCollection の各要素を文字列に変換します。

test_to_string.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToString(TestCase):

    def test_to_string_kvs(self):
        """Key, Value を , 区切りの文字列に."""
        expected = ['A,B', 'C,D']

        inputs = [('A', 'B'), ('C', 'D')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Kvs())

            assert_that(actual, equal_to(expected))

    def test_to_string_element(self):
        """各要素を文字列に."""
        expected = ["A", "['A', 'B']", "['C', 'D', 'E']"]

        inputs = ['A', ['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Element())

            assert_that(actual, equal_to(expected))

    def test_to_string_iterables(self):
        """イテラブルなオブジェクトを文字列に."""
        expected = ['A,B', 'C,D,E']

        inputs = [['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Iterables())

            assert_that(actual, equal_to(expected))

Keys - 要素から Key を抽出

PCollection の各要素(Key と Value のペア)から Key を抽出します。

test_keys.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKeys(TestCase):

    def test_keys(self):
        expected = [0, 1, 2, 3, 4, 5, 6]

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Keys())

            assert_that(actual, equal_to(expected))

Values - 要素から Value を抽出

PCollection の各要素(Key と Value のペア)から Value を抽出します。

test_values.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestValues(TestCase):

    def test_values(self):
        expected = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Values())

            assert_that(actual, equal_to(expected))

KvSwap - 要素の Key と Value を交換

PCollection の各要素(Key と Value のペア)の Key と Value の値を入れ替えます。

test_kv_swap.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKvSwap(TestCase):

    def test_kv_swap(self):
        expected = [('Friday', 5), ('Monday', 1), ('Saturday', 6), ('Sunday', 0),
                    ('Thursday', 4), ('Tuesday', 2), ('Wednesday', 3)]

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.KvSwap())

            assert_that(actual, equal_to(expected))

集約処理 | Aggregation

GroupByKey - 要素を Key で集約

PCollection の要素(Key と Value のペア)を Key によって集約します。

test_group_by_key.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestGroupByKey(TestCase):

    def test_group_by_key(self):
        expected = [('cat', ['tama', 'mike']), ('dog', ['pochi'])]

        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey())

            assert_that(actual, equal_to(expected))

CoGroupByKey - 要素を Key で集約(複数の PCollection)

複数の PCollection の要素(Key と Value のペア)を Key によって集約します。

test_co_group_by_key.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCoGroupByKey(TestCase):

    def test_co_group_by_key(self):
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], []))
        ]

        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey())

            assert_that(actual, equal_to(expected))

CombineGlobally - 要素の結合

PCollection のすべての要素を結合します。

combine_globally.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCombineGlobally(TestCase):

    def test_combine_globally(self):
        expected = [55]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.CombineGlobally(sum))

            assert_that(actual, equal_to(expected))

ToList - 要素を1つのリストに格納

PCollection のすべての要素を1つのリストに格納します。

test_to_list.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToList(TestCase):

    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList())

            assert_that(actual, equal_to(expected))

ToDict - 要素を1つの辞書型に格納

PCollection のすべての要素(KeyとValueのペア)を1つの辞書型に格納します。

test_to_dict.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToDict(TestCase):

    def test_to_dict(self):
        expected = [{'A': 2, 'B': 1}]  # Key が被る場合はどちらか一方の Value が選択される

        inputs = [('A', 1), ('A', 2), ('B', 1)]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())

            assert_that(actual, equal_to(expected))

Count - 要素数のカウント

PCollection の要素数を数えます。

test_count.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCount(TestCase):

    def test_count(self):
        expected = [10]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Count.Globally())

            assert_that(actual, equal_to(expected))

Distinct - 要素の重複排除

PCollection の要素から重複を排除します。

test_distinct.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestDistinct(TestCase):

    def test_distinct(self):
        expected = [1, 2, 3]

        inputs = [1, 1, 2, 3]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Distinct())

            assert_that(actual, equal_to(expected))

Mean - 要素の平均の算出

PCollection のすべての要素の平均を算出します。

test_mean.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMean(TestCase):

    def test_mean(self):
        expected = [5.5]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Mean.Globally())

            assert_that(actual, equal_to(expected))

Sample - 要素からランダムに抽出

PCollection のすべての要素からランダムに数件抽出します。

test_sample.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestSample(TestCase):

    def test_sample(self):
        expected = [[2, 8, 6]]  # 期待値は毎回ランダムな値になる

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(3))

            assert_that(actual, equal_to(expected))

Top - 要素から最大(または最小)値の抽出

PCollection のすべての要素から最大(または最小)のものを数件抽出します。

test_top.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestTop(TestCase):

    def test_top_largest(self):
        expected = [[10, 9, 8]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Largest(3))

            assert_that(actual, equal_to(expected))

    def test_top_smallest(self):
        expected = [[1, 2, 3]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Smallest(3))

            assert_that(actual, equal_to(expected))

その他の処理 | Others

Flatten - PCollection の結合

複数の PCollection を単一の PCollection に結合します。

test_flatten.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatten(TestCase):

    def test_flatten(self):
        expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        inputs1 = [1, 2, 3, 4, 5]
        inputs2 = [6, 7, 8, 9, 10]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = (pcol1, pcol2) | beam.Flatten()

            assert_that(actual, equal_to(expected))

Reshuffle - 要素の再分配

PCollection の要素をワーカー間で再分配します。

test_reshuffle.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestReshuffle(TestCase):

    def test_reshuffle(self):
        expected = ['A', 'B', 'C']

        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Reshuffle())

            assert_that(actual, equal_to(expected))

まとめ

Apache Beam Python SDK では、豊富な Transform が提供されています(Java と比べると少ないですが)。新たな機能が提供されたら随時更新していきたいと思います。

Apache Beam の Transform についてパッと思い出したい時などに参照していただけると幸いです!

参考 URL

44
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
44
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?