1.Amazon Kinesisとは#
ストリームデータを収集・処理するためのフルマネージド型サービスです。
1-1 Amazon Kinesis Streams##
ストリームデータ処理用の分析システムやアプリケーションを構築する。
- 順序付きイベントストリームとして複数のアプリケーションから同時アクセス可能。シーケンス番号が付与される。
- データの種類や処理の応じて「ストリーム」を作成し、1つ以上のの「シャード」で構成され、この数でスループットを調整する。
- 保存期間はデフォルトで24時間、最長7日間。
- 1データレコードのサイズは最大1MB。
- Amazon Kinesis Streamsのサービス制限について
プロデューサー (データ送信側)###
Amazon Kinesis Agent
Amazon Kinesis サービスにデータを簡単に収集して取り込む OSS のスタンドアロンJava アプリケーション
Amazon Kinesis Producer Library (KPL)
Amazon Kinesis Streams にデータを送信する OSS の補助ライブラリ
Fluent plugin for Amazon Kinesis
Amazon Kinesis Streams と AmazonKinesis Firehoseにイベントを送信するOSSのFluentd 出力プラグイン
Amazon Kinesis Data Generator (KDG)
Amazon Kinesis Data Generator (KDG) を利用して Amazon Kinesis Streams または Amazon Kinesis Firehose にテストデータを簡単に送信できる
コンシューマー (データ処理側)###
Amazon Kinesis Client Library (KCL)
KCL を利用して Kinesis アプリケーションを作成できる
暗号化###
- KMSによる保存中データの暗号化
- httpsエンドポイントを使用した転送データの暗号化
重複レコードの処理###
レコードが複数回 Amazon Kinesis Data Streams applicationに配信される理由は、
- プロデューサーの再試行
- コンシューマーの再試行
の 2 つになります。
アプリケーションは、各レコードの複数回処理を予測して適切に処理する必要があります。
プロデューサーで PutRecord を呼び出してから Amazon Kinesis Data Streams の受信確認を受け取るまでの間に、ネットワーク関連のタイムアウトを発生する場合があります。
この場合、プロデューサーはレコードが Kinesis Data Streams に配信されたかどうかを確認できません。
各レコードがアプリケーションにとって重要であれば、同じデータを使用して呼び出しを再試行するようにプロデューサーが定義されているはずです。
同じデータを使用した PutRecord の呼び出しが両方とも Kinesis Data Streams に正常にコミットされると、Kinesis Data Streams レコードは 2 つになります。
2 つのレコードは、データは同じでも、一意のシーケンス番号が付けられています。
厳密な保証を必要とするアプリケーションは、後で処理するときに重複を削除するようにレコード内にプライマリキーを埋め込む必要があります。
プロデューサーの再試行に起因する重複の数が、コンシューマーの再試行に起因する重複の数より通常は少ないことに注意してください。
したがって、レコード内にプライマリキーを埋め込み、後で処理するときに重複を削除するという対応が必要となります。
リシャーディング###
データ転送速度が増加した場合、データフローのシャード数を調整して、データフローのレートの変化に適用させる。
- シャードの分割
- シャードの結合
の2種類の操作が行われます。
シャードの分割
1つのシャードを2つのシャードに分割して、ストリームのスループットを向上させます
シャードの結合
2つのシャードを1つのシャードに結合して、ストリームのスループットを低下させます
1-2 Amazon Kinesis Firehose##
- "配信先に応じて"「配信ストリーム」を作成
- シャードの作成は不要。
- 1 データレコードの最大サイズは 1 MB
- 制限なしにスケールするよう設計
- Amazon Kinesis Streams から Firehose へ直接ストリームデータを送ることが可能
1-3 Amazon Kinesis Analytics##
ストリームデータを標準的なSQLクエリでリアルタイムに分析する。
- 分析単位に「アプリケーション」を作成し、「ソース/デスティネーション」を設定
- SQL クエリ実行の前処理として、Lambda 関数の指定が可能
- 1 入力行の最大サイズは 50 KB/参照ソースの最大サイズは 1 GB
2.Amazon KinesisとSQSの比較#
比較表 | SQS(Amazon Simple Queue Service) | Kinesis |
---|---|---|
概要 | シンプルなメッセージングシステム。 | ストリーミング処理(ビッグデータなど) |
仕組み | ・シンプル(投入、取り出し、削除)・シーケンス番号なし。 | ・レコード、シーケンス番号、パーティション、データBLOBなど構成や操作はSQSと比較して複雑。・シーケンス番号あり。 |
目的 | 主にコンピュータ間の処理の連結部分。(Kinesisのような瞬発的な大量データの処理には向かない) | 大量データの処理が可能。1シャドーで最大 1000/秒のPUTの処理が可能。(シャドーを増やすことにより性能アップ)。一般大衆向けのWeb投票システムなど。 |
保存期間 | 開封後30秒間処理されない場合は未開封となる。デフォルトで14日間保存される。 | デフォルトで24時間保存。何度でも取得が可能。 |
入れるもの | 主にコマンドを入れる。 | 主にデータを入れる。 |
価格 | Kinesisより高 | SQSより安 |
3.Demo#
1.Amazon Kinesis Streams の構築##
- Kinesis ストリームの名前:任意
- シャード数:5 → 大きくすれば高性能になります。
基本的にKinesis Streamsの設定はこれだけです。
後は取得するデータと連携させると、データが流れ込み、分散処理をしてくれます。
2.Amazon Kinesis Firehose の作成##
Step 1: Name and source###
- Delivery stream name:任意
- Source:Kinesis stream → ダイレクトにデータを送るか(Direct PUT or other sourcesKinesis stream)もしくは(Kinesis stream)を選択することできます。
- Kinesis stream:「1.Amazon Kinesis Streams の構築」で作成したものを指定。
Step 2: Process records###
Lambdaに繋げていき、Lambdaが変換するといったような処理や、レコードフォーマットを変えていくというような設定もできますが、今回はいずれも Disabled にしておきます。
Step 3: Choose destination###
- Destination:S3 データの送り先を選択することができます。
- S3 bucket:S3バケットを指定。
Step 4: Configure settings###
S3のバッファを作成したり、暗号化したりできますが、今回は特に何もしません。
エラーのログを取得したり、タグをつけることができますが、今回は何もしません。
IAMロールのみ作成して、付与します。
これで、Data StreamsからS3に流れるよといった設定ができました。
Step 5: Review###
「Create delivery stream」をクリック。
3.Amazon Kinesis Analyticsの作成##
- Application name:任意
- Description:任意
- Runtime:SQL → SQLで解析するようにします。
「Connect streaming data」をクリック。
しかし、KinesisStreamにはまだデータがないので、「Configure a new stream」でDemo環境を作成していきます。
「Create demo stream」をクリックします。
「Choose source」に戻ります。
- Source:Kinesis stream
- Kinesis stream:先のDemo環境のものを指定
画面下までいくと、Demo環境のデータのスキーマが完了していることがわかります。
「Go to SQL Editor」をクリックします。
一から作成することも可能ですが、「Add SQL from templates」から用意されたものを使うことも可能です。
「Save and run SQL」で解析ができるようになり、フィルタリングされたデータが表示されるようになります。
データを使った解析ができるようにになりました。これが、Amazon Kinesis Analyticsの仕事です。