LoginSignup
0
0

More than 3 years have passed since last update.

PubSub の SubscriptionとNestedValueProviderの挙動

Last updated at Posted at 2019-10-09

template作ってるときにぶち当たる。

Pipeline pipeline = Pipeline.create(options);
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

String[] targets = options.getTargets().split(",",0);
for (String target: targets){
    ValueProvider<String> subscription = ValueProvider.NestedValueProvider.of(
               options.getInputSubscriptionPrefix(),
               (SerializableFunction<String, String>) input -> input + "-" + target);
    PCollection<PubsubMessage> messages =
                pipeline.apply(
                        "ReadPubSubSubscription" + targetAction,
                        PubsubIO.readMessagesWithAttributes()
                                .fromSubscription(subscription));
    ...
}

runtimeにOptionの値をsplitして動的にグラフ作るみたいなことは出来ないので、templateのコンパイル時にTargetsは渡している。

runtimeValueProviderのinputSubscriptionPrefixに"projects/hoge/subscription/fuga-piyp"みたいに食わせると、以下のエラー。


Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'projects/hoge/subscriptions/fuga-piyo' is consumed by multiple stages, this will result in undefined behavior.

job graphを作るときに変なvalidationが動いている様子。SerializableFunctionでsuffixつけたものが渡されるはずなんだけど、executionの前ステージで何かが起きている。

beam sdkのPubSubIO周りのコード見てもわからんのでrunnerから呼ばれるgraph constructionのタイミングでvalidationしてるんだろうな。fromSubscriptionのタイミングでNestedValueProviderの値ではなく元のValueProviderの値でチェックしているとしか思えない。

bqに突っ込むだけなので一個のtopicでDynamicDestinations使えばよいという話はある。jsonのフィールドで振り分けができるようにはしているけど、流量に差があるのでtopic分けときたいんだよなぁ。

まぁ動かすだけならtemplateじゃなくすれば動くんだけど。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0