0
0

More than 1 year has passed since last update.

Kinesis Data AnalyticsのApache Beamチュートリアルをやってみた

Last updated at Posted at 2021-09-26

以下のチュートリアルをやってみた際、主に依存ライブラリのバージョンの問題で、提供されているサンプルコードがそのままではうまく動作しなかったので、正常動作させるにあたり変更した部分をメモしておきます。

動作環境

IntelliJ IDEA 2021.2 (Community Edition)
Build #IC-212.4746.92, built on July 27, 2021
$ java -version
openjdk version "11.0.12" 2021-07-20
OpenJDK Runtime Environment Homebrew (build 11.0.12+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.12+0, mixed mode)

$ mvn -version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/Cellar/maven/3.6.3_1/libexec
Java version: 14.0.1, vendor: N/A, runtime: /usr/local/Cellar/openjdk/14.0.1/libexec/openjdk.jdk/Contents/Home
Default locale: ja_JP, platform encoding: UTF-8
OS name: "mac os x", version: "10.16", arch: "x86_64", family: "mac"

Flinkバージョンの記述を追加

必須ではないと思われるが、intellijでビルドエラーになったのでpom.xmlにもFlinkのバージョンを記載した。

pom.xml
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.11</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kda-runtime.version>1.2.0</kda-runtime.version>
        <beam.version>2.23.0</beam.version>
        <jackson.version>2.10.2</jackson.version>
        <flink.version>1.11.1</flink.version> <!-- 追加 -->
        <flink.version.minor>1.8</flink.version.minor> <!-- 追加 -->
    </properties>

チュートリアル記載のコマンドが以下なので、バージョンはそれに合わせてみた。

mvn package -Dflink.version=1.11.1 -Dflink.version.minor=1.8

AWS SDKの記載バージョンが見つからない

それでも以下のビルドエラーが出た。

java: /Users/XXXX/.m2/repository/com/amazonaws/aws-java-sdk-core/1.11.903/aws-java-sdk-core-1.11.903.jarの読込みエラーです。zip file is empty

コメントに書かれているとおり、AWS SDKのバージョンをあげてみた。

pom.xml
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-bom</artifactId>
                <!-- Get the latest SDK version from <https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom> -->
                <version>1.11.1034</version>  <!-- 1.11.903から1.11.1034に変更 -->
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

とりあえずビルドは成功したので、チュートリアルに合わせてKDSやKDAを構築する。

flink.version.minorが古い

デプロイしてKDA実行したらエラー。

java.lang.VerifyError: Bad type on operand stack\n
Exception Details:\n
  Location:\n
    org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial\n
  Reason:\n
    Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/streaming/api/transformations/StreamTransformation'\n
~以下略~

flink.version.minor1.11にする必要があるようだ。

pom.xml
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.11</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kda-runtime.version>1.2.0</kda-runtime.version>
        <beam.version>2.23.0</beam.version>
        <jackson.version>2.10.2</jackson.version>
        <flink.version>1.11.1</flink.version>
        <flink.version.minor>1.11</flink.version.minor> <!-- 1.11に変更 -->
    </properties>

Beamのバージョンが古い

再ビルド・デプロイしてKDA実行したらエラー。

java.lang.ClassNotFoundException: com.google.protobuf.GeneratedMessageV3\n
\tat java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]\n

Flink 1.11に対応しているのはBeam 2.25以降とのこと。

pom.xml
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.11</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kda-runtime.version>1.2.0</kda-runtime.version>
        <beam.version>2.25.0</beam.version> <!-- 2.23から2.25に変更 -->
        <jackson.version>2.10.2</jackson.version>
        <flink.version>1.11.1</flink.version>
        <flink.version.minor>1.11</flink.version.minor>
    </properties>

再ビルド・デプロイしてKDA実行したら成功した。

ちなみに、Beamのバージョンをあげすぎる(今回試したのは前述のissueに記載されていた2.28.0)と、今度は以下のエラーが発生したので、2.25に下げた。

java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.json.JsonWriteContext com.fasterxml.jackson.dataformat.cbor.CBORGenerator.getOutputContext()'\n

kinesis clientとjacksonのバージョン不一致のようだが、Beam(KinesisIOを含む)のバージョン調整で解決した。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0