概要
ストリームを作り、Scalaで内容をtailするところまでを。書いているとおりに実行すれば動くことを目標にしてます。
Kinesisはざっくり言うと大規模OK Pub/Subメッセージキューという理解でいいと思います。
ストリームがPub/Subキューみたいなもので、レコードは24時間保持されます(現在は24時間以上設定で変更可能)。シャードをいくつか束ねて構成されていて、シャードを増やすことでPut/GetのQPSを増やすことが出来ます。パーティションキーによりどのシャードに入るか決まります。なのでホットシャードが起きないパーティションキーとなるように設計する必要があります。迷ったら乱数を入れるといいと思います。
偏りができて、あるシャードだけが忙しい、みたいな状況になるとシャードを増やしてもパフォーマンスを向上させることができなくなります。
Put後にシャード振り分ける仕組み
レコードのPutはストリームに対して行います。Putしたとき、パーティションキーはmd5でhash化され10進数化されます。シャードはパーティションキーのRangeを持っていて、Rangeがにあうシャードにレコードが格納されます。
Recordはデータ + パーティションキー
の構造です。
Getするとき
レコードはシャードに分散されて格納されているので、Getはストリームではなくシャードに対して行います。シャードに入ったレコードはシーケンス番号を持っています。このシーケンス番号により、何処までレコードを読み込んだかを管理します。
KinesisはPull型のPub/Subです。Google Pub/Subはエンドポイントを用意すればそこにPushする仕組みを取れるらしいです。
KCLがやってくれること
何処までレコードを読み込んだのか、複数サーバーで処理するときとか・・、シャードを増やしたときとか・・、ポーリングしなきゃ?・・・なんか実装めんどくさそうです・・・が、この辺はKinesis Clinet Libraryが全部やってくれます。
- チェックポイント処理
- 何処まで処理したかをDynamoDBの保存する
- ワーカーの管理
- そのシャードはどのワーカーが担当しているのか
- ワーカーが死んでいたら開いてるワーカーにスイッチ
(あれ、意外と長くなってきた。)詳しいことはAmazonのドキュメントに日本語でのっています。
なので我々は「レコードをどう処理するか」を実装します。具体的にはIRecordProcessorを継承したRecordProcessorを作ります。
KCLを動かす
まずストリームを作る
# stream名をきめておく
stream_name="kinesis-test-stream"
# streamを作成する
aws kinesis create-stream --stream-name $stream_name --shard-count 1
# StreamStatusがCREATINGからACTIVEになるのを確認する。
aws kinesis describe-stream --stream-name $stream_name
scalaの環境を作ります
ストリームが作られるのを待っている間にScalaでKinesis Consumerの準備をします。
1.ディレクトリ作成
mkdir kinesis-consumer
cd kinesis-consumer
vim build.sbt
2.build.sbt作成
name := "kinesis-client-library-start"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"com.amazonaws" % "amazon-kinesis-client" % "1.6.1"
)
3.scala
src/main/scala/Main.scala
import java.net.InetAddress
import java.util
import java.util.UUID
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.auth.{BasicAWSCredentials, AWSCredentialsProvider}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessorCheckpointer, IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{Worker, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.clientlibrary.types.{UserRecord, ShutdownReason}
import com.amazonaws.services.kinesis.model.Record
object Main {
val accessKeyId = System.getProperty("accessKeyId")
val secretAccessKey = System.getProperty("secretAccessKey")
val appName = "kinesis-test-app"
val streamName = "kinesis-test-stream"
val initialPosition = "LATEST"
val region = "ap-northeast-1"
val idleTimeBetweenReadsInMillis = 3000
def main(args: Array[String]): Unit = {
val workerId = InetAddress.getLocalHost.getCanonicalHostName + ":" + UUID.randomUUID
val credentialsProvider: AWSCredentialsProvider = new StaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey))
val kclConf = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
.withInitialPositionInStream(InitialPositionInStream.valueOf(initialPosition))
.withRegionName(region)
.withIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis)
val worker = new Worker(StreamTailProcessor.processorFactory, kclConf)
println(s"worker start. name:$appName stream:$streamName workerId:$workerId")
worker.run()
}
}
class StreamTailProcessor extends IRecordProcessor{
override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = {
println(s"Shutting down record processor")
}
override def initialize(shardId: String): Unit = {
println(s"Initialising record processor for shard: $shardId")
}
override def processRecords(records: util.List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
import scala.collection.JavaConversions._
records foreach { r =>
val line = new String(r.getData.array)
println(s"[stream-tail] $line")
}
}
}
object StreamTailProcessor {
def processorFactory = new IRecordProcessorFactory {
def createProcessor(): IRecordProcessor = new StreamTailProcessor
}
}
実行します
ストリームがACTIVEになったら、scalaのkinesis consumerを起動します。
1.実行します
sbt run -DaccessKeyId=XXX -DsecretAccessKey=XXX
2.ストリームにデータを投入します
aws kinesis put-record --stream-name $stream_name --data hogehoge_$RANDOM --partition-key $RANDOM
3.正座待機
kinesisのinfoログと共に以下の様な標準出力が得られました。
[stream-tail] hogehoge_18591
4.クロージング
満足したらストリームを削除します。放置すると課金されます。
aws kinesis delete-stream --stream-name $stream_name
終わり
Kinesisにデータを入れて(Producer)、処理する(Consumer)ところまで体験できました。
サンプルコードは以下においてあります。