LoginSignup
4
0

More than 5 years have passed since last update.

KafkaDirectStreamで正規表現を使う方法と注意点

Last updated at Posted at 2018-12-28

KafkaDirectStream生成する際にはコンシュームするtopicの情報を渡すと思いますが、
トピック名には正規表現を設定することができます。

コード例(KafkaDirectStream)

log20181201log20181202などのトピックが存在する場合は下記のように設定します。

KafkaUtils.createDirectStream(
      sparkStreamingContext,
      PreferConsistent,
      SubscribePattern[String, String]("log.+".r.pattern, kafkaParams))

注意点

順序が重要な処理では使えない

上記の例でlog20181201log20181202に同時にデータが入った場合topicを区別せずにコンシュームするためtopic間で順序を考慮しなければならない場合はこの方法は使用することが出来ません。

topicを削除するとオフセットのコミットで落ちる

KafkaDirectStreamでは取得済みオフセットの情報をKafkaにコミットしているのですが、その際には全く動きのなかったtopicの情報もコミットしています。
そのため、使用していない昔のtopicでも削除を行うと正規表現の対象になっているとコミット不能になり落ちる場合があります。

上記の様な問題に当たる場合Kafkaのauto.commitパラメーターをfalseに設定して
下記のようにオフセットに変化のあったもののみを明示的にコミットすると良いです。

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commitOffsetRange = offsetRanges.filterNot(x => x.untilOffset == x.fromOffset)
kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(commitOffsetRange)
4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0