0
0

More than 1 year has passed since last update.

Apache BeamでS3のデータをElastiCacheにロードする

Last updated at Posted at 2021-11-18

はじめに

S3上のファイルからデータを読み込み、ElastiCache(Redis)にロードする処理を行います。
アプリケーションはApacheBeamを使用して実装し、EMRでSparkアプリとして実行します。

Beamで実装したアプリをEMRで実行する手順については、以下の記事に記載しているため、
本記事では、主にElastiCache(Redis)を出力先とする際に行った対応について記載します。

アプリ実装

Redisへの出力には、Beam提供のI/O Transformsの1つであるRedisIOを使用しました。

https://beam.apache.org/documentation/io/built-in/
https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/redis/RedisIO.html

まず依存ライブラリを追加します。

pom.xml
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-redis</artifactId>
    <version>${beam.version}</version>
</dependency>

次にアプリのコードを記述します。
なお、今回は下記のようなCSVファイルを読み込んで、そのままkey/valueとしてRedisに登録するのみです。
ロード対象のCSVファイルは複数ファイルを全て1つのディレクトリ直下に配置してあり、keyの値はkey{ファイル連番}-{レコード連番}のような形式で、valueも同様です。

$ ls /path/to/input/
1.csv   2.csv   3.csv   4.csv   5.csv   6.csv   7.csv   8.csv

$ head /path/to/input/1.csv
key1-1,val1-1
key1-2,val1-2
key1-3,val1-3
key1-4,val1-4
key1-5,val1-5
key1-6,val1-6
key1-7,val1-7
key1-8,val1-8
key1-9,val1-9
key1-10,val1-10

メインクラス

RedisLoadJob.java
public class RedisLoadJob {

    // 入出力設定を抽象化してIOConfigurationに切り出す
    public static void main(String[] args) {
        IOConfiguration config = getConfig(args);

        Pipeline p = Pipeline.create(config.options());

        p
                .apply("source", config.readTransform())
                .apply("sink", config.writeTransform());

        p.run().waitUntilFinish();
    }

    // 引数に応じて入出力設定を切り替える
    private static IOConfiguration getConfig(String[] args) {
        switch (getIoType(args)) {
            case "file-to-redis":
                return new FileToRedisConfiguration(removeIoType(args));
            case "s3-to-redis":
                return new S3ToRedisConfiguration(removeIoType(args));
            default:
                throw new IllegalArgumentException("undefined ioType");
        }
    }

    // 引数からioTypeのみを取得する
    private static String getIoType(String[] args) {
        Optional<String[]> maybeIoType =
                Arrays.stream(args)
                        .map(arg -> arg.split("="))
                        .filter(param -> "--ioType".equals(param[0]))
                        .findFirst();
        if (maybeIoType.isPresent()) {
            return maybeIoType.get()[1];
        } else {
            return "not set";
        }
    }

    // 余計なパラメータをOptionsパーサーに渡すと怒られるので、ioTypeは削除しておく
    private static String[] removeIoType(String[] args) {
        List<String> list = new ArrayList<>(Arrays.asList(args));
        list.removeIf(arg -> "--ioType".equals(arg.split("=")[0]));
        return list.toArray(new String[0]);
    }
}

S3入力・Redis出力用のIO設定

ローカルファイルの場合とS3上のファイルの場合で、PipelineOptionsのawsRegionの要否が異なるので、それぞれのConfigurationクラスを用意してますが、PipelineOptionsの型以外は同じなのでローカルファイル用クラス(FileToRedisConfiguration)の記載は割愛します。

IOConfiguration.java
public interface IOConfiguration {
    PipelineOptions options();
    PTransform<PBegin, PCollection<String>> readTransform();
    PTransform<PCollection<String>, PDone> writeTransform();
}
S3ToRedisConfiguration.java
public class S3ToRedisConfiguration implements IOConfiguration {
    private final S3ToRedisOptions options;

    public S3ToRedisConfiguration(String[] args) {
        this.options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(S3ToRedisOptions.class);
    }

    public S3ToRedisOptions options() {
        return this.options;
    }

    public PTransform<PBegin, PCollection<String>> readTransform() {
        return TextIO.read().from(this.options.getInputPath());
    }

    public PTransform<PCollection<String>, PDone> writeTransform() {
        return new WriteAdaptor(this.options);
    }

    private static class WriteAdaptor
            extends PTransform<PCollection<String>, PDone> {
        private final PTransform<PCollection<KV<String, String>>, PDone> delegate;

        public WriteAdaptor(S3ToRedisOptions options) {
            delegate = RedisIO.write().withEndpoint(options.getEndpoint(), 6379);
        }

        @Override
        public PDone expand(final PCollection<String> input) {
            return input
                    .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            String[] input = Objects.requireNonNull(c.element()).split(",");
                            String key = input[0];
                            String value = input[1];
                            c.output(KV.of(key, value));
                        }
                    }))
                    .apply(delegate);
        }
    }
}

ちなみに、CSVからkey/valueへの変換処理は、個別に変換用Transformを用意してパイプラインに挟んだ方が単純でよいと思いますが、前述の記事等で紹介している他の入出力パターンと共有しているIOConfigurationインターフェースをそのまま使いたかったので、RedisIOとのComposite Transformのかたちにしているだけです。他意はありません。Composite Transformについては下記参照。

パイプラインオプション

S3ToRedisOptions.java
public interface S3ToRedisOptions extends FileToRedisOptions, AwsOptions { }
FileToRedisOptions.java
public interface FileToRedisOptions extends PipelineOptions {
    @Description("redis endpoint.")
    @Required
    String getEndpoint();

    void setEndpoint(String value);

    @Description("Path of the input file to read from.")
    @Required
    String getInputPath();

    void setInputPath(String value);
}

ローカルでの実行

ローカルで起動したRedisを出力先として、DirectRunnerで実行する場合のコマンド例は以下となります。

# 入力がローカルファイルの場合
mvn compile exec:java \
  -Dexec.mainClass=<パッケージ名>.RedisLoadJob \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --ioType=file-to-redis \
    --inputPath=/path/to/input/*.csv \
    --endpoint=localhost \
    --runner=DirectRunner"

# 入力がS3の場合
mvn compile exec:java \
  -Dexec.mainClass=<パッケージ名>.RedisLoadJob \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --ioType=s3-to-redis \
    --inputPath=s3://<バケット名>/path/to/input/*.csv \
    --awsRegion=ap-northeast-1 \
    --endpoint=localhost \
    --runner=DirectRunner"

ちなみにローカルのRedisはdockerで起動しています。

docker-compose.yml
version: '3'

services:
  redis:
    image: "redis:latest"
    ports:
      - "6379:6379"
    volumes:
      - "./data/redis:/data"
    command: redis-server

AWS上での実行

前提として、PublicサブネットとPrivateサブネットを1つずつ持つVPCが用意されているものとします。

ElastiCacheクラスタ作成

マネジメントコンソールで作成しており、下記設定以外はデフォルトのままとしています。

  • クラスターエンジン:Redis
  • ノードタイプ:cache.t2.small
  • マルチAZ:無効(1つのAZにしかサブネット用意してなかったので)
  • サブネットグループ:上記のPrivateサブネットを含むもの
  • セキュリティグループ:インバウンドでVPC内からの6379ポートアクセスを許可したもの
  • 自動バックアップ:無効(お試し用で不要なので)

また、ElastiCacheにredis-cliでアクセスするためのEC2を上記のPublicサブネットに構築しています。下記ドキュメントに従ってredis-cliをインストールしました。

EMRクラスタ作成

基本的に、冒頭で紹介した記事と同じ構成で作成していますが、上記のPrivateサブネット内にクラスタを作成している点が異なります。
PrivateサブネットからS3にアクセスするために、VPCエンドポイントが必要となります。
EMRもマネジメントコンソールで作成していたため、ガイドに従って「S3エンドポイントとNATインスタンスの追加」で作成しました。(ブートストラップアクションでyum installをしてたりするので念の為NATも作成しましたが、もしかしたら不要だったかもしれません。)

EMRステップ実行

ステップの設定は以下となります。

  • ステップタイプ:Sparkアプリケーション

  • デプロイモード:クラスター

  • Spark-submitオプション:

    --class <パッケージ名>.RedisLoadJob
    
  • アプリケーションの場所:s3://<バケット名>/app/basic-beam-app-1.0.jar

  • 引数:

    --ioType=s3-to-redis
    --inputPath=s3://<バケット名>/path/to/input/*.csv
    --awsRegion=ap-northeast-1
    --endpoint=<クラスター名>.<ID>.ng.0001.apne1.cache.amazonaws.com
    --runner=SparkRunner
    
  • 失敗時の操作:キャンセルして待機

800万レコード(100万レコード×8ファイル)で実行したところ、処理時間は1分強でした。
ちなみにローカルファイル→ローカルRedisでDirectRunnerで実行した際は4〜5分でした。

まとめ

S3を入力としElastiCache(Redis)を出力とする処理をBeamで実装し、EMR上で実行することができました。

Redisへの出力にはRedisIOを使用しましたが、RedisIO.WriteはPTransform<PCollection<KV<java.lang.String,java.lang.String>>,PDone>を継承したものとなっており、valueの形式がStringしか対応していないように見えるので、他のコレクション系のデータ型等を使用したい場合はI/O Transformを自作する等、他の対応方法を検討する必要がありそうです。

追記:RedisIOのコードを見たところ、他のデータ型向けのメソッドも用意されているようなので、RedisIOをうまく拡張して使用することで対応できるかもしれません。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0