はじめに
下記ドキュメントを参考に、Beamで実装したアプリのユニットテストを試しました。
準備
ドキュメントに従って以下のライブラリを追加しました。
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
Transformのテスト
テストコードの流れ
詳細は上記ドキュメントを見ていただいた方が早いですが、テストコードの流れをざっくり書くと、以下となる認識です。
- 通常の
Pipeline.create
の代わりに、TestPipeline.create
でテスト用パイプラインを作成 -
Create.of
を使用して、静的な入力データのList等から入力PCollectionを生成 - 入力PCollectionにテスト対象のTransformをapplyして出力PCollectionを生成
-
PAssert
を使用して出力PCollectionの内容を期待値と比較
ドキュメント記載のサンプルコードのテスト実行
まず、ドキュメントに記載されている以下のサンプルコードをそのまま実行してみました。
Beamに標準で含まれているTransformであるCount
クラスのテストのようです。
public class CountTest {
// Our static input data, which will make up the initial PCollection.
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
public void testCount() {
// Create a test pipeline.
Pipeline p = TestPipeline.create();
// Create an input PCollection.
PCollection<String> input = p.apply(Create.of(WORDS));
// Apply the Count transform under test.
PCollection<KV<String, Long>> output =
input.apply(Count.<String>perElement());
// Assert on the results.
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 4L),
KV.of("there", 1L),
KV.of("sue", 2L),
KV.of("bob", 2L),
KV.of("", 3L),
KV.of("ZOW", 1L));
// Run the pipeline.
p.run();
}
}
しかし、何もテストされず。。。
$ mvn test
〜省略〜
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Results :
Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
恥ずかしながら、テストファイルの配置場所がおかしいか、pom.xmlの設定不足か、など無駄に調べてしまいましたが、そもそもテストメソッドに@Test
をつけてないだけでした。。。
@Test
public void testCount() {
改めて実行したところ、以下のエラーが発生しました
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.399 sec <<< FAILURE!
testCount(<パッケージ名>.CountTest) Time elapsed: 3.32 sec <<< ERROR!
java.lang.IllegalStateException: Is your TestPipeline declaration missing a @Rule annotation? Usage: @Rule public final transient TestPipeline pipeline = TestPipeline.create();
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:337)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
at <パッケージ名>.CountTest.testCount(CountTest.java:51)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
エラーメッセージに従って以下のように修正したら、テスト成功するようになりました。
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
public void testCount() {
// Create a test pipeline.
//Pipeline p = TestPipeline.create();
代わりに、このissueを参考に、enableAbandonedNodeEnforcement(false)
を追加した場合でもテスト成功するようになりました。
@Test
public void testCount() {
// Create a test pipeline.
Pipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
ただ、メソッドの説明を読んでも、なぜそうなるのかはよくわからなかったです。。。
自作Transformのテスト実行
自作のTransformのテストも試してみました。テスト対象クラスは下記のとおりです。
Kinesis Data AnalyticsのBeam実装サンプルを少しいじっただけの単純なものです。
入力文字列が"ping"
なら"pong\n"
に変換して出力し、他の文字列ならそのまま出力します。
public class StringPingPongFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
String content = c.element();
if (content.trim().equalsIgnoreCase("ping")) {
c.output("pong\n");
} else {
c.output(c.element());
}
}
}
作成したテストは以下となります。CountTest
と同じ要領です。
public class StringPingPongFnTest {
static final String[] INPUT_ARRAY = new String[] {
"ping",
"hoge",
"",
"",
"fuga",
"ping",
""};
static final List<String> INPUT = Arrays.asList(INPUT_ARRAY);
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
public void testProcessElement() {
PCollection<String> input = p.apply(Create.of(INPUT));
PCollection<String> output =
input.apply(ParDo.of(new StringPingPongFn()));
PAssert.that(output)
.containsInAnyOrder(
"pong\n",
"hoge",
"",
"",
"fuga",
"pong\n",
"");
p.run();
}
}
Pipelineのテスト
上記は単一のTransformクラスのテストでしたが、次は複数のTransformをつなげたPipeline全体のテストです。
テスト対象はメインクラスとなりますが、入出力を除いた一連の変換処理の部分のみをユニットテストの対象としています。入出力を含めたアプリケーション全体のテストは、結合テストとして、DirectRunnerを使用したローカル実行や、実際の想定実行環境(DataFlow、Spark、Flink等)で実行してテストした方が効果的かと思います。
テスト対象クラス
テスト対象のメインクラスは以下となります。
public class GeneralBeamPingPongJob {
public static void main(String[] args) {
IOConfiguration config = getConfig(args);
Pipeline p = Pipeline.create(config.options());
PCollection<String> input = p.apply("source", config.readTransform());
PCollection<String> output = transform(input);
output.apply("sink", config.writeTransform());
p.run().waitUntilFinish();
}
public static PCollection<String> transform(PCollection<String> input) {
return input
.apply(ParDo.of(new StringPingPongFn()))
.apply(Filter.by((SerializableFunction<String, Boolean>) str -> !str.isEmpty()))
.apply(MapElements.into(TypeDescriptors.strings()).via(String::toUpperCase));
}
private static IOConfiguration getConfig(String[] args) {
// 省略
}
}
main
メソッドではPipelineの生成、入出力の指定、実行を行い、一連の変換処理の組み立てはtransform
メソッドに切り出しています。このtransform
メソッドがユニットテスト対象となります。
一連の変換処理としては、前述のStringPingPongFn
の後ろに、空文字削除と大文字変換をつなげてみました。
なお、メインクラスのgetConfig
メソッドについては、本記事の主旨から外れるので省略していますが、詳細は以下の記事をご参照ください。mainメソッドの中身は以下の記事の記載のままだとテストしづらかったので、上記の通り変更しています。
テストクラス
テストクラスが以下となります。
流れとしては単一Transformのテストとほぼ同じです。パイプラインの組み立てをtransform
メソッドに切り出したことにより、入出力をテスト用のものに差し替えやすくしています。
単純な変換処理である限り※、変換処理を追加・変更した場合も、テスト用の入力データと出力データの期待値を変更するだけで対応可能です。
public class GeneralBeamPingPongJobTest {
static final String[] INPUT_ARRAY = new String[] {
"ping",
"hoge",
"",
"",
"fuga",
"ping",
""};
static final List<String> INPUT = Arrays.asList(INPUT_ARRAY);
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
public void testTransform() {
PCollection<String> input = p.apply(Create.of(INPUT));
PCollection<String> output = GeneralBeamPingPongJob.transform(input);
PAssert.that(output)
.containsInAnyOrder(
"PONG\n",
"HOGE",
"FUGA",
"PONG\n");
p.run();
}
}
※ 例えば、Side InputsやAdditional outputsにより入出力が複数だったり、Transform内で外部リソースにアクセスするような処理の場合は、さすがにテストのセットアップ処理もより複雑になると思いますが、そういったケースはまた別記事にしたいと思います。
まとめ
Beamで実装したアプリのユニットテストとして、単一Transformのテストと、複数TransformをつなげたPipeline全体のテストを行いました。