はじめに
S3上のファイルからデータを読み込み、ElastiCache(Redis)にロードする処理を行います。
アプリケーションはApacheBeamを使用して実装し、EMRでSparkアプリとして実行します。
Beamで実装したアプリをEMRで実行する手順については、以下の記事に記載しているため、
本記事では、主にElastiCache(Redis)を出力先とする際に行った対応について記載します。
アプリ実装
Redisへの出力には、Beam提供のI/O Transformsの1つであるRedisIOを使用しました。
https://beam.apache.org/documentation/io/built-in/
https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/redis/RedisIO.html
まず依存ライブラリを追加します。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-redis</artifactId>
<version>${beam.version}</version>
</dependency>
次にアプリのコードを記述します。
なお、今回は下記のようなCSVファイルを読み込んで、そのままkey/valueとしてRedisに登録するのみです。
ロード対象のCSVファイルは複数ファイルを全て1つのディレクトリ直下に配置してあり、keyの値はkey{ファイル連番}-{レコード連番}
のような形式で、valueも同様です。
$ ls /path/to/input/
1.csv 2.csv 3.csv 4.csv 5.csv 6.csv 7.csv 8.csv
$ head /path/to/input/1.csv
key1-1,val1-1
key1-2,val1-2
key1-3,val1-3
key1-4,val1-4
key1-5,val1-5
key1-6,val1-6
key1-7,val1-7
key1-8,val1-8
key1-9,val1-9
key1-10,val1-10
メインクラス
public class RedisLoadJob {
// 入出力設定を抽象化してIOConfigurationに切り出す
public static void main(String[] args) {
IOConfiguration config = getConfig(args);
Pipeline p = Pipeline.create(config.options());
p
.apply("source", config.readTransform())
.apply("sink", config.writeTransform());
p.run().waitUntilFinish();
}
// 引数に応じて入出力設定を切り替える
private static IOConfiguration getConfig(String[] args) {
switch (getIoType(args)) {
case "file-to-redis":
return new FileToRedisConfiguration(removeIoType(args));
case "s3-to-redis":
return new S3ToRedisConfiguration(removeIoType(args));
default:
throw new IllegalArgumentException("undefined ioType");
}
}
// 引数からioTypeのみを取得する
private static String getIoType(String[] args) {
Optional<String[]> maybeIoType =
Arrays.stream(args)
.map(arg -> arg.split("="))
.filter(param -> "--ioType".equals(param[0]))
.findFirst();
if (maybeIoType.isPresent()) {
return maybeIoType.get()[1];
} else {
return "not set";
}
}
// 余計なパラメータをOptionsパーサーに渡すと怒られるので、ioTypeは削除しておく
private static String[] removeIoType(String[] args) {
List<String> list = new ArrayList<>(Arrays.asList(args));
list.removeIf(arg -> "--ioType".equals(arg.split("=")[0]));
return list.toArray(new String[0]);
}
}
S3入力・Redis出力用のIO設定
ローカルファイルの場合とS3上のファイルの場合で、PipelineOptionsのawsRegionの要否が異なるので、それぞれのConfigurationクラスを用意してますが、PipelineOptionsの型以外は同じなのでローカルファイル用クラス(FileToRedisConfiguration)の記載は割愛します。
public interface IOConfiguration {
PipelineOptions options();
PTransform<PBegin, PCollection<String>> readTransform();
PTransform<PCollection<String>, PDone> writeTransform();
}
public class S3ToRedisConfiguration implements IOConfiguration {
private final S3ToRedisOptions options;
public S3ToRedisConfiguration(String[] args) {
this.options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(S3ToRedisOptions.class);
}
public S3ToRedisOptions options() {
return this.options;
}
public PTransform<PBegin, PCollection<String>> readTransform() {
return TextIO.read().from(this.options.getInputPath());
}
public PTransform<PCollection<String>, PDone> writeTransform() {
return new WriteAdaptor(this.options);
}
private static class WriteAdaptor
extends PTransform<PCollection<String>, PDone> {
private final PTransform<PCollection<KV<String, String>>, PDone> delegate;
public WriteAdaptor(S3ToRedisOptions options) {
delegate = RedisIO.write().withEndpoint(options.getEndpoint(), 6379);
}
@Override
public PDone expand(final PCollection<String> input) {
return input
.apply(ParDo.of(new DoFn<String, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] input = Objects.requireNonNull(c.element()).split(",");
String key = input[0];
String value = input[1];
c.output(KV.of(key, value));
}
}))
.apply(delegate);
}
}
}
ちなみに、CSVからkey/valueへの変換処理は、個別に変換用Transformを用意してパイプラインに挟んだ方が単純でよいと思いますが、前述の記事等で紹介している他の入出力パターンと共有しているIOConfiguration
インターフェースをそのまま使いたかったので、RedisIOとのComposite Transformのかたちにしているだけです。他意はありません。Composite Transformについては下記参照。
パイプラインオプション
public interface S3ToRedisOptions extends FileToRedisOptions, AwsOptions { }
public interface FileToRedisOptions extends PipelineOptions {
@Description("redis endpoint.")
@Required
String getEndpoint();
void setEndpoint(String value);
@Description("Path of the input file to read from.")
@Required
String getInputPath();
void setInputPath(String value);
}
ローカルでの実行
ローカルで起動したRedisを出力先として、DirectRunnerで実行する場合のコマンド例は以下となります。
# 入力がローカルファイルの場合
mvn compile exec:java \
-Dexec.mainClass=<パッケージ名>.RedisLoadJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=file-to-redis \
--inputPath=/path/to/input/*.csv \
--endpoint=localhost \
--runner=DirectRunner"
# 入力がS3の場合
mvn compile exec:java \
-Dexec.mainClass=<パッケージ名>.RedisLoadJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=s3-to-redis \
--inputPath=s3://<バケット名>/path/to/input/*.csv \
--awsRegion=ap-northeast-1 \
--endpoint=localhost \
--runner=DirectRunner"
ちなみにローカルのRedisはdockerで起動しています。
version: '3'
services:
redis:
image: "redis:latest"
ports:
- "6379:6379"
volumes:
- "./data/redis:/data"
command: redis-server
AWS上での実行
前提として、PublicサブネットとPrivateサブネットを1つずつ持つVPCが用意されているものとします。
ElastiCacheクラスタ作成
マネジメントコンソールで作成しており、下記設定以外はデフォルトのままとしています。
- クラスターエンジン:Redis
- ノードタイプ:cache.t2.small
- マルチAZ:無効(1つのAZにしかサブネット用意してなかったので)
- サブネットグループ:上記のPrivateサブネットを含むもの
- セキュリティグループ:インバウンドでVPC内からの6379ポートアクセスを許可したもの
- 自動バックアップ:無効(お試し用で不要なので)
また、ElastiCacheにredis-cliでアクセスするためのEC2を上記のPublicサブネットに構築しています。下記ドキュメントに従ってredis-cliをインストールしました。
EMRクラスタ作成
基本的に、冒頭で紹介した記事と同じ構成で作成していますが、上記のPrivateサブネット内にクラスタを作成している点が異なります。
PrivateサブネットからS3にアクセスするために、VPCエンドポイントが必要となります。
EMRもマネジメントコンソールで作成していたため、ガイドに従って「S3エンドポイントとNATインスタンスの追加」で作成しました。(ブートストラップアクションでyum install
をしてたりするので念の為NATも作成しましたが、もしかしたら不要だったかもしれません。)
EMRステップ実行
ステップの設定は以下となります。
-
ステップタイプ:Sparkアプリケーション
-
デプロイモード:クラスター
-
Spark-submitオプション:
--class <パッケージ名>.RedisLoadJob
-
アプリケーションの場所:
s3://<バケット名>/app/basic-beam-app-1.0.jar
-
引数:
--ioType=s3-to-redis --inputPath=s3://<バケット名>/path/to/input/*.csv --awsRegion=ap-northeast-1 --endpoint=<クラスター名>.<ID>.ng.0001.apne1.cache.amazonaws.com --runner=SparkRunner
-
失敗時の操作:キャンセルして待機
800万レコード(100万レコード×8ファイル)で実行したところ、処理時間は1分強でした。
ちなみにローカルファイル→ローカルRedisでDirectRunnerで実行した際は4〜5分でした。
まとめ
S3を入力としElastiCache(Redis)を出力とする処理をBeamで実装し、EMR上で実行することができました。
Redisへの出力にはRedisIOを使用しましたが、RedisIO.WriteはPTransform<PCollection<KV<java.lang.String,java.lang.String>>,PDone>
を継承したものとなっており、valueの形式がStringしか対応していないように見えるので、他のコレクション系のデータ型等を使用したい場合はI/O Transformを自作する等、他の対応方法を検討する必要がありそうです。
追記:RedisIOのコードを見たところ、他のデータ型向けのメソッドも用意されているようなので、RedisIOをうまく拡張して使用することで対応できるかもしれません。