13
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ScalaでKinesisのConsumerを作るチュートリアル

Last updated at Posted at 2015-12-10

概要

ストリームを作り、Scalaで内容をtailするところまでを。書いているとおりに実行すれば動くことを目標にしてます。

Kinesisはざっくり言うと大規模OK Pub/Subメッセージキューという理解でいいと思います。

ストリームがPub/Subキューみたいなもので、レコードは24時間保持されます(現在は24時間以上設定で変更可能)。シャードをいくつか束ねて構成されていて、シャードを増やすことでPut/GetのQPSを増やすことが出来ます。パーティションキーによりどのシャードに入るか決まります。なのでホットシャードが起きないパーティションキーとなるように設計する必要があります。迷ったら乱数を入れるといいと思います。

偏りができて、あるシャードだけが忙しい、みたいな状況になるとシャードを増やしてもパフォーマンスを向上させることができなくなります。

Put後にシャード振り分ける仕組み

レコードのPutはストリームに対して行います。Putしたとき、パーティションキーはmd5でhash化され10進数化されます。シャードはパーティションキーのRangeを持っていて、Rangeがにあうシャードにレコードが格納されます。

Recordはデータ + パーティションキーの構造です。

Getするとき

レコードはシャードに分散されて格納されているので、Getはストリームではなくシャードに対して行います。シャードに入ったレコードはシーケンス番号を持っています。このシーケンス番号により、何処までレコードを読み込んだかを管理します。

KinesisはPull型のPub/Subです。Google Pub/Subはエンドポイントを用意すればそこにPushする仕組みを取れるらしいです。

KCLがやってくれること

何処までレコードを読み込んだのか、複数サーバーで処理するときとか・・、シャードを増やしたときとか・・、ポーリングしなきゃ?・・・なんか実装めんどくさそうです・・・が、この辺はKinesis Clinet Libraryが全部やってくれます。

  • チェックポイント処理
    • 何処まで処理したかをDynamoDBの保存する
  • ワーカーの管理
    • そのシャードはどのワーカーが担当しているのか
    • ワーカーが死んでいたら開いてるワーカーにスイッチ

(あれ、意外と長くなってきた。)詳しいことはAmazonのドキュメントに日本語でのっています。

なので我々は「レコードをどう処理するか」を実装します。具体的にはIRecordProcessorを継承したRecordProcessorを作ります。

KCLを動かす

まずストリームを作る

# stream名をきめておく
stream_name="kinesis-test-stream"

# streamを作成する
aws kinesis create-stream --stream-name $stream_name --shard-count 1

# StreamStatusがCREATINGからACTIVEになるのを確認する。
aws kinesis describe-stream --stream-name $stream_name

scalaの環境を作ります

ストリームが作られるのを待っている間にScalaでKinesis Consumerの準備をします。

1.ディレクトリ作成

mkdir kinesis-consumer
cd kinesis-consumer
vim build.sbt

2.build.sbt作成

build.sbt
name := "kinesis-client-library-start"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "com.amazonaws" % "amazon-kinesis-client" % "1.6.1"
)

3.scala
src/main/scala/Main.scala

import java.net.InetAddress
import java.util
import java.util.UUID
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.auth.{BasicAWSCredentials, AWSCredentialsProvider}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessorCheckpointer, IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{Worker, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.clientlibrary.types.{UserRecord, ShutdownReason}
import com.amazonaws.services.kinesis.model.Record

object Main {
  val accessKeyId = System.getProperty("accessKeyId")
  val secretAccessKey = System.getProperty("secretAccessKey")

  val appName = "kinesis-test-app"
  val streamName = "kinesis-test-stream"

  val initialPosition = "LATEST"
  val region = "ap-northeast-1"
  val idleTimeBetweenReadsInMillis = 3000

  def main(args: Array[String]): Unit = {
    val workerId = InetAddress.getLocalHost.getCanonicalHostName + ":" + UUID.randomUUID
    val credentialsProvider: AWSCredentialsProvider = new StaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey))

    val kclConf = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
      .withInitialPositionInStream(InitialPositionInStream.valueOf(initialPosition))
      .withRegionName(region)
      .withIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis)

    val worker = new Worker(StreamTailProcessor.processorFactory, kclConf)
    println(s"worker start. name:$appName stream:$streamName workerId:$workerId")
    worker.run()

  }
}

class StreamTailProcessor extends IRecordProcessor{
  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = {
    println(s"Shutting down record processor")
  }

  override def initialize(shardId: String): Unit = {
    println(s"Initialising record processor for shard: $shardId")
  }

  override def processRecords(records: util.List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
    import scala.collection.JavaConversions._
    records foreach { r =>
      val line = new String(r.getData.array)
      println(s"[stream-tail] $line")
    }
  }
}

object StreamTailProcessor {
  def processorFactory = new IRecordProcessorFactory {
    def createProcessor(): IRecordProcessor = new StreamTailProcessor
  }
}

実行します

ストリームがACTIVEになったら、scalaのkinesis consumerを起動します。

1.実行します

sbt run -DaccessKeyId=XXX -DsecretAccessKey=XXX

2.ストリームにデータを投入します

aws kinesis put-record --stream-name $stream_name --data hogehoge_$RANDOM --partition-key $RANDOM

3.正座待機

kinesisのinfoログと共に以下の様な標準出力が得られました。

[stream-tail] hogehoge_18591

4.クロージング

満足したらストリームを削除します。放置すると課金されます。

aws kinesis delete-stream --stream-name $stream_name

終わり

Kinesisにデータを入れて(Producer)、処理する(Consumer)ところまで体験できました。
サンプルコードは以下においてあります。

13
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?