KafkaDirectStream生成する際にはコンシュームするtopicの情報を渡すと思いますが、
トピック名には正規表現を設定することができます。
コード例(KafkaDirectStream)
log20181201
やlog20181202
などのトピックが存在する場合は下記のように設定します。
KafkaUtils.createDirectStream(
sparkStreamingContext,
PreferConsistent,
SubscribePattern[String, String]("log.+".r.pattern, kafkaParams))
注意点
順序が重要な処理では使えない
上記の例でlog20181201
とlog20181202
に同時にデータが入った場合topicを区別せずにコンシュームするためtopic間で順序を考慮しなければならない場合はこの方法は使用することが出来ません。
topicを削除するとオフセットのコミットで落ちる
KafkaDirectStreamでは取得済みオフセットの情報をKafkaにコミットしているのですが、その際には全く動きのなかったtopicの情報もコミットしています。
そのため、使用していない昔のtopicでも削除を行うと正規表現の対象になっているとコミット不能になり落ちる場合があります。
上記の様な問題に当たる場合Kafkaのauto.commitパラメーターをfalseに設定して
下記のようにオフセットに変化のあったもののみを明示的にコミットすると良いです。
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commitOffsetRange = offsetRanges.filterNot(x => x.untilOffset == x.fromOffset)
kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(commitOffsetRange)