Scala
Kafka
Flink

Apache FlinkとScalaでセンサーデータをウィンドウ集計をする

 Spark StreamingKafka Streamsに続いてストリーム処理フレームワークのApache Flinkを試します。Spark StreamingはPython、Kafka StreamsはJavaで書いたのでApache FlinkはScalaで書いてみようと思います。

 Apache FlinkもKafkaと同様にScalaで書かれています。Scalaに特徴的な後方互換性を重視せずアグレッシブな開発をしています。そのためネットで検索できる情報もどんどん古くなりAPIもDeprecatedやPublicEvolvingになりがちで初学者には少し入りづらい状況です。なかなか学習用の良い記事が見つかりませんでしたが、センサーデータのウィンドウ集計の書き方はTHE RISE OF BIG DATA STREAMINGを参考にさせていただきました。

プロジェクトテンプレート

 Apache FlinkのプロジェクトはA flink project template using Scala and SBTをテンプレートにすると便利です。最初にテンプレートのWordCountを例に使い方を確認します。テンプレートをcloneします。

$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git

 いくつか例がありますがここではWordCount.scalaを使います。

$ tree flink-project
flink-project/
├── build.sbt
├── idea.sbt
├── project
│   ├── assembly.sbt
│   └── build.properties
├── README
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── org
                └── example
                    ├── Job.scala
                    ├── SocketTextStreamWordCount.scala
                    └── WordCount.scala

 ENSIMEを使う場合はこちらを参考にしてプロジェクトに.ensimeファイルを作成してEmacsからM-x ensimeしてください。

$ cd ~/scala_apps/flink-project
$ sbt
> ensimeConfig

 WordCount.scalaのコードです。例のテキストに含まれる英単語をカウントします。

WordCount.scala
package org.example
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()

  }
}

 プロジェクトのディレクトリからsbtのrunコマンドを実行します。mainメソッドを実装したクラスがいくつかあるのでWordCountの3を入力します。

$ cd ~/scala_apps/flink-project
$ sbt
> run
Multiple main classes detected, select one to run:

 [1] org.example.Job
 [2] org.example.SocketTextStreamWordCount
 [3] org.example.WordCount

Enter number:3

 実行すると以下のようにテキストに含まれる英単語を数えて出力します。

 (a,1)
 (fortune,1)
 (in,1)
 (mind,1)
 (or,2)
 (question,1)
 (slings,1)
 (suffer,1)
 (take,1)
 (that,1)
 (to,4)

 テキストデータはExecutionEnvironmentからfromElementsして作成したDataSourceです。

    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

 Apach FlinkのScalaではコードを短く書ける反面でアンダースコアやmapgroupByに登場する1や0が何を指しているのかわかりにくいことがあります。Apache FlinkのTupleはfieldで指定する場合はzero indexedなので順番に0, 1となります。

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

 テキストデータをflatMapで正規表現を使い単語に区切りDataSetを作成します。map()で単語(_)と数(1)でTupleのDataSetを作成します。groupBy()はfieldの0を指定して単語でグループ化したGroupedDataSetを作成します。最後にsum()の引数にfieldの1を指定して単語と単語の合計数をTupleにしたAggregateDataSetを作成します。

ウィンドウ集計

 このテンプレートプロジェクトを使いセンサーデータを60秒のタンブリングウィンドウで集計して周囲温度(ambient)の平均値を計算するプログラムを書きます。KafkaをSourceにするのでこちらを参考にRaspberry Pi 3からSensorTagのデータをKafkaに送信します。

Raspberry Pi 3 -> Source (Kafka) -> ストリーム処理 -> Sink (Kafka)

 テンプレートプロジェクトをcloneした後に既存のファイルを削除します。

$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git streams-flink-scala-examples
$ cd streams-flink-scala-examples
$ rm -fr src/main/scala/org/

 Scalaのパッケージディレクトリを作成します。

$ mkdir -p src/main/scala/com/github/masato/streams/flink

build.sbt

 Kafkaはこちらと同様にlandoop/fast-data-devのDockerイメージを利用します。バージョンは0.10.2.1です。Kafka 0.10に対応したパッケージを追加します。

build.sbt
val flinkVersion = "1.3.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion)

val otherDependencies = Seq(
   "com.typesafe" % "config" % "1.3.1"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= otherDependencies
  )

mainClass in assembly := Some("com.github.masato.streams.flink.App")

App.scala

 メインメソッドを実装したプログラムの全文です。Kafkaへの接続情報などはconfigを使い設定ファイルに定義します。ソースコードはリポジトリにもあります。

App.scala
package com.github.masato.streams.flink

import java.util.Properties
import java.time.ZoneId;
import java.time.format.DateTimeFormatter
import java.util.Date

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010,FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema,SimpleStringSchema}
import org.apache.flink.api.common.functions.AggregateFunction

import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import scala.util.parsing.json.JSONObject
import com.typesafe.config.ConfigFactory

case class Accumulator(time: Long, bid: String, var sum: Double, var count: Int)

class Aggregate extends AggregateFunction[(String, Double), Accumulator,Accumulator] {

  override def createAccumulator(): Accumulator = {
    return Accumulator(0L, "", 0.0, 0)
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    a.sum += b.sum
    a.count += b.count
    return a
  }

  override def add(value: (String, Double), acc: Accumulator): Unit = {
    acc.sum += value._2
    acc.count += 1
  }

  override def getResult(acc: Accumulator): Accumulator = {
    return acc
  }
}

object App {
  val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("app.bootstrap-servers")
  val groupId = conf.getString("app.group-id")
  val sourceTopic = conf.getString("app.source-topic")
  val sinkTopic = conf.getString("app.sink-topic")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new FlinkKafkaConsumer010[ObjectNode](
      sourceTopic, new JSONDeserializationSchema(), props)

    val events = env.addSource(source).name("events")

    val timestamped = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
        override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
      })

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }
      .keyBy(v => v._1)
      .timeWindow(Time.seconds(60))
      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )
      .map { v =>
        val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
        val time = fmt.format(zdt)
        val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
        val retval = JSONObject(json).toString()
        println(retval)
        retval
      }
      .addSink(new FlinkKafkaProducer010[String](
        bootstrapServers,
        sinkTopic,
        new SimpleStringSchema)
      ).name("kafka")
    env.execute()
  }
}

 main()の処理を順番にみていきます。最初にKafkaに接続する設定を行います。接続するKafkaが0.10のためFlinkKafkaConsumer010を使います。Raspberry Pi 3から届くセンサーデータは以下のようなJSONフォーマットです。

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1503527847, 'humidity': 26.55792236328125, 'objecttemp': 22.3125, 'ambient': 26.375, 'rh': 76.983642578125}

 JSONDeserializationSchemaでデシリアライズします。

    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new FlinkKafkaConsumer010[ObjectNode](
      sourceTopic, new JSONDeserializationSchema(), props)

 Apache Flinkの時間モデルはイベント時間 (TimeCharacteristic.EventTime)に設定しています。センサーデータのtimeフィールドをタイムスタンプとウォーターマークに使います。

    val events = env.addSource(source).name("events")

    val timestamped = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
        override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
      })

 センサーからはいくつかのデータが取得できていますがここでは周囲温度(ambient)の値だけ利用します。map()でSensorTagのBDアドレスをキーにして新しいTupleを作成します。

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }

 DataStreamkeyBy()にTupleのインデックス1を指定してBDアドレスをキーにしたKeyedStreamを作成します。

      .keyBy(v => v._1)

 timeWindow()で60秒に設定したタンブリングウィンドウのWindowedStreamを作成します。

      .timeWindow(Time.seconds(60))

 バージョン1.3ではapply()はDeprecatedになっています。以前は以下のように書けました。

      .apply(
        (0L, "", 0.0, 0),
        (acc: (Long, String, Double, Int),
         v: (String, Double)) => { (0L, v._1, acc._3 + v._2, acc._4 + 1) },
        ( window: TimeWindow,
          counts: Iterable[(Long, String, Double, Int)],
          out: Collector[(Long, String, Double, Int)] ) => {
            var count = counts.iterator.next()
            out.collect((window.getEnd, count._2, count._3/count._4, count._4))
          }
      )

 さらにfold()もDeprecatedなので推奨されているaggregate()を使ってみます。AggregateAggregateFunctionを実装しています。apply()の例のようにTupleを使っても良いですがcaseクラスにすると少し読みやすくなります。

      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )

 外部システムと連携しやすいようにデータストリームはUNIXタイムはタイムゾーンをつけたISO-8601にフォーマットしたJSON文字列にmap()します。ここではデバッグ用にJSON文字列は標準出力しています。

      .map { v =>
        val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
        val time = fmt.format(zdt)
        val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
        val retval = JSONObject(json).toString()
        println(retval)
        retval
      }

 最後にデータストリームをKafkaにSinkします。name("kafka")のようにSinkに名前をつけると実行時のログに表示されます。

      .addSink(new FlinkKafkaProducer010[String](
        bootstrapServers,
        sinkTopic,
        new SimpleStringSchema)
      ).name("kafka")
    env.execute()

sbtのrun

 プロジェクトに移動してsbtのrunコマンドを実行します。

$ cd ~/scala_apps/streams-flink-scala-examples
$ sbt
> run

 周囲温度(ambient)を60秒のタンブリングウィンドウで集計した平均値が標準出力されました。

{"time" : "2017-08-24T08:10:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.203125}
{"time" : "2017-08-24T08:11:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.234375}
{"time" : "2017-08-24T08:12:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.26875}