0
0

More than 1 year has passed since last update.

Apache Beamで実装したアプリのユニットテストを試す

Posted at

はじめに

下記ドキュメントを参考に、Beamで実装したアプリのユニットテストを試しました。

準備

ドキュメントに従って以下のライブラリを追加しました。

pom.xml抜粋
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
</dependency>

Transformのテスト

テストコードの流れ

詳細は上記ドキュメントを見ていただいた方が早いですが、テストコードの流れをざっくり書くと、以下となる認識です。

  • 通常のPipeline.createの代わりに、TestPipeline.createでテスト用パイプラインを作成
  • Create.ofを使用して、静的な入力データのList等から入力PCollectionを生成
  • 入力PCollectionにテスト対象のTransformをapplyして出力PCollectionを生成
  • PAssertを使用して出力PCollectionの内容を期待値と比較

ドキュメント記載のサンプルコードのテスト実行

まず、ドキュメントに記載されている以下のサンプルコードをそのまま実行してみました。
Beamに標準で含まれているTransformであるCountクラスのテストのようです。

CountTest.java
public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
  "hi", "there", "hi", "hi", "sue", "bob",
  "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  public void testCount() {
    // Create a test pipeline.
    Pipeline p = TestPipeline.create();

    // Create an input PCollection.
    PCollection<String> input = p.apply(Create.of(WORDS));

    // Apply the Count transform under test.
    PCollection<KV<String, Long>> output =
      input.apply(Count.<String>perElement());

    // Assert on the results.
    PAssert.that(output)
      .containsInAnyOrder(
          KV.of("hi", 4L),
          KV.of("there", 1L),
          KV.of("sue", 2L),
          KV.of("bob", 2L),
          KV.of("", 3L),
          KV.of("ZOW", 1L));

    // Run the pipeline.
    p.run();
  }
}

しかし、何もテストされず。。。

$ mvn test
〜省略〜
-------------------------------------------------------
 T E S T S
-------------------------------------------------------

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

恥ずかしながら、テストファイルの配置場所がおかしいか、pom.xmlの設定不足か、など無駄に調べてしまいましたが、そもそもテストメソッドに@Testをつけてないだけでした。。。

CountTest.java
    @Test
    public void testCount() {

改めて実行したところ、以下のエラーが発生しました

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.399 sec <<< FAILURE!
testCount(<パッケージ名>.CountTest)  Time elapsed: 3.32 sec  <<< ERROR!
java.lang.IllegalStateException: Is your TestPipeline declaration missing a @Rule annotation? Usage: @Rule public final transient TestPipeline pipeline = TestPipeline.create();
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:337)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
        at <パッケージ名>.CountTest.testCount(CountTest.java:51)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)

エラーメッセージに従って以下のように修正したら、テスト成功するようになりました。

CountTest.java
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void testCount() {
        // Create a test pipeline.
        //Pipeline p = TestPipeline.create();

代わりに、このissueを参考に、enableAbandonedNodeEnforcement(false)を追加した場合でもテスト成功するようになりました。

CountTest.java
    @Test
    public void testCount() {
        // Create a test pipeline.
        Pipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

ただ、メソッドの説明を読んでも、なぜそうなるのかはよくわからなかったです。。。

自作Transformのテスト実行

自作のTransformのテストも試してみました。テスト対象クラスは下記のとおりです。
Kinesis Data AnalyticsのBeam実装サンプルを少しいじっただけの単純なものです。
入力文字列が"ping"なら"pong\n"に変換して出力し、他の文字列ならそのまま出力します。

StringPingPongFn.java
public class StringPingPongFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        String content = c.element();
        if (content.trim().equalsIgnoreCase("ping")) {
            c.output("pong\n");
        } else {
            c.output(c.element());
        }
    }
}

作成したテストは以下となります。CountTestと同じ要領です。

StringPingPongFnTest.java
public class StringPingPongFnTest {

    static final String[] INPUT_ARRAY = new String[] {
            "ping",
            "hoge",
            "",
            "",
            "fuga",
            "ping",
            ""};

    static final List<String> INPUT = Arrays.asList(INPUT_ARRAY);

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void testProcessElement() {
        PCollection<String> input = p.apply(Create.of(INPUT));

        PCollection<String> output =
                input.apply(ParDo.of(new StringPingPongFn()));

        PAssert.that(output)
                .containsInAnyOrder(
                        "pong\n",
                        "hoge",
                        "",
                        "",
                        "fuga",
                        "pong\n",
                        "");

        p.run();
    }
}

Pipelineのテスト

上記は単一のTransformクラスのテストでしたが、次は複数のTransformをつなげたPipeline全体のテストです。

テスト対象はメインクラスとなりますが、入出力を除いた一連の変換処理の部分のみをユニットテストの対象としています。入出力を含めたアプリケーション全体のテストは、結合テストとして、DirectRunnerを使用したローカル実行や、実際の想定実行環境(DataFlow、Spark、Flink等)で実行してテストした方が効果的かと思います。

テスト対象クラス

テスト対象のメインクラスは以下となります。

GeneralBeamPingPongJob.java
public class GeneralBeamPingPongJob {

    public static void main(String[] args) {
        IOConfiguration config = getConfig(args);

        Pipeline p = Pipeline.create(config.options());

        PCollection<String> input = p.apply("source", config.readTransform());
        PCollection<String> output = transform(input);
        output.apply("sink", config.writeTransform());

        p.run().waitUntilFinish();
    }

    public static PCollection<String> transform(PCollection<String> input) {
        return input
                .apply(ParDo.of(new StringPingPongFn()))
                .apply(Filter.by((SerializableFunction<String, Boolean>) str -> !str.isEmpty()))
                .apply(MapElements.into(TypeDescriptors.strings()).via(String::toUpperCase));
    }

    private static IOConfiguration getConfig(String[] args) {
        // 省略
    }
}

mainメソッドではPipelineの生成、入出力の指定、実行を行い、一連の変換処理の組み立てはtransformメソッドに切り出しています。このtransformメソッドがユニットテスト対象となります。

一連の変換処理としては、前述のStringPingPongFnの後ろに、空文字削除と大文字変換をつなげてみました。

なお、メインクラスのgetConfigメソッドについては、本記事の主旨から外れるので省略していますが、詳細は以下の記事をご参照ください。mainメソッドの中身は以下の記事の記載のままだとテストしづらかったので、上記の通り変更しています。

テストクラス

テストクラスが以下となります。

流れとしては単一Transformのテストとほぼ同じです。パイプラインの組み立てをtransformメソッドに切り出したことにより、入出力をテスト用のものに差し替えやすくしています。

単純な変換処理である限り※、変換処理を追加・変更した場合も、テスト用の入力データと出力データの期待値を変更するだけで対応可能です。

GeneralBeamPingPongJobTest.java
public class GeneralBeamPingPongJobTest {

    static final String[] INPUT_ARRAY = new String[] {
            "ping",
            "hoge",
            "",
            "",
            "fuga",
            "ping",
            ""};

    static final List<String> INPUT = Arrays.asList(INPUT_ARRAY);

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void testTransform() {

        PCollection<String> input = p.apply(Create.of(INPUT));

        PCollection<String> output = GeneralBeamPingPongJob.transform(input);

        PAssert.that(output)
                .containsInAnyOrder(
                        "PONG\n",
                        "HOGE",
                        "FUGA",
                        "PONG\n");

        p.run();
    }
}

※ 例えば、Side InputsやAdditional outputsにより入出力が複数だったり、Transform内で外部リソースにアクセスするような処理の場合は、さすがにテストのセットアップ処理もより複雑になると思いますが、そういったケースはまた別記事にしたいと思います。

まとめ

Beamで実装したアプリのユニットテストとして、単一Transformのテストと、複数TransformをつなげたPipeline全体のテストを行いました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0