template作ってるときにぶち当たる。
Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);
String[] targets = options.getTargets().split(",",0);
for (String target: targets){
ValueProvider<String> subscription = ValueProvider.NestedValueProvider.of(
options.getInputSubscriptionPrefix(),
(SerializableFunction<String, String>) input -> input + "-" + target);
PCollection<PubsubMessage> messages =
pipeline.apply(
"ReadPubSubSubscription" + targetAction,
PubsubIO.readMessagesWithAttributes()
.fromSubscription(subscription));
...
}
runtimeにOptionの値をsplitして動的にグラフ作るみたいなことは出来ないので、templateのコンパイル時にTargetsは渡している。
runtimeValueProviderのinputSubscriptionPrefixに"projects/hoge/subscription/fuga-piyp"みたいに食わせると、以下のエラー。
Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'projects/hoge/subscriptions/fuga-piyo' is consumed by multiple stages, this will result in undefined behavior.
job graphを作るときに変なvalidationが動いている様子。SerializableFunctionでsuffixつけたものが渡されるはずなんだけど、executionの前ステージで何かが起きている。
beam sdkのPubSubIO周りのコード見てもわからんのでrunnerから呼ばれるgraph constructionのタイミングでvalidationしてるんだろうな。fromSubscriptionのタイミングでNestedValueProviderの値ではなく元のValueProviderの値でチェックしているとしか思えない。
bqに突っ込むだけなので一個のtopicでDynamicDestinations使えばよいという話はある。jsonのフィールドで振り分けができるようにはしているけど、流量に差があるのでtopic分けときたいんだよなぁ。
まぁ動かすだけならtemplateじゃなくすれば動くんだけど。