Edited at

ScalaでKinesisのConsumerを作るチュートリアル


概要

ストリームを作り、Scalaで内容をtailするところまでを。書いているとおりに実行すれば動くことを目標にしてます。

Kinesisはざっくり言うと大規模OK Pub/Subメッセージキューという理解でいいと思います。

ストリームがPub/Subキューみたいなもので、レコードは24時間保持されます(現在は24時間以上設定で変更可能)。シャードをいくつか束ねて構成されていて、シャードを増やすことでPut/GetのQPSを増やすことが出来ます。パーティションキーによりどのシャードに入るか決まります。なのでホットシャードが起きないパーティションキーとなるように設計する必要があります。迷ったら乱数を入れるといいと思います。

偏りができて、あるシャードだけが忙しい、みたいな状況になるとシャードを増やしてもパフォーマンスを向上させることができなくなります。


Put後にシャード振り分ける仕組み

レコードのPutはストリームに対して行います。Putしたとき、パーティションキーはmd5でhash化され10進数化されます。シャードはパーティションキーのRangeを持っていて、Rangeがにあうシャードにレコードが格納されます。

Recordはデータ + パーティションキーの構造です。


Getするとき

レコードはシャードに分散されて格納されているので、Getはストリームではなくシャードに対して行います。シャードに入ったレコードはシーケンス番号を持っています。このシーケンス番号により、何処までレコードを読み込んだかを管理します。

KinesisはPull型のPub/Subです。Google Pub/Subはエンドポイントを用意すればそこにPushする仕組みを取れるらしいです。


KCLがやってくれること

何処までレコードを読み込んだのか、複数サーバーで処理するときとか・・、シャードを増やしたときとか・・、ポーリングしなきゃ?・・・なんか実装めんどくさそうです・・・が、この辺はKinesis Clinet Libraryが全部やってくれます。


  • チェックポイント処理


    • 何処まで処理したかをDynamoDBの保存する



  • ワーカーの管理


    • そのシャードはどのワーカーが担当しているのか

    • ワーカーが死んでいたら開いてるワーカーにスイッチ



(あれ、意外と長くなってきた。)詳しいことはAmazonのドキュメントに日本語でのっています。

なので我々は「レコードをどう処理するか」を実装します。具体的にはIRecordProcessorを継承したRecordProcessorを作ります。


KCLを動かす


まずストリームを作る

# stream名をきめておく

stream_name="kinesis-test-stream"

# streamを作成する
aws kinesis create-stream --stream-name $stream_name --shard-count 1

# StreamStatusがCREATINGからACTIVEになるのを確認する。
aws kinesis describe-stream --stream-name $stream_name


scalaの環境を作ります

ストリームが作られるのを待っている間にScalaでKinesis Consumerの準備をします。

1.ディレクトリ作成

mkdir kinesis-consumer

cd kinesis-consumer
vim build.sbt

2.build.sbt作成


build.sbt

name := "kinesis-client-library-start"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
"com.amazonaws" % "amazon-kinesis-client" % "1.6.1"
)


3.scala

src/main/scala/Main.scala

import java.net.InetAddress

import java.util
import java.util.UUID
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.auth.{BasicAWSCredentials, AWSCredentialsProvider}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessorCheckpointer, IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{Worker, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.clientlibrary.types.{UserRecord, ShutdownReason}
import com.amazonaws.services.kinesis.model.Record

object Main {
val accessKeyId = System.getProperty("accessKeyId")
val secretAccessKey = System.getProperty("secretAccessKey")

val appName = "kinesis-test-app"
val streamName = "kinesis-test-stream"

val initialPosition = "LATEST"
val region = "ap-northeast-1"
val idleTimeBetweenReadsInMillis = 3000

def main(args: Array[String]): Unit = {
val workerId = InetAddress.getLocalHost.getCanonicalHostName + ":" + UUID.randomUUID
val credentialsProvider: AWSCredentialsProvider = new StaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey))

val kclConf = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
.withInitialPositionInStream(InitialPositionInStream.valueOf(initialPosition))
.withRegionName(region)
.withIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis)

val worker = new Worker(StreamTailProcessor.processorFactory, kclConf)
println(s"worker start. name:$appName stream:$streamName workerId:$workerId")
worker.run()

}
}

class StreamTailProcessor extends IRecordProcessor{
override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = {
println(s"Shutting down record processor")
}

override def initialize(shardId: String): Unit = {
println(s"Initialising record processor for shard: $shardId")
}

override def processRecords(records: util.List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
import scala.collection.JavaConversions._
records foreach { r =>
val line = new String(r.getData.array)
println(s"[stream-tail] $line")
}
}
}

object StreamTailProcessor {
def processorFactory = new IRecordProcessorFactory {
def createProcessor(): IRecordProcessor = new StreamTailProcessor
}
}


実行します

ストリームがACTIVEになったら、scalaのkinesis consumerを起動します。

1.実行します

sbt run -DaccessKeyId=XXX -DsecretAccessKey=XXX

2.ストリームにデータを投入します

aws kinesis put-record --stream-name $stream_name --data hogehoge_$RANDOM --partition-key $RANDOM

3.正座待機

kinesisのinfoログと共に以下の様な標準出力が得られました。

[stream-tail] hogehoge_18591

4.クロージング

満足したらストリームを削除します。放置すると課金されます。

aws kinesis delete-stream --stream-name $stream_name


終わり

Kinesisにデータを入れて(Producer)、処理する(Consumer)ところまで体験できました。

サンプルコードは以下においてあります。

https://github.com/oshiro-kazuma/kinesis-client-library-start