はじめに
以下の記事では、Beamを使用して実装した1つのコードを、ストリーム処理が可能なマネージドサービスであるGCPのDataflowとAWSのKinesisDataAnalyticsそれぞれで実行することを試しました。
Beamはバッチ処理とストリーム処理の両方に対応したプログラミングモデルなので、今回は、バッチ処理をGCPとAWSそれぞれのマネージドサービス上で実行してみました。
GCPの場合、Dataflowでそのままバッチ処理も実行できるため、既に上記の記事でGCSをファイル入出力先としたバッチ処理をお試し済なので割愛します。
AWSの場合、ETLサービスまたはサーバレスなSpark実行環境としてDataflowと比較されることのあるサービスとして、AWS Glueが挙げられます。ただ、個人的にGlueについて勉強不足で、作成済のJARをデプロイして実行する方法がよくわからなかったので、まずはAmazon EMR上で実行してみることにしました。
なお、本記事のコードは、前述の記事で紹介したコードをベースに追加実装を行ったものとなっています。
Spark Runnerの追加
はじめに、Sparkで実行可能とするために、以下のページを参考にSparkRunnerを追加しました。
以下の依存ライブラリを追加したのみです。
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>${beam.version}</version>
</dependency>
ここでいったんローカルのSpark環境で実行してみました。実行コマンドは以下となります。
$ spark-submit \
  --class <パッケージ名>.GeneralBeamPingPongJob \
  --master local \
  /tmp/data/basic-beam-app-1.0.jar \
  --ioType=file \
  --inputPath=/tmp/data/input/*.csv \
  --outputPath=/tmp/data/output/out \
  --outputSuffix=.csv \
  --runner=SparkRunner
なお、ローカルのSpark環境は、以下を利用してdockerで起動しています。
ただ、そのままだと実行時に以下のエラーが出てしまいました。
Exception in thread "main" java.lang.UnsupportedClassVersionError: path/to/GeneralBeamPingPongJob has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:700)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
そこで、Dockerfileに以下の修正を行ったうえでビルドしたイメージに差し替えて実行したところ成功しました。
- ベースイメージをJava11のものに変更
- Hadoopのバージョンを3.2.0に変更
- Sparkのバージョンを3.2.0に変更
S3入出力の追加
次に、S3をファイル入出力先とするための設定部分の実装を追加しました。
PipelineOptionsについては、元々用意していたファイル入出力用のinterfaceにAwsOptionsの継承を追加しただけです。
public interface S3ToS3Options extends FileToFileOptions, AwsOptions { }
S3入出力用に追加した入出力設定の実装は以下です。使用するPipelineOptionsの型以外はローカルやGCSのファイル入出力用のクラスと同じです。
public class S3ToS3Configuration implements IOConfiguration {
    private final S3ToS3Options options;
    public S3ToS3Configuration(String[] args) {
        this.options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(S3ToS3Options.class);
    }
    public S3ToS3Options options() {
        return this.options;
    }
    public PTransform<PBegin, PCollection<String>> readTransform() {
        return TextIO.read().from(this.options.getInputPath());
    }
    public PTransform<PCollection<String>, PDone> writeTransform() {
        return TextIO.write().to(this.options.getOutputPath())
                .withSuffix(this.options.getOutputSuffix());
    }
}
メインクラスは以下のようになっています。
public class GeneralBeamPingPongJob {
    // 入出力設定を抽象化してIOConfigurationに切り出す
    public static void main(String[] args) {
        IOConfiguration config = getConfig(args);
        Pipeline p = Pipeline.create(config.options());
        p
                .apply("source", config.readTransform())
                .apply("Pong transform", ParDo.of(new StringPingPongFn()))
                .apply("sink", config.writeTransform());
        p.run().waitUntilFinish();
    }
    // 引数に応じて入出力設定を切り替える
    private static IOConfiguration getConfig(String[] args) {
        switch (getIoType(args)) {
            case "kinesis":
                return new KdsToKdsConfiguration(removeIoType(args));
            case "pubsub":
                return new PubSubToPubSubConfiguration(removeIoType(args));
            case "file":
                return new FileToFileConfiguration(removeIoType(args));
            case "pubsub-to-kds":
                return new PubSubToKdsConfiguration(removeIoType(args));
            case "s3":
                return new S3ToS3Configuration(removeIoType(args)); // 今回追加
            default:
                throw new IllegalArgumentException("undefined ioType");
        }
    }
    // 以下省略
}
こちらもまずローカルのSpark環境で実行してみました。
入出力先をS3のパスとし、オプションとしてawsRegionを追加しています。
また、AWSの認証情報(~/.aws/)をSpark環境のコンテナ上にマウントしています。
$ spark-submit \
  --class <パッケージ名>.GeneralBeamPingPongJob \
  --master local \
  /tmp/data/basic-beam-app-1.0.jar \
  --ioType=s3 \
  --inputPath=s3://<入出力バケット>/input/*.csv \
  --outputPath=s3://<入出力バケット>/output/out \
  --outputSuffix=.csv \
  --awsRegion=ap-northeast-1 \
  --runner=SparkRunner
EMR上で実行
最後に、上記のアプリをEMR上で実行しました。
最初に試した設定内容は以下です。
- クイックオプションでクラスタ構築
- リリース:emr-6.4.0
- アプリケーション:Spark: Spark 2.4.7 on Hadoop 2.10.1 YARN and Zeppelin 0.9.0
- 他はすべてデフォルトのまま
 
- ステップ
- 
ステップタイプ:Sparkアプリケーション 
- 
デプロイモード:クラスター 
- 
Spark-submitオプション --class <パッケージ名>.GeneralBeamPingPongJob
- 
アプリケーションの場所: s3://<バケット名>/app/basic-beam-app-1.0.jar
- 
引数 --ioType=s3 --inputPath=s3://<バケット名>/input/*.csv --outputPath=s3://<バケット名>/output/out --outputSuffix=.csv --awsRegion=ap-northeast-1 --runner=SparkRunner
- 
失敗時の操作:クラスターを終了 
 
- 
すると、前述のローカルSpark環境での実行時と同じエラーが出てしまいました。EMRの最新リリースでも実行環境はJava8になっているようです。
EMRをJava11に対応させるのに少しハマりました。
以下のサイト等を参考にして、ブートストラップアクションでJava11をインストールしてみたり(試した結果、既にインストールはされていたことがわかった)、ソフトウェア設定でJAVA_HOMEを変えてみたりしましたが、いずれもプロビジョニング失敗になってしまいました。
結論としては、以下のサイトの記載どおりに対応したら成功しました。
- クラスタの詳細オプションで、emr-6.4.0を選択し、Spark 3.1.2にチェック
- ステップ設定は上記と同じ
- ソフトウェア設定で一部のGCオプションを無視するよう設定する
- ブートストラップアクションでJava11への切り替えを行う
ちなみに、S3アクセスの権限設定を特にしていなかったですが、EMRのデフォルトロールにS3アクセス権限が含まれていたようです。
まとめ
Beamを使用して実装した処理を、ローカルおよびEMRのSpark上で実行することができました。
SparkRunnerを指定するだけで簡単にSpark上でも実行可能になりましたが、Java11ベースのバージョンで実装していたため、Spark実行環境もJava11に対応させる必要がありました。