はじめに
現状ではFlinkのDataStreamAPIを使用して実装しているストリーム処理について、Apache Beamを使用する場合はどのように実装すればよいのかを調べています。その中で、ストリーム処理中でデータストアからデータを取得し、ストリームイベントのエンリッチメントを行う方法を試しています。
前提として、データストアにはアプリケーションのメモリ内では保持しきれないレベルの大量データが保存されており、ストリームイベント内の項目をキーとしてデータストアからキーに紐づくデータのみを取得する必要があるケースを想定しています。
FlinkのDataStreamAPIの場合はAsync I/Oを使用していましたが、Beamのドキュメント内ではFlinkのAsync I/O相当の機能の記載が見つけられませんでした。
ググって見つけた以下の記事では、エンリッチメント方法として3つのパターンが紹介されていました。
- IO.ReadAll Pattern
- Side Inputs
- Custom DoFn
まだ理解不足な部分が多々あるので思い込みの可能性も高いですが、IO.ReadAllについては名称から想像するにデータストアに大量データがあるケースには不向きと思われたこと、また、Side InputsについてもBeamの公式ドキュメントの記載からは今回のユースケースに合うか判断できなかったことから、まずは単純にDoFn内で都度データストアからデータ取得する方法を試してみました。
前提
実行環境
$ java -version
openjdk version "11.0.12" 2021-07-20
OpenJDK Runtime Environment Homebrew (build 11.0.12+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.12+0, mixed mode)
$ mvn -version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/Cellar/maven/3.6.3_1/libexec
Java version: 14.0.1, vendor: N/A, runtime: /usr/local/Cellar/openjdk/14.0.1/libexec/openjdk.jdk/Contents/Home
Default locale: ja_JP, platform encoding: UTF-8
OS name: "mac os x", version: "10.16", arch: "x86_64", family: "mac"
本記事では以下の記事のコードをベースとして、パイプラインの真ん中のDoFn実装クラスを今回作成したものに差し替えて実行しています。
p
.apply("source", config.readTransform())
.apply("enrichment with redis", ParDo.of(new RedisEnrichFn()))
.apply("sink", config.writeTransform());
また、本記事の例ではデータストアとしてローカルで起動したRedisを使用しています。また、JavaのRedisクライアントとしてはLettuceを使用しています。(バージョンが古いのは他で使っていたpom.xmlからコピペしただけで他意はありません。)
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.3.4.RELEASE</version>
</dependency>
Redisには、事前にredis-cliで以下の1レコードだけ登録してあります。
127.0.0.1:6379> set test:ping pong
OK
127.0.0.1:6379> get test:ping
"pong"
データストアに同期アクセス
はじめに、ドキュメント記載の4.2.1.4. DoFn lifecycleを参考にしつつ、setUp
メソッドとtearDown
メソッドでそれぞれRedisクライアントの生成・削除を行い、processElement
メソッドでデータ取得を行うようにしました。
後述の非同期版のコードと合わせてRedisAsyncCommands
を使用しているものの、これでは結局future.get()
の部分がブロッキングになるので、Redisアクセスは同期的になると思われます。
public class RedisEnrichFn extends DoFn<String, String> {
private RedisClient redisClient;
private StatefulRedisConnection<String, String> connection;
private RedisAsyncCommands<String, String> commands;
@Setup
public void setup() {
System.out.println("setUp");
System.out.println("createResource");
this.redisClient = RedisClient.create("redis://password@localhost:6379/0");
this.connection = redisClient.connect();
this.commands = connection.async();
}
@StartBundle
public void startBundle(DoFn<String, String>.StartBundleContext startBundleContext) {
System.out.println("startBundle");
}
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("processElement," + c.element());
String input = c.element();
RedisFuture<String> future = this.commands.get("test:" + input);
try {
String value = future.get();
if (StringUtils.isNotEmpty(value)) {
String result = input + ":" + value;
System.out.println("output," + result);
c.output(result);
} else {
String result = input + ":null";
System.out.println("output," + result);
c.output(input);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@FinishBundle
public void finishBundle(DoFn<String, String>.FinishBundleContext finishBundleContext) {
System.out.println("finishBundleStart");
System.out.println("finishBundleEnd");
}
@Teardown
public void tearDown(){
System.out.println("tearDown");
if (this.connection != null) {
this.connection.close();
}
if (this.redisClient != null) {
this.redisClient.shutdown();
}
}
}
KDSを入出力として上記コードで実行し、イベントを10件流入させてみました。
$ mvn compile exec:java
-Dexec.mainClass=<パッケージ名>.BeamEnrichJob
-Dexec.cleanupDaemonThreads=false
-Dexec.args=" \
--ioType=kinesis \
--inputStreamName=<入力ストリーム名> \
--outputStreamName=<出力ストリーム名> \
--awsRegion=<KDSのリージョン> \
--targetParallelism=1 \
--runner=DirectRunner"
~略~
setUp
createResource
startBundle
processElement,telnet
output,telnet:null
processElement,ping
output,ping:pong
processElement,ping
output,ping:pong
processElement,ftp
output,ftp:null
processElement,telnet
output,telnet:null
processElement,ftp
output,ftp:null
processElement,ftp
output,ftp:null
processElement,telnet
output,telnet:null
processElement,telnet
output,telnet:null
processElement,ping
output,ping:pong
finishBundleStart
finishBundleEnd
1つのbundle内で複数イベントを処理していますが、processElement
メソッドはイベント毎に同期的に呼ばれています。(processElement
メソッド内でRedisデータ取得と出力(c.output
)が完了した後に、次のイベントに対するprocessElement
メソッドが呼ばれています。)
データストアに非同期アクセス
次に、Redisアクセスの非同期化を簡単にできる方法がないか調べたところ、Scioを使うとよいというコメントを見つけました。
そこで、試しにScioのJavaAsyncDoFn
、BaseAsyncDoFn
、DoFnWithResource
のクラスをコピペして使用してみました。コピペしたクラスには前述の同期版に合わせてProcessElement
、StartBundle
等の各ライフサイクルフックにデバッグプリントを追加しましたが、それ以外はそのまま使用しています。
JavaAsyncDoFn
を継承して実装したクラスが以下となります。
public class AsyncRedisEnrichFn extends JavaAsyncDoFn<String, String, RedisAsyncCommands<String, String>> {
@Override
public CompletableFuture<String> processElement(String input) {
RedisAsyncCommands<String, String> commands = getResource();
final RedisFuture<String> future = commands.get("test:" + input);
return future.thenApply(value -> {
if (StringUtils.isNotEmpty(value)) {
return input + ":" + value;
} else {
return input + ":null";
}
}).toCompletableFuture();
};
@Override
public DoFnWithResource.ResourceType getResourceType() {
return DoFnWithResource.ResourceType.PER_CLONE;
};
@Override
public RedisAsyncCommands<String, String> createResource() {
RedisClient redisClient = RedisClient.create("redis://password@localhost:6379/0");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisAsyncCommands<String, String> commands = connection.async();
return commands;
};
}
パイプラインのRedisEnrichFn
をAsyncRedisEnrichFn
に差し替えて実行すると以下のようになりました。
$ mvn compile exec:java
-Dexec.mainClass=<パッケージ名>.BeamEnrichJob
-Dexec.cleanupDaemonThreads=false
-Dexec.args=" \
--ioType=kinesis \
--inputStreamName=<入力ストリーム名> \
--outputStreamName=<出力ストリーム名> \
--awsRegion=<KDSのリージョン> \
--targetParallelism=1 \
--runner=DirectRunner"
~略~
setUp
createResource
startBundle
processElement,telnet
processElement,telnet
processElement,telnet
processElement,ping
processElement,telnet
processElement,telnet
processElement,ping
processElement,telnet
processElement,ping
processElement,telnet
finishBundleStart
output,telnet:null
output,telnet:null
output,telnet:null
output,ping:pong
output,telnet:null
output,telnet:null
output,ping:pong
output,telnet:null
output,ping:pong
output,telnet:null
finishBundleEnd
コピペしたクラスをざっと読んだ限りの理解ですが、processElement
内では、非同期呼び出し(future生成とfutureリストへの登録)と、完了済futureからの結果取得と出力(c.output
)を行っているようです。
また、finishBundle
では全イベントのfutureの完了を待ち合わせてから、完了済futureからの結果取得と出力を行っているようです。
なお、イベント処理間の共有リソース(今回の例ではRedisクライアント)の管理をDoFnWithResource
がやっており、ResourceType
で共有範囲(PER_CLASS
,PER_INSTANCE
,PER_CLONE
)の指定もできるようです。
Scioはコピペでなくmavenリポジトリから取得でも同様に動作しました。(ScalaやJacksonのバージョンとの兼ね合いで古いバージョンを使用したので、前述のgithubの最新コードとは少し異なっていましたが、機能的はほぼ同じに見えました。)
<dependency>
<groupId>com.spotify</groupId>
<artifactId>scio-core_2.11</artifactId>
<version>0.8.4</version>
</dependency>
前述の同期版との性能比較としては、ローカルで 数十tps × 数十秒 程度の負荷での簡易検証しかできていませんが、非同期版の方が、future完了待ち合わせがある分レイテンシは多少長くなるものの、スループットは高めになる傾向となりました。
とは言え、Redisアクセス自体が数msecと低レイテンシなこともあり、大した差が出なかったので、参考程度の情報となります。すみません。
なお、ここで言っているレイテンシ・スループットは以下の定義で取得したものです。
AsyncRedisEnrichFn
のTransform単位での入出力時刻で見るとbundle単位でまとまって出力されてしまいわかりづらいので、イベント単位で非同期化の効果を見るため以下としています。
- レイテンシ: 各イベントの
processElement
呼びだし時刻 〜c.output
呼び出し時刻 の間の時間 - スループット: 一定時間あたりの
c.output
呼び出し回数
リファクタリング
Redisアクセスを抽象化してテストしやすくする
上記の非同期アクセスのコードについて、テストコードを書こうとすると、Redisクライアントとの結合度が強く、ユニットテストとしての実行が難しいため、リファクタリングを行いました。
DoFnクラスのコード
まずは、Redisクライアントの生成・実行をResourceクラスとそのFactoryクラスに切り出して移譲し、interfaceを挟んで抽象化することによりDIを可能にしました。
public class StringAsyncFn
extends JavaAsyncDoFn<String, String, IStringAsyncResource> {
protected final IStringAsyncResourceFactory resourceFactory;
// ResourceのFactoryをDIする
public StringAsyncFn(IStringAsyncResourceFactory resourceFactory) {
super();
this.resourceFactory = resourceFactory;
}
// 非同期処理の実行はResourceに移譲する
// getResourceメソッドは親クラスであるscioのDoFnWithResourceのもの
@Override
public CompletableFuture<String> processElement(String input) {
return this.getResource().get(input);
}
@Override
public ResourceType getResourceType() {
return ResourceType.PER_CLONE;
}
// Resourceの生成はFactoryに移譲する
@Override
public IStringAsyncResource createResource() {
return this.resourceFactory.createResource();
}
}
Resourceクラスは、Redisクライアントを内包し、非同期でデータ取得が可能なget
メソッドを実装します。
public interface IStringAsyncResource {
CompletableFuture<String> get(String input);
}
public class RedisEnrichStringAsyncResource implements IStringAsyncResource {
private final String clusterEndpoint;
private RedisAsyncCommands<String, String> commands;
// テスト時のRedisクライアント生成を避けるため、インスタンス生成と初期化は分ける
public RedisEnrichStringAsyncResource(String clusterEndpoint){
this.clusterEndpoint = clusterEndpoint;
}
// 初期化時にRedisクライアントを生成し、保持する
public void initialize() {
RedisClient redisClient = RedisClient.create(this.clusterEndpoint);
StatefulRedisConnection<String, String> connection = redisClient.connect();
this.commands = connection.async();
}
// 非同期でデータ取得を行い、エンリッチ後のデータを生成する
@Override
public CompletableFuture<String> get(String input) {
// 初期化忘れ時の保険
if (this.commands == null) {
initialize();
}
return this.commands.get("test:" + input)
.thenApply(value -> {
if (StringUtils.isNotEmpty(value)) {
return input + ":" + value;
} else {
return input + ":null";
}
})
.toCompletableFuture();
}
}
ResourceFactoryクラスは、DoFn内でcreateResource
メソッドが呼ばれるごとに、上記のResourceクラスを生成します。
DoFn内にfieldとして保持するため、java.io.Serializable
を実装しています。
public interface IStringAsyncResourceFactory extends Serializable {
IStringAsyncResource createResource();
}
public class RedisEnrichStringAsyncResourceFactory implements IStringAsyncResourceFactory {
private final String clusterEndpoint;
public RedisEnrichStringAsyncResourceFactory(String clusterEndpoint) {
this.clusterEndpoint = clusterEndpoint;
}
// Resourceクラスを生成してDoFnに渡す
public RedisEnrichStringAsyncResource createResource() {
RedisEnrichStringAsyncResource resource =
new RedisEnrichStringAsyncResource(this.clusterEndpoint);
resource.initialize();
return resource;
}
}
DoFnクラス利用側のコード
StringAsyncFn
クラス利用時は以下のような記述になります。
RedisEnrichStringAsyncResourceFactory resourceFactory =
new RedisEnrichStringAsyncResourceFactory("redis://localhost:6379/0");
PCollection<String> output =
input.apply(ParDo.of(new StringAsyncFn(resourceFactory)));
DoFnクラスのテストコード
StringAsyncFn
クラスのテストコードは以下となります。
ResourceクラスおよびそのFactoryクラスを、Redisアクセスを行わないテスト用クラスに差し替えることにより、ユニットテストが可能となっています。
public class StringAsyncFnTest {
static final String[] INPUT_ARRAY = new String[] {
"ping",
"hoge",
"",
"",
"fuga",
"ping",
""};
static final List<String> INPUT = Arrays.asList(INPUT_ARRAY);
static final IStringAsyncResourceFactory TEST_RESOURCE_FACTORY =
new TestResourceFactory();
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
public void testTransform() {
PCollection<String> input = p.apply(Create.of(INPUT));
// Factoryをテスト用インスタンスに差し替えて実行
PCollection<String> output =
input.apply(ParDo.of(new StringAsyncFn(TEST_RESOURCE_FACTORY)));
PAssert.that(output)
.containsInAnyOrder(
"ping:pong",
"hoge:null",
":null",
":null",
"fuga:null",
"ping:pong",
":null");
p.run();
}
// テスト用のResourceクラス
// 実際にRedisアクセスは行わずに固定値を返す
private static class TestResource implements IStringAsyncResource {
@Override
public CompletableFuture<String> get(String input) {
if (input.equals("ping")) {
return CompletableFuture.completedFuture(input + ":pong");
} else {
return CompletableFuture.completedFuture(input + ":null");
}
}
}
// テスト用のResourceクラスを生成するFactory
private static class TestResourceFactory implements IStringAsyncResourceFactory {
@Override
public TestResource createResource() {
return new TestResource();
}
}
}
Beamのテストコードについての基本的な部分は、以下の記事もご参照ください。
エンリッチ処理を汎用化する
エンリッチ前後のデータ型を容易に差し替えられるように、さらにリファクタリングを行いました。
上記ではString固定だった部分を、ジェネリクスを使用して抽象化しました。
DoFnクラスのコード(抽象クラス)
DoFnクラスはジェネリクスを使用した抽象クラスに変更しました。
public abstract class MyBaseAsyncDoFn<InputT, OutputT>
extends JavaAsyncDoFn<InputT, OutputT, IAsyncResource<InputT, OutputT>> {
private final ResourceType resourceType;
protected final IAsyncResourceFactory<InputT, OutputT> resourceFactory;
public MyBaseAsyncDoFn(IAsyncResourceFactory<InputT, OutputT> resourceFactory) {
super();
this.resourceFactory = resourceFactory;
this.resourceType = ResourceType.PER_CLONE;
}
public MyBaseAsyncDoFn(IAsyncResourceFactory<InputT, OutputT> resourceFactory,
ResourceType resourceType) {
super();
this.resourceFactory = resourceFactory;
// ResourceTypeもコンストラクタで指定可能にした
this.resourceType = resourceType;
}
@Override
public CompletableFuture<OutputT> processElement(InputT input) {
return this.getResource().get(input);
}
@Override
public ResourceType getResourceType() {
return this.resourceType;
}
@Override
public IAsyncResource<InputT, OutputT> createResource() {
return this.resourceFactory.createResource();
}
}
ResourceおよびFactoryのinterfaceも型をジェネリクスに変更しています。
public interface IAsyncResource<InputT, OutputT> {
CompletableFuture<OutputT> get(InputT input);
}
public interface IAsyncResourceFactory<InputT, OutputT> extends Serializable {
IAsyncResource<InputT, OutputT> createResource();
}
Resourceクラスは、ジェネリクスの使用に加えて、Redisからのデータ取得用のキー値生成、取得データを使用したエンリッチ後データ生成の部分を抽象メソッドとして切り出した抽象クラスを用意しました。
public abstract class MyBaseRedisEnrichAsyncResource<InputT, OutputT>
implements IAsyncResource<InputT, OutputT> {
private final String clusterEndpoint;
private RedisAsyncCommands<String, String> commands;
public MyBaseRedisEnrichAsyncResource(String clusterEndpoint){
this.clusterEndpoint = clusterEndpoint;
}
public void initialize() {
RedisClient redisClient = RedisClient.create(clusterEndpoint);
StatefulRedisConnection<String, String> connection = redisClient.connect();
commands = connection.async();
}
@Override
public CompletableFuture<OutputT> get(InputT input) {
if (commands == null) {
initialize();
}
return commands.get(createKey(input))
.thenApply(value -> enrich(input, value))
.toCompletableFuture();
}
protected abstract String createKey(InputT input);
protected abstract OutputT enrich(InputT input, String valueToAdd);
}
DoFnクラスのコード(実装クラス)
上記抽象クラスを継承した実装クラスは、エンリッチ前後の型に応じて、差分のみ実装すればよくなりました。
String -> String
前述と同じStringの場合は、以下となります。
public class StringRedisEnrichFn extends MyBaseAsyncDoFn<String, String> {
public StringRedisEnrichFn(IAsyncResourceFactory<String, String> resourceFactory) {
super(resourceFactory);
}
}
public class StringRedisEnrichResource
extends MyBaseAsyncRedisEnrichResource<String, String> {
public StringRedisEnrichResource(String clusterEndpoint) {
super(clusterEndpoint);
}
@Override
protected String createKey(String input) {
return input;
}
@Override
protected String enrich(String input, String valueToAdd) {
if (StringUtils.isNotEmpty(valueToAdd)) {
return input + ":" + valueToAdd;
} else {
return input + ":null";
}
}
}
public class StringRedisEnrichResourceFactory
implements IAsyncResourceFactory<String, String> {
private final String clusterEndpoint;
public StringRedisEnrichResourceFactory(String clusterEndpoint) {
this.clusterEndpoint = clusterEndpoint;
}
public StringRedisEnrichResource createResource() {
StringRedisEnrichResource resource =
new StringRedisEnrichResource(this.clusterEndpoint);
resource.initialize();
return resource;
}
}
SomeInput -> SomeOutput
任意のクラスのオブジェクトの場合は、例えば以下となります。
なお、SomeInput
クラスとSomeOutput
クラスもSerializable
を実装してあります。
public class ObjectRedisEnrichFn
extends MyBaseAsyncDoFn<SomeInput, SomeOutput> {
public ObjectRedisEnrichFn(IAsyncResourceFactory<SomeInput, SomeOutput> resourceFactory) {
super(resourceFactory);
}
}
public class ObjectRedisEnrichResource
extends MyBaseAsyncRedisEnrichResource<SomeInput, SomeOutput> {
public ObjectRedisEnrichResource(String clusterEndpoint) {
super(clusterEndpoint);
}
@Override
protected String createKey(SomeInput input) {
return input.someInputField;
}
@Override
protected SomeOutput enrich(SomeInput input, String valueToAdd) {
if (StringUtils.isNotEmpty(valueToAdd)) {
return input.enrichWith(valueToAdd);
} else {
return input.enrichWith("null");
}
}
}
public class ObjectRedisEnrichResourceFactory
implements IAsyncResourceFactory<SomeInput, SomeOutput> {
private final String clusterEndpoint;
public ObjectRedisEnrichResourceFactory(String clusterEndpoint) {
this.clusterEndpoint = clusterEndpoint;
}
public ObjectRedisEnrichResource createResource() {
ObjectRedisEnrichResource resource =
new ObjectRedisEnrichResource(this.clusterEndpoint);
resource.initialize();
return resource;
}
}
Resourceクラスのテストコード
DoFnクラスのテストコードは前述のものと同様となります。
Resourceクラスは、本リファクタリングで抽象クラスを切り出す前は、Redisクライアントの扱いを含むためユニットテストが難しかったですが、上記の抽象クラスを個別実装したクラスについては、業務依存の内容になると思われるcreateKey
メソッドとenrich
メソッドを個別にテストしやすくなりました。
public class StringRedisEnrichResourceTest {
StringRedisEnrichResource target;
@Before
public void setUp() {
// インスタンス生成時はまだ内部でRedisクライアント生成しないので
// ユニットテスト時でもエラーにならない.
// (Redisクライアント生成はinitializeメソッド実行時に行われる)
target = new StringRedisEnrichResource("dummy");
}
@After
public void tearDown() {
target = null;
}
@Test
public void testCreateKey() {
assertEquals("ping", target.createKey("ping"));
}
@Test
public void testEnrich() {
assertEquals("ping:pong", target.enrich("ping", "pong"));
}
@Test
public void testEnrichEmpty() {
assertEquals("ping:null", target.enrich("ping", ""));
}
@Test
public void testEnrichNull() {
assertEquals("ping:null", target.enrich("ping", null));
}
@Test
public void testEnrichEmptyInput() {
assertEquals(":null", target.enrich("", null));
}
}
まとめ
DoFn内で都度データストアからデータ取得する方法で、ストリームイベントのエンリッチ処理を実装してみました。また、Scioを使用して、データストアアクセスを非同期化しました。
ただし、1つのbundle内の複数イベントは非同期的に扱っているものの、bundle単位で見ると同期的に1bunndleずつ順次処理しているようなので、完全に非同期とも言いづらい状態と思われます。
冒頭の参考記事で紹介されていたIO.ReadAllやSide Inputsを含め、他により良い実装方法がないかは引き続き確認が必要と考えています。
2021/11/23追記: 非同期アクセスのコードに対してリファクタリングを行い、テスト容易性と汎用性を高めたサンプルコードを作成しました。