Amazon Kinesis Data Analytics(以降、KDAと記載)がApache Beam(以降、Beamと記載)のサポートを開始したこと、また、最近触り始めたGCPのDataflowがBeamによる実装を前提としていることを知りました。
そこで、比較的小さい学習コストで、KDAとDataflowの両方のストリーム処理に加え、並列分散バッチ処理まで実装できるようになったり、プラットフォームをまたいでコードを再利用できるようになることを期待し、Beamの学習を始めてみました。
学習開始にあたり、KDAおよびDataflowで動作させる際の違いを確認するため、KDAとDataflowそれぞれのドキュメントで提供されているチュートリアルから始めました。
次に、ストリーム処理のロジック部分と、入出力や実行環境に依存する部分を切り離し、KDAとDataflowのどちらでも実行できるサンプルコードを作成してみました。
なお、各チュートリアル実施やサンプルコード作成ではJava,Mavenを使用しています。
$ java -version
openjdk version "11.0.12" 2021-07-20
OpenJDK Runtime Environment Homebrew (build 11.0.12+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.12+0, mixed mode)
$ mvn -version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/Cellar/maven/3.6.3_1/libexec
Java version: 14.0.1, vendor: N/A, runtime: /usr/local/Cellar/openjdk/14.0.1/libexec/openjdk.jdk/Contents/Home
Default locale: ja_JP, platform encoding: UTF-8
OS name: "mac os x", version: "10.16", arch: "x86_64", family: "mac"
チュートリアルの実施
KDA 「Example: Using Apache Beam」
依存ライブラリのバージョン不一致がらみで、正常動作させるまでに少し苦戦しました。そのときの経緯は以下の記事に記載しています。
Kinesis Data AnalyticsのApache Beamチュートリアルをやってみた
チュートリアル資材と実施時点での最新コミットは以下です。
$ git log
commit 230b503727eb0f8813365dcca6e46280c7f4aea4 (HEAD -> origin/master, origin/HEAD, master)
Author: Pat Fletcher <fletpatr@amazon.com>
Date: Thu Mar 25 16:09:14 2021 +0000
Python examples
最終的に動作確認できた時点のpom.xml(変更箇所のみ抜粋)は以下となります。
<!-- 以上省略 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.11</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kda-runtime.version>1.2.0</kda-runtime.version>
<beam.version>2.25.0</beam.version> <!-- 2.23.0から2.25.0に変更 -->
<jackson.version>2.10.2</jackson.version>
<flink.version>1.11.1</flink.version> <!-- 追加 -->
<flink.version.minor>1.11</flink.version.minor> <!-- 追加 -->
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>1.11.1034</version> <!-- 1.11.903から1.11.1034に変更 -->
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 以下省略 -->
Dataflow 「Java と Apache Maven を使用したクイックスタート」
GCS上のファイルを入力として、WordCountの結果をローカルやGCSにファイル出力する、というバッチ処理のサンプルでした。
特にはまることなく記載通りの手順で動作確認できました。
ちなみに、Beam公式ページのチュートリアルと同じ内容のようです。
PubSub 「クイックスタート: Dataflow によるストリーム処理」
こちらは、PubSubを入力として、処理結果をGCSに出力するサンプルでした。
こちらも記載通りの手順で特に問題なく動作しました。
処理ロジック共通化のサンプルコード
KDAのチュートリアルのコードをベースとしつつ、他のチュートリアルのコードをツギハギして、複数の入出力・実行環境で動作するサンプルコードを作成してみました。
KDAのチュートリアルで出てきた PingPongFn
のTransformを共通で使いつつ、以下の入出力パターンに対応させています。
入力 | 出力 |
---|---|
KDS | KDS |
PubSub | PubSub |
File (local or GCS) | File (local or GCS) |
PubSub | KDS |
コード内容
共通部分
PubSubIOの使用およびDataflowやDirectRunnerでの実行を可能とするために、以下のライブラリを追加しました。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
メインクラスは以下のようにしました。
入出力に依存する部分を抽象化してIOConfiguration
インターフェースとし、入出力パターンごとにIOConfiguration
を実装した各クラスを、実行時引数によって切り替え可能にしています。
public class GeneralBeamPingPongJob {
// 入出力設定を抽象化してIOConfigurationに切り出す
public static void main(String[] args) {
IOConfiguration config = getConfig(args);
Pipeline p = Pipeline.create(config.options());
p
.apply("source", config.readTransform())
.apply("Pong transform", ParDo.of(new StringPingPongFn()))
.apply("sink", config.writeTransform());
p.run().waitUntilFinish();
}
// 引数に応じて入出力設定を切り替える
private static IOConfiguration getConfig(String[] args) {
switch (getIoType(args)) {
case "kinesis":
return new KdsToKdsConfiguration(removeIoType(args));
case "pubsub":
return new PubSubToPubSubConfiguration(removeIoType(args));
case "file":
return new FileToFileConfiguration(removeIoType(args));
case "pubsub-to-kds":
return new PubSubToKdsConfiguration(removeIoType(args));
default:
throw new IllegalArgumentException("undefined ioType");
}
}
// 引数からioTypeのみを取得する
private static String getIoType(String[] args) {
Optional<String[]> maybeIoType =
Arrays.stream(args)
.map(arg -> arg.split("="))
.filter(param -> "--ioType".equals(param[0]))
.findFirst();
if (maybeIoType.isPresent()) {
return maybeIoType.get()[1];
} else {
// KDA実行時は引数指定ができないため、デフォルトをKDA用のkinesisにしておく
return "kinesis";
}
}
// 余計なパラメータをOptionsパーサーに渡すと怒られるので、ioTypeは削除しておく
private static String[] removeIoType(String[] args) {
List<String> list = new ArrayList<>(Arrays.asList(args));
list.removeIf(arg -> "--ioType".equals(arg.split("=")[0]));
return list.toArray(new String[0]);
}
}
public interface IOConfiguration {
PipelineOptions options();
PTransform<PBegin, PCollection<String>> readTransform();
PTransform<PCollection<String>, PDone> writeTransform();
}
なお、元の PingPongFn
は、入出力データ型がKinesis依存の型になっていたので、入出力をStringにして汎用化しています。
public class StringPingPongFn extends DoFn<String, String> {
private static final Logger LOG = LoggerFactory.getLogger(StringPingPongFn.class);
@ProcessElement
public void processElement(ProcessContext c) {
String content = c.element();
if (content.trim().equalsIgnoreCase("ping")) {
LOG.info("Ponged!");
c.output("pong\n");
} else {
LOG.info("No action for: " + content);
c.output(c.element());
}
}
}
KDS to KDS
KDS入出力用の IOConfiguration
実装は以下となります。
元のBasicBeamStreamingJob
内で実装されていた以下のKinesis固有処理を記述しています。
- KDAのRuntimePropertiesを、
PipelineOptionsFactory
で合わせてパースできるようにJava実行時引数の形式に変換 - AWSリージョンの取得
-
KinesisIO
を使用したPipeline I/Oの設定- PartitionKeyの設定
public class KdsToKdsConfiguration implements IOConfiguration {
private static final String BEAM_APPLICATION_PROPERTIES = "BeamApplicationProperties";
private final BasicBeamStreamingJobOptions options;
private final Regions region;
public KdsToKdsConfiguration(String[] args) {
String[] kinesisArgs;
try {
kinesisArgs = BasicBeamStreamingJobOptionsParser.argsFromKinesisApplicationProperties(args, BEAM_APPLICATION_PROPERTIES);
} catch(IllegalStateException e) {
e.printStackTrace();
kinesisArgs = new String[0];
}
this.options =
PipelineOptionsFactory.fromArgs(ArrayUtils.addAll(args, kinesisArgs)).withValidation().as(BasicBeamStreamingJobOptions.class);
this.region = Optional
.ofNullable(Regions.getCurrentRegion())
.map(r -> Regions.fromName(r.getName()))
.orElse(Regions.fromName(options.getAwsRegion()));
}
public BasicBeamStreamingJobOptions options() {
return this.options;
}
public PTransform<PBegin, PCollection<String>> readTransform() {
return new ReadAdaptor(this.options, this.region);
}
public PTransform<PCollection<String>, PDone> writeTransform() {
return new WriteAdaptor(this.options, this.region);
}
private static class ReadAdaptor
extends PTransform<PBegin, PCollection<String>> {
private final PTransform<PBegin, PCollection<KinesisRecord>> delegate;
public ReadAdaptor(BasicBeamStreamingJobOptions options, Regions region) {
delegate = KinesisIO
.read()
.withStreamName(options.getInputStreamName())
.withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(region))
.withInitialPositionInStream(InitialPositionInStream.LATEST);
}
@Override
public PCollection<String> expand(final PBegin input) {
return delegate
.expand(input)
.apply(ParDo.of(new DoFn<KinesisRecord, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
c.output(content);
}
}));
}
}
private static class WriteAdaptor
extends PTransform<PCollection<String>, PDone> {
private final PTransform<PCollection<byte[]>, PDone> delegate;
public WriteAdaptor(BasicBeamStreamingJobOptions options, Regions region) {
delegate = KinesisIO
.write()
.withStreamName(options.getOutputStreamName())
.withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(region))
// for this to properly balance across shards, the keys would need to be supplied dynamically
.withPartitioner(new SimpleHashPartitioner());
}
@Override
public PDone expand(final PCollection<String> input) {
return input
.apply(ParDo.of(new DoFn<String, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getBytes(StandardCharsets.UTF_8));
}
}))
.apply(delegate);
}
}
private static final class SimpleHashPartitioner implements KinesisPartitioner {
@Override
public String getPartitionKey(byte[] value) {
return String.valueOf(Arrays.hashCode(value));
}
@Override
public String getExplicitHashKey(byte[] value) {
return null;
}
}
}
また、前述のとおり元のPingPongFn
をStringPingPongFn
に変えた部分については、ReadAdaptor
/WriteAdaptor
でデータ型変換を行って吸収しています。
ここではBeamのComposite Transformsという仕組みを使って、KinesisIOトランスフォームと入出力データ型のString変換トランスフォームを結合したものを、Pipeline I/Oとして返しています。
Beamのドキュメントだけだと、既存のPipeline I/Oと他のトランスフォームを結合する方法がよくわからなかったのですが、Talendのドキュメントでそれらしいサンプルコードを見つけたので参考にしました。
これを見て、Pipeline I/Oの場合は、PCollection<T>
の代わりにPBegin
、PDone
という型が使われていることに気づきました。
なお、コード内でoptions.setRunner(FlinkRunner.class);
のようにrunnerを指定していると、DirectRunnerなどFlink以外で実行した場合にエラーとなるので、該当の記述は削除しました。
PubSub to PubSub
PubSubの場合およびDataflow実行の場合は、Kinesisの場合にやっていたAWSリージョンとKDSのPartitionKeyの設定やデータ型変換などが不要なため、比較的単純になりました。
public class PubSubToPubSubConfiguration implements IOConfiguration {
private final PubSubToPubSubOptions options;
public PubSubToPubSubConfiguration(String[] args) {
this.options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToPubSubOptions.class);
this.options.setStreaming(true);
}
public PubSubToPubSubOptions options() {
return this.options;
}
public PTransform<PBegin, PCollection<String>> readTransform() {
return PubsubIO.readStrings().fromTopic(this.options.getInputTopic());
}
public PTransform<PCollection<String>, PDone> writeTransform() {
return PubsubIO.writeStrings().to(this.options.getOutputTopic());
}
}
public interface PubSubToPubSubOptions extends PipelineOptions, StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("The Cloud Pub/Sub topic to publish to.")
@Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
}
PubSubへの出力については、Google公式のDataflowテンプレートのコードも参考にしました。
ちなみに、今回はいったんPubSubの入力にはトピックを指定していますが、サブスクリプションを指定することも可能なようです。トピック指定の場合は実行毎にサブスクリプションが自動作成されるので、実際の開発では事前作成済のサブスクリプションを指定することの方が多いかもしれません。
また、テンプレートのコードだと、Optionsのデータ型が ValueProvide<T>
になっています。これはパラメータ値を実行時に読み取るようにしてテンプレートのパラメータとして指定可能にする場合に必要なようです。
File to File
ファイル入出力の場合も比較的単純です。バッチ処理やローカルでの結合試験等で使えそうです。
public class FileToFileConfiguration implements IOConfiguration {
private final FileToFileOptions options;
public FileToFileConfiguration(String[] args) {
this.options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(FileToFileOptions.class);
}
public FileToFileOptions options() {
return this.options;
}
public PTransform<PBegin, PCollection<String>> readTransform() {
return TextIO.read().from(this.options.getInputPath());
}
public PTransform<PCollection<String>, PDone> writeTransform() {
return TextIO.write().to(this.options.getOutputPath())
.withSuffix(this.options.getOutputSuffix());
}
}
public interface FileToFileOptions extends PipelineOptions, StreamingOptions {
@Description("Path of the input file to read from.")
@Required
String getInputPath();
void setInputPath(String value);
@Description("Path of the output file including its filename prefix.")
@Required
String getOutputPath();
void setOutputPath(String value);
@Description("Suffix of the output file.")
@Required
String getOutputSuffix();
void setOutputSuffix(String value);
}
PubSub to KDS
プラットフォームをまたいで入出力するパターンとなります。
基本的に前述のパターンを組み合わせたコードとなっていますが、別プラットフォームのリソースにアクセスするために認証情報の取得・保持方法を考える必要があります。
public class PubSubToKdsConfiguration implements IOConfiguration {
private static final String BEAM_APPLICATION_PROPERTIES = "BeamApplicationProperties";
private final PubSubToKdsOptions options;
private final Regions region;
public PubSubToKdsConfiguration(String[] args) {
String[] kinesisArgs;
try {
kinesisArgs = BasicBeamStreamingJobOptionsParser.argsFromKinesisApplicationProperties(args, BEAM_APPLICATION_PROPERTIES);
} catch(IllegalStateException e) {
e.printStackTrace();
kinesisArgs = new String[0];
}
this.options =
PipelineOptionsFactory.fromArgs(ArrayUtils.addAll(args, kinesisArgs)).withValidation().as(PubSubToKdsOptions.class);
this.region = Optional
.ofNullable(Regions.getCurrentRegion())
.map(r -> Regions.fromName(r.getName()))
.orElse(Regions.fromName(options.getAwsRegion()));
}
public PubSubToKdsOptions options() {
return this.options;
}
public PTransform<PBegin, PCollection<String>> readTransform() {
return PubsubIO.readStrings().fromTopic(this.options.getInputTopic());
}
public PTransform<PCollection<String>, PDone> writeTransform() {
return new WriteAdaptor(this.options, this.region);
}
private static class WriteAdaptor
extends PTransform<PCollection<String>, PDone> {
private final PTransform<PCollection<byte[]>, PDone> delegate;
public WriteAdaptor(PubSubToKdsOptions options, Regions region) {
delegate = KinesisIO
.write()
.withStreamName(options.getOutputStreamName())
.withAWSClientsProvider(options.getAwsAccessKeyId(), options.getAwsSecretAccessKey(), region)
// for this to properly balance across shards, the keys would need to be supplied dynamically
.withPartitioner(new SimpleHashPartitioner());
}
@Override
public PDone expand(final PCollection<String> input) {
return input
.apply(ParDo.of(new DoFn<String, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getBytes(StandardCharsets.UTF_8));
}
}))
.apply(delegate);
}
}
private static final class SimpleHashPartitioner implements KinesisPartitioner {
@Override
public String getPartitionKey(byte[] value) {
return String.valueOf(Arrays.hashCode(value));
}
@Override
public String getExplicitHashKey(byte[] value) {
return null;
}
}
}
public interface PubSubToKdsOptions extends FlinkPipelineOptions, AwsOptions, GcpOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("AWS access key.")
String getAwsAccessKeyId();
void setAwsAccessKeyId(String value);
@Description("AWS secret key.")
String getAwsSecretAccessKey();
void setAwsSecretAccessKey(String value);
@Description("Name of the Kinesis Data Stream to write to")
@Required
String getOutputStreamName();
void setOutputStreamName(String value);
}
Dataflow実行時に必要となるKDSアクセスの認証情報については、いったん簡易的な方法としてアクセスキーとシークレットキーをOptionsで渡しています。ただし、あまりセキュリティ的によろしくないと思うので、AWS STSを使うようにAWSClientsProviderを実装したものを使った方がよさそうです。(Options指定だとCloud Consoleのジョブ情報で値が表示されてしまいます。)
一方で、KDA実行時に必要となるPubSubアクセスの認証情報については、うまくやる方法をまだ見つけられていません。
GCPの認証情報は、環境変数GOOGLE_APPLICATION_CREDENTIALS
にパス設定されたサービスアカウントのキーJSONファイルを取得するのが基本のようなのですが、KDAの場合は環境変数や外部ファイルを設定することができない認識です。
PubSubIO自体にはcredentialsを設定するようなメソッドが見当たらず、GcpOptions
を実装すれば以下のように環境変数を使わずパス指定可能なようですが、これもjsonファイルをKDAに渡す必要はあります。S3等からファイル取得する等は可能かもしれませんが、それもイマイチな気がします。
options.setGcpCredential(ServiceAccountCredentials.fromStream(new FileInputStream("key.json")).createScoped(SCOPES));
実行例
上記コードを各種の入出力・実行環境で実行してみます。
なお、前提として、ローカルの実行環境では認証情報として以下が設定されています。(いったん検証用のため権限が大きいですが、実用の際は最小権限に絞る必要があります。)
- AWSアカウントのAdministrator権限を持つIAMユーザのアクセスキー・シークレットキーが、
~/.aws/credentials
に設定されている - GCPプロジェクトのオーナー権限を持つサービスアカウントのJSONキーが配置されており、環境変数
GOOGLE_APPLICATION_CREDENTIALS
にパスが設定されている
KDS to KDS
KDAのチュートリアルと同様に、事前にKDSストリームを作成した状態で実行しました。
# ローカルでDirectRunnerで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=kinesis \
--inputStreamName=<入力ストリーム名> \
--outputStreamName=<出力ストリーム名> \
--awsRegion=ap-northeast-1 \
--runner=DirectRunner"
ローカルで起動したFlinkクラスタ上でも実行してみました
執筆時点でKDAがサポートするFlinkバージョンは1.11.1(Scala2.12)
以下からflink1.11.1バイナリをダウンロードしておく
https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
# ローカルのFlinkクラスタ上で実行する場合
$ tar zxvf flink-1.11.1-bin-scala_2.12.tgz
$ ./flink-1.11.1/bin/start-cluster.sh
$ ./flink-1.11.1/bin/flink run -c com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob ./target/basic-beam-app-1.0.jar \
--ioType=kinesis \
--inputStreamName=<入力ストリーム名> \
--outputStreamName=<出力ストリーム名> \
--awsRegion=ap-northeast-1
$ ./flink-1.11.1/bin/stop-cluster.sh
KDAで実行する場合は、マネジメントコンソールを使用し、S3にアップロードしたJARをKDAで指定して実行しました。KDAのチュートリアルと同じ手順ですが、コード内でrunner設定の記述を削除した代わりにRuntime Propertiesに以下を追加しています。
- グループ:
BeamApplicationProperties
- キー:
Runner
- 値:
FlinkRunner
PubSub to PubSub
PubSubのチュートリアルと同様に、事前にPubSubトピックとGCSバケットを作成した状態で実行しました。
# ローカルでDirectRunnerで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=pubsub \
--project=<プロジェクト名> \
--inputTopic=projects/<プロジェクト名>/topics/<入力トピック名> \
--outputTopic=projects/<プロジェクト名>/topics/<出力トピック名> \
--runner=DirectRunner"
# Dataflowで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=pubsub \
--project=<プロジェクト名> \
--inputTopic=projects/<プロジェクト名>/topics/<入力トピック名> \
--outputTopic=projects/<プロジェクト名>/topics/<出力トピック名> \
--region=asia-northeast1 \
--gcpTempLocation=gs://<ステージング用バケット名>/temp \
--runner=DataflowRunner"
KDAは常駐プロセスのイメージなのに対し、Dataflowはストリーム処理でもジョブというかたちで実行されます。今回はチュートリアルと同様にローカルからコマンド実行しましたが、本番運用時は事前にテンプレートを作成してGCS保存しておき、テンプレートを指定して実行するものなのかもしれません。
File to File
# ローカルファイル入出力
# ローカルでDirectRunnerで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=file \
--inputPath=data/input/*.csv \
--outputPath=data/output/out \
--outputSuffix=.csv \
--runner=DirectRunner"
# GCSファイル入出力
# ローカルでDirectRunnerで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=file \
--project=<プロジェクト名> \
--inputPath=gs://<入出力バケット名>/input/*.csv \
--outputPath=gs://<入出力バケット名>/output/out \
--outputSuffix=.csv \
--runner=DirectRunner"
# GCSファイル入出力
# Dataflowで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=file \
--project=<プロジェクト名> \
--inputPath=gs://<入出力バケット名>/input/*.csv \
--outputPath=gs://<入出力バケット名>/output/out \
--outputSuffix=.csv \
--region=asia-northeast1 \
--gcpTempLocation=gs://<ステージング用バケット名>/temp \
--runner=DataflowRunner"
PubSub to KDS
AWS認証情報をオプションで渡すようにしています。
# ローカルでDirectRunnerで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=pubsub-to-kds \
--project=<プロジェクト名> \
--inputTopic=projects/<プロジェクト名>/topics/<入力トピック名> \
--outputStreamName=<出力ストリーム名> \
--awsRegion=ap-northeast-1 \
--awsAccessKey=$(aws configure get aws_access_key_id) \
--awsSecretKey=$(aws configure get aws_secret_access_key) \
--runner=DirectRunner"
# Dataflowで実行する場合
$ mvn compile exec:java \
-Dexec.mainClass=com.amazonaws.kinesisanalytics.beam.GeneralBeamPingPongJob \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--ioType=pubsub-to-kds \
--project=<プロジェクト名> \
--inputTopic=projects/<プロジェクト名>/topics/<入力トピック名> \
--outputStreamName=<出力ストリーム名> \
--awsRegion=ap-northeast-1 \
--awsAccessKey=$(aws configure get aws_access_key_id) \
--awsSecretKey=$(aws configure get aws_secret_access_key) \
--region=asia-northeast1 \
--gcpTempLocation=gs://<ステージング用バケット名>/temp \
--runner=DataflowRunner"
今回は未対応ですが、逆のKDS to PubSubのパターンも同じ要領でできそうです。
また、本当はこのパターンをKDAでも実行できるようにしたかったのですが、前述のとおりGCP認証情報の扱いが面倒でまだできていません。
まとめ
チュートリアル用のサンプルコードを流用した単純な処理ではありますが、ストリーム処理の共通の1つのコード(Transform,Pipeline)を、KDAとDataflowの両方で動作させることができました。また、ファイル入出力のバッチ処理として動作させることもできました。
実際の開発では今回のサンプルコードのように入出力や実行環境を切り替えるユースケースはあまりないかと思いますが、1つ1つのTransformやそれらをつなげたPipelineの定義を、入出力や実行環境に依存しないかたちで実装しておくことで、プラットフォームをまたいだ移植性や再利用性を備えることができそうです。