Spark Streaming、Kafka Streamsに続いてストリーム処理フレームワークのApache Flinkを試します。Spark StreamingはPython、Kafka StreamsはJavaで書いたのでApache FlinkはScalaで書いてみようと思います。
Apache FlinkもKafkaと同様にScalaで書かれています。Scalaに特徴的な後方互換性を重視せずアグレッシブな開発をしています。そのためネットで検索できる情報もどんどん古くなりAPIもDeprecatedやPublicEvolvingになりがちで初学者には少し入りづらい状況です。なかなか学習用の良い記事が見つかりませんでしたが、センサーデータのウィンドウ集計の書き方はTHE RISE OF BIG DATA STREAMINGを参考にさせていただきました。
プロジェクトテンプレート
Apache FlinkのプロジェクトはA flink project template using Scala and SBTをテンプレートにすると便利です。最初にテンプレートのWordCountを例に使い方を確認します。テンプレートをcloneします。
$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git
いくつか例がありますがここではWordCount.scalaを使います。
$ tree flink-project
flink-project/
├── build.sbt
├── idea.sbt
├── project
│ ├── assembly.sbt
│ └── build.properties
├── README
└── src
└── main
├── resources
│ └── log4j.properties
└── scala
└── org
└── example
├── Job.scala
├── SocketTextStreamWordCount.scala
└── WordCount.scala
ENSIMEを使う場合はこちらを参考にしてプロジェクトに.ensime
ファイルを作成してEmacsからM-x ensime
してください。
$ cd ~/scala_apps/flink-project
$ sbt
> ensimeConfig
WordCount.scalaのコードです。例のテキストに含まれる英単語をカウントします。
package org.example
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.print()
}
}
プロジェクトのディレクトリからsbtのrunコマンドを実行します。mainメソッドを実装したクラスがいくつかあるのでWordCountの3
を入力します。
$ cd ~/scala_apps/flink-project
$ sbt
> run
Multiple main classes detected, select one to run:
[1] org.example.Job
[2] org.example.SocketTextStreamWordCount
[3] org.example.WordCount
Enter number:3
実行すると以下のようにテキストに含まれる英単語を数えて出力します。
(a,1)
(fortune,1)
(in,1)
(mind,1)
(or,2)
(question,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(to,4)
テキストデータはExecutionEnvironmentからfromElementsして作成したDataSourceです。
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Apach FlinkのScalaではコードを短く書ける反面でアンダースコアやmap
やgroupBy
に登場する1や0が何を指しているのかわかりにくいことがあります。Apache FlinkのTupleはfieldで指定する場合はzero indexed
なので順番に0, 1
となります。
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
テキストデータをflatMap
で正規表現を使い単語に区切りDataSetを作成します。map()で単語(_
)と数(1
)でTupleのDataSetを作成します。groupBy()はfieldの0
を指定して単語でグループ化したGroupedDataSetを作成します。最後にsum()の引数にfieldの1
を指定して単語と単語の合計数をTupleにしたAggregateDataSetを作成します。
ウィンドウ集計
このテンプレートプロジェクトを使いセンサーデータを60秒のタンブリングウィンドウで集計して周囲温度(ambient)の平均値を計算するプログラムを書きます。KafkaをSourceにするのでこちらを参考にRaspberry Pi 3からSensorTagのデータをKafkaに送信します。
Raspberry Pi 3 -> Source (Kafka) -> ストリーム処理 -> Sink (Kafka)
テンプレートプロジェクトをcloneした後に既存のファイルを削除します。
$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git streams-flink-scala-examples
$ cd streams-flink-scala-examples
$ rm -fr src/main/scala/org/
Scalaのパッケージディレクトリを作成します。
$ mkdir -p src/main/scala/com/github/masato/streams/flink
build.sbt
Kafkaはこちらと同様にlandoop/fast-data-devのDockerイメージを利用します。バージョンは0.10.2.1
です。Kafka 0.10に対応したパッケージを追加します。
val flinkVersion = "1.3.2"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion)
val otherDependencies = Seq(
"com.typesafe" % "config" % "1.3.1"
)
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies,
libraryDependencies ++= otherDependencies
)
mainClass in assembly := Some("com.github.masato.streams.flink.App")
App.scala
メインメソッドを実装したプログラムの全文です。Kafkaへの接続情報などはconfigを使い設定ファイルに定義します。ソースコードはリポジトリにもあります。
package com.github.masato.streams.flink
import java.util.Properties
import java.time.ZoneId;
import java.time.format.DateTimeFormatter
import java.util.Date
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010,FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema,SimpleStringSchema}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import scala.util.parsing.json.JSONObject
import com.typesafe.config.ConfigFactory
case class Accumulator(time: Long, bid: String, var sum: Double, var count: Int)
class Aggregate extends AggregateFunction[(String, Double), Accumulator,Accumulator] {
override def createAccumulator(): Accumulator = {
return Accumulator(0L, "", 0.0, 0)
}
override def merge(a: Accumulator, b: Accumulator): Accumulator = {
a.sum += b.sum
a.count += b.count
return a
}
override def add(value: (String, Double), acc: Accumulator): Unit = {
acc.sum += value._2
acc.count += 1
}
override def getResult(acc: Accumulator): Accumulator = {
return acc
}
}
object App {
val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME
val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("app.bootstrap-servers")
val groupId = conf.getString("app.group-id")
val sourceTopic = conf.getString("app.source-topic")
val sinkTopic = conf.getString("app.sink-topic")
def main(args: Array[String]): Unit = {
val props = new Properties()
props.setProperty("bootstrap.servers", bootstrapServers)
props.setProperty("group.id", groupId)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = new FlinkKafkaConsumer010[ObjectNode](
sourceTopic, new JSONDeserializationSchema(), props)
val events = env.addSource(source).name("events")
val timestamped = events.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
})
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
.keyBy(v => v._1)
.timeWindow(Time.seconds(60))
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
.map { v =>
val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
val time = fmt.format(zdt)
val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
val retval = JSONObject(json).toString()
println(retval)
retval
}
.addSink(new FlinkKafkaProducer010[String](
bootstrapServers,
sinkTopic,
new SimpleStringSchema)
).name("kafka")
env.execute()
}
}
main()の処理を順番にみていきます。最初にKafkaに接続する設定を行います。接続するKafkaが0.10のためFlinkKafkaConsumer010を使います。Raspberry Pi 3から届くセンサーデータは以下のようなJSONフォーマットです。
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1503527847, 'humidity': 26.55792236328125, 'objecttemp': 22.3125, 'ambient': 26.375, 'rh': 76.983642578125}
JSONDeserializationSchemaでデシリアライズします。
val props = new Properties()
props.setProperty("bootstrap.servers", bootstrapServers)
props.setProperty("group.id", groupId)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = new FlinkKafkaConsumer010[ObjectNode](
sourceTopic, new JSONDeserializationSchema(), props)
Apache Flinkの時間モデルはイベント時間 (TimeCharacteristic.EventTime)に設定しています。センサーデータのtime
フィールドをタイムスタンプとウォーターマークに使います。
val events = env.addSource(source).name("events")
val timestamped = events.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
})
センサーからはいくつかのデータが取得できていますがここでは周囲温度(ambient)の値だけ利用します。map()でSensorTagのBDアドレスをキーにして新しいTupleを作成します。
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
DataStreamのkeyBy()にTupleのインデックス1
を指定してBDアドレスをキーにしたKeyedStreamを作成します。
.keyBy(v => v._1)
timeWindow()で60秒に設定したタンブリングウィンドウのWindowedStreamを作成します。
.timeWindow(Time.seconds(60))
バージョン1.3ではapply()はDeprecatedになっています。以前は以下のように書けました。
.apply(
(0L, "", 0.0, 0),
(acc: (Long, String, Double, Int),
v: (String, Double)) => { (0L, v._1, acc._3 + v._2, acc._4 + 1) },
( window: TimeWindow,
counts: Iterable[(Long, String, Double, Int)],
out: Collector[(Long, String, Double, Int)] ) => {
var count = counts.iterator.next()
out.collect((window.getEnd, count._2, count._3/count._4, count._4))
}
)
さらにfold()もDeprecatedなので推奨されているaggregate()を使ってみます。Aggregate
はAggregateFunctionを実装しています。apply()
の例のようにTupleを使っても良いですがcaseクラスにすると少し読みやすくなります。
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
外部システムと連携しやすいようにデータストリームはUNIXタイムはタイムゾーンをつけたISO-8601にフォーマットしたJSON文字列にmap()します。ここではデバッグ用にJSON文字列は標準出力しています。
.map { v =>
val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
val time = fmt.format(zdt)
val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
val retval = JSONObject(json).toString()
println(retval)
retval
}
最後にデータストリームをKafkaにSinkします。name("kafka")
のようにSinkに名前をつけると実行時のログに表示されます。
.addSink(new FlinkKafkaProducer010[String](
bootstrapServers,
sinkTopic,
new SimpleStringSchema)
).name("kafka")
env.execute()
sbtのrun
プロジェクトに移動してsbtのrunコマンドを実行します。
$ cd ~/scala_apps/streams-flink-scala-examples
$ sbt
> run
周囲温度(ambient)を60秒のタンブリングウィンドウで集計した平均値が標準出力されました。
{"time" : "2017-08-24T08:10:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.203125}
{"time" : "2017-08-24T08:11:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.234375}
{"time" : "2017-08-24T08:12:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.26875}