LoginSignup
1
1

More than 1 year has passed since last update.

Kinesis Data Streamsのモニタリングを調べてみた

Posted at

背景・目的

Kinesis Data Streamsのボトルネック調査のため、確認した内容を整理します。
今回は、拡張ファンアウトはオフの状態で確認します。

まとめ

以下の順番で確認をする。

  • 書き込み、読み込みの成否
    • PutRecord.Success
    • GetRecords.Success
    • WriteProvisionedThroughputExceeded
    • ReadProvisionedThroughputExceeded
  • 遅延状態
    • IteratorAgeMilliseconds
  • 性能
    • PutRecord.Latency
    • GetRecords.Latency

概要

大きく、ストリームレベルとシャードレベルの2種類があります。

ストリームレベル(基本) シャードレベル(拡張)
収集サイクル 1分 1分
料金 なし 追加料金あり
有効化 デフォルト EnableEnhanceMonitoringで明示的に有効化する

ストリームレベル

主に、以下のメトリクスが確認できます。

分類 目的 メトリクス 単位 説明 判断基準
プロデューサー 処理の成功/失敗 PutRecord Success カウント 指定された期間に測定された、Kinesis ストリームごとの成功した PutRecord オペレーションの数。
Average はストリームへの書き込み成功率を反映
SuccessfulRecords カウント 指定された期間に測定された、Kinesis Data Streams ごとの PutRecords オペレーションの正常なレコード数。
TotalRecordsに対して、SuccessfulRecordsを確認して、成功率がわかる。
失敗したレコードの有効な再試行ロジックを確認。
FailedRecords カウント 指定された期間に測定された、Kinesis Data Streams ごとに PutRecords オペレーションで内部障害のために拒否されたレコードの数。
時折内部障害が予想されるため、再試行する必要がある。
0が正常
ThrottledRecords カウント 指定された期間に測定された、Kinesis Data Streams ごとに PutRecords オペレーションでスロットリングのために拒否されたレコードの数。 0が正常
WriteProvisionedThroughputExceeded カウント 指定された期間にストリームのスロットリングにより拒否されたレコードの数。
このメトリクスには、PutRecord および PutRecords オペレーションのスロットリングが含まれる。
0が正常
性能 PutRecord Latency ミリ秒 指定された期間に測定された PutRecords オペレーションごとにかかった時間。 0に近いほどよい 
流量 PutRecord TotalRecords カウント 指定された期間に測定された、Kinesis Data Streams ごとに PutRecords オペレーションで送信されたレコードの総数。
Bytes バイト 指定された期間に PutRecords オペレーションを使用して Kinesis ストリームに送信されたバイト数。
--- IncomingRecords カウント 指定された期間に、Kinesis ストリームに正常に送信されたレコードの数。
このメトリクスには、PutRecord および PutRecords オペレーションのレコード数が含まれる。
--- IncomingBytes バイト 指定された期間に、Kinesis ストリームに正常に送信されたバイト数
このメトリクスには、PutRecord および PutRecords オペレーションのバイト数が含まれる。
コンシューマー 処理の成功/失敗 GetRecords Success カウント 指定された期間に測定された、ストリームごとの成功した GetRecords オペレーションの数。
SubscribeToShard Success カウント サブスクリプションが正常に確立されたかどうかを記録
RateExceeded 同じコンシューマーによるアクティブなサブスクリプションがすでにあるため新しいサブスクリプションが失敗したとき、またはこのオペレーションで許可される 1 秒あたりの呼び出し回数を超えた場合に出力 0が正常
--- ReadProvisionedThroughputExceeded Kinesis Data Streams のサービスまたはシャードの制限を超えて、所定の期間にスロットリングされた GetRecords 呼び出しの数 0が正常
性能 GetRecords Latency ミリ秒 指定された期間に測定された GetRecords オペレーションごとにかかった時間。 0に近いほどよい
IteratorAgeMilliseconds ミリ秒 すべての GetRecords リクエストについて、ストリーム内の最後のレコードの経過時間をミリ秒単位で測定 0に近いほどよい
SubscribeToShard MillisBehindLatest ミリ秒 現在の時刻と最後のレコードが記録されている時刻の差
流量 GetRecords Bytes バイト 指定された期間に測定された、Kinesis ストリームから取得したバイト数
GetRecords Records カウント 指定された期間に測定された、シャードから取得したレコード数
SubscribeToShard Bytes バイト 指定された期間に測定された、シャードから受信したバイト数。
SubscribeToShard Records カウント 指定された期間に測定された、シャードから受信したレコード数。

実践

メトリクスの確認

前提

Kinesis Data Streams

  1. シャード数が1のプロビジョニングモードのKDSを元に検証します。
    image.png

  2. EnhancedMonitoringを有効化します。

sh-4.2$ aws kinesis enable-enhanced-monitoring --stream-name XXXX --shard-level-metrics ALL --region XXX
{
    "StreamName": "XXXXXX",
    "CurrentShardLevelMetrics": [],
    "DesiredShardLevelMetrics": [
        "IncomingBytes",
        "OutgoingRecords",
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "WriteProvisionedThroughputExceeded",
        "OutgoingBytes"
    ]
}

Redshift Streaming Injection

1.Kinesis専用の外部スキーマの作成

  CREATE EXTERNAL SCHEMA ext_kinesis
    FROM KINESIS
iam_role 'arn:aws:iam::{AWSアカウント}:role/XXXXX';

1.マテビュー専用のスキーマを作成

create schema report;

1.マテビューを作成

CREATE MATERIALIZED VIEW report.m_kds_connect AS
SELECT ApproximateArrivalTimestamp
       ,partitionkey
       ,shardid
       ,sequencenumber
       ,FROM_VARBYTE(data, 'utf-8')::SUPER AS payload
  FROM ext_kinesis."Kinesis Data Stream名"
 WHERE is_utf8(data) AND is_valid_json(from_varbyte(data, 'utf-8'));

テストデータ生成

  1. EC2上で、テストデータを生成するスクリプトを動かします。
$ cat random_put_to_kds.py
import json
import boto3
import datetime
import random
from datetime import timedelta

kdsname='xxxxx'
region='xxxxx'
clientkinesis= boto3.client('kinesis', region_name=region)
i=0

print("Start.")

while True:
    i = int(i) + 1
    id = 'id-' + str(random.randint(1665586, 8888888))
    execution_time = datetime.datetime.now().isoformat()
    int_value0 = random.randint(1, 65535)
    int_value1 = random.randint(1, 200000)
    int_value2 = random.randint(1, 100000)
    int_value3 = random.randint(20, 120)

    new_dict={}
    new_dict["id"] = id
    new_dict["execution_time"] = execution_time
    new_dict["int_value0"] = int_value0
    new_dict["int_value1"] = int_value1
    new_dict["int_value2"] = int_value2
    new_dict["int_value3"] = int_value3

    try:
        response=clientkinesis.put_record(StreamName=kdsname, Data=json.dumps(new_dict), PartitionKey=id)
        print("ingested:"+str(i) +",ReqID:"+ response['ResponseMetadata']['RequestId'] + ",HTTPStatusCode:"+ str(response['ResponseMetadata']['HTTPStatusCode']))
    except Exception as e:
        print(e)
$
  1. スクリプトを実行します。
$ python3 random_put_to_kds.py &
Start.
ingested:1,ReqID:f98c9196-97b7-884f-a237-1253501a2a51,HTTPStatusCode:200
ingested:2,ReqID:f1d55f5d-d967-a428-aa6e-dc9b1eca0636,HTTPStatusCode:200
・・・・

WriteProvisionedThroughputExceededの対応

WriteProvisionedThroughputExceededが発生しました。
image.png

スクリプトにエラーが返された。

botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation (reached max retries: 4): Rate exceeded for shard shardId-000000000000 in stream XXXXXX under account XXXXXXX.

Kinesisのシャードを増やして数値を確認する

1.シャード数を1から2に変更します。
image.png

1.変更を保存をクリックします。
image.png

1.変更されました。
image.png

項目 変更前 変更後 変化点
プロデューサー PutRecord.Success 0.97897671097 1 成功率が上がった。
WriteProvisionedThroughputExceeded 0.01524925066 0 ThroughputExceededがなくなった(改善された)
PutRecord.Latency 453,479 262,582 レイテンシーも改善
IncomingBytes 159.371873817 159.367549593
PutRecord.Bytes 159.371873817 159.367549593
IncomingRecords 71,293 43,605 バイト数は変わらないが、レコード数は減っている。
コンシューマー GetRecords.Success 7 10
ReadProvisionedThroughputExceeded 0 0
GetRecords.IteratorAge 135 194 滞留時間が延びた
GetRecords.IteratorAgeMilliseconds 135,000 194,000 滞留時間が延びた
GetRecords.Latency 2,550 3,287
GetRecords.Bytes 11,052,698 6,956,263
GetRecords.Records 69,351 43,649

IteratorAgeMillisecondsの対応①

スクリプト実行後、マテビューRedreshを実行する。

refresh materialized view  report.m_kds_connect;
---
経過時間: 06 分 53 秒

select count(1) from report.m_kds_connect;
---
2591194

マテビュー Refresh前後で、IteratorAgeMillisecondsが減少していることがわかる。

image.png

image.png

さらにマテビュー Refreshを定期実行し確認する

1.Scheduleをクリックします。

image.png

1.1分毎に実行します。
image.png

image.png

定期的に実行することで、GetRecords.IteratorAgeMillisecondsが132秒になった。IncomingRecordsが70,053、GetRecords.Recordsが63,874。インプットのほうがやや多いため、時間の経過とともに増える可能性がある。
image.png

IteratorAgeMillisecondsの対応②

  1. KDSのシャードをさらに増やします。(2→4)

KDSのシャード数の変更は、現在のシャード数の1/2〜2倍の範囲で変更が可能です。

image.png

image.png

  1. 実行数を更に増やす
# 実行前

ps -ef | grep random_put_to_kds.py
ssm-user  6830  6751 14 00:22 pts/5    00:02:24 python3 random_put_to_kds.py
ssm-user  6835  6751 14 00:22 pts/5    00:02:25 python3 random_put_to_kds.py
ssm-user  6853  6751 14 00:22 pts/5    00:02:25 python3 random_put_to_kds.py
ssm-user  6856  6751 14 00:22 pts/5    00:02:27 python3 random_put_to_kds.py
ssm-user  6869  6751 14 00:22 pts/5    00:02:26 python3 random_put_to_kds.py
ssm-user  6870  6751 14 00:22 pts/5    00:02:26 python3 random_put_to_kds.py
ssm-user  6877  6751 14 00:22 pts/5    00:02:25 python3 random_put_to_kds.py
ssm-user  6879  6751 14 00:22 pts/5    00:02:25 python3 random_put_to_kds.py
ssm-user  6902  6751 14 00:22 pts/5    00:02:27 python3 random_put_to_kds.py
ssm-user  6911  6751 14 00:22 pts/5    00:02:27 python3 random_put_to_kds.py
ssm-user  6912  6751 14 00:22 pts/5    00:02:26 python3 random_put_to_kds.py
ssm-user  6919  6751 14 00:22 pts/5    00:02:29 python3 random_put_to_kds.py
ssm-user  6929  6751 14 00:22 pts/5    00:02:26 python3 random_put_to_kds.py
ssm-user  6953  6751 14 00:22 pts/5    00:02:30 python3 random_put_to_kds.py
ssm-user  7225  6751 14 00:24 pts/5    00:02:09 python3 random_put_to_kds.py
ssm-user  7226  6751 15 00:24 pts/5    00:02:10 python3 random_put_to_kds.py
ssm-user  7228  6751 14 00:24 pts/5    00:02:06 python3 random_put_to_kds.py
ssm-user  7231  6751 14 00:24 pts/5    00:02:10 python3 random_put_to_kds.py
ssm-user  7232  6751 15 00:24 pts/5    00:02:10 python3 random_put_to_kds.py
ssm-user  7351  6751  0 00:39 pts/5    00:00:00 grep random_put_to_kds.py
$
$ ps -ef | grep random_put_to_kds.py | wc -l
20

# 実行後
## 42まで増やしました.
$ ps -ef | grep random_put_to_kds.py | wc -l
42
$

RedshiftのDistkeyを見直す

  1. IteratorAgeMillisecondsを確認します。
    image.png

  2. 変化点を確認します。流入量を増やした結果、以下の確認ができました。

  • 成功率が下がった。(プロデューサーの失敗が増えた)
  • 流入量は、バイト数は増えていないが、レコード数は増えた。
  • 滞留時間は増え、コンシューマの性能が追いついていないことがわかる。
項目 変更前 変更後 変化点
プロデューサー PutRecord.Success 1 0.99589393396 成功率は下がった。
WriteProvisionedThroughputExceeded 0 0.00410606604 閾値エラーが増えた。
PutRecord.Latency 188,079 1,011,896
IncomingBytes 159.369530033 159.373077409 流入量(バイト数)は、ほぼ変わらない。
PutRecord.Bytes 159.369530033 159.373077409 流入量(バイト数)は、ほぼ変わらない。
IncomingRecords 32,747 158,380 流入量(レコード)は増えた。
コンシューマー GetRecords.Success 6 16 流量が増えて成功数も増えた。 (つまり、変更前はコンシューマーに余裕があった)
ReadProvisionedThroughputExceeded 0 0
GetRecords.IteratorAge 104 394
GetRecords.IteratorAgeMilliseconds 104,000 394,000 滞留時間が増えた(4倍程度)
GetRecords.Latency 2,131 4,791 取得時間が延びた
GetRecords.Bytes 5,230,379 24,894,547 流出量が増えた
GetRecords.Records 32,819 156,206 流出量が増えた

3.コンシューマ(Redshift)の並列度(スライス数)を増やしてIteratorAgeMillisecondsを改善します。まずは、クラスタのノード数とノードタイプを確認します。

image.png

現在、以下のノード数とノードタイプから、スライス数は、
ra3.4xlargeのvCPU(12)*ノード数が2なので、=24スライスと考える。

  • ノードタイプは、ra3.4xlarge
  • ノード数が2

こう考えると、Kinesisのパーティション数を24まで増やしても良いかもしれない。

4.現在のテーブルのDISTKEYやSORTKEYの状況を確認します。

select * from svv_table_info where schema='report'

---
database,schema,table_id,table,encoded,diststyle,sortkey1,max_varchar,sortkey1_enc,sortkey_num,size,pct_used,empty,unsorted,stats_off,tbl_rows,skew_sortkey1,skew_rows,estimated_visible_rows,risk_event,vacuum_sort_benefit
sample,report,110106,mv_tbl__m_kds_connect__0,Y,KEY(num_rec),,256,,0,5624,0.0021,0,,5.00,84772425,,100.00,84772448,,
  • num_recというKEYでKEY分散されている。
    • num_recはマテビューの内部的なカラムだろうか??
  • SORTKEYは指定されていない
  • 現在のサイズは、5.6GB程度、行数は84,772, 425件。

5.マテビューの更新を停止します。(スケジュールを停止します。)
image.png

6.マテビューでは、alter文でdistkeyとsortkeywを設定できないようなので、再作成して様子を見ます。

  • 現distkeyは、num_recというキーでしたが、24スライス以上に分散されるキー(sequencenumber)を指定してみます。
# Drop
DROP MATERIALIZED VIEW report.m_kds_connect 

# Create(再作成)
CREATE MATERIALIZED VIEW report.m_kds_connect 
DISTSTYLE KEY
DISTKEY(4)
SORTKEY(1)
AS
SELECT ApproximateArrivalTimestamp
       ,partitionkey
       ,shardid
       ,sequencenumber
       ,FROM_VARBYTE(data, 'utf-8')::SUPER AS payload
  FROM ext_kinesis."Kinesis Data Stream名"
 WHERE is_utf8(data) AND is_valid_json(from_varbyte(data, 'utf-8'));

7.マテビューをリフレッシュします。

REFRESH MATERIALIZED VIEW report.m_kds_connect 


# ロック状態を確認
select * from SVV_TRANSACTIONS where lock_mode='ExclusiveLock' and granted='true'

--- 
txn_owner,txn_db,xid,pid,txn_start,lock_mode,lockable_object_type,relation,granted
IAM:XXX,sample,665275,1075274149,2022-10-09 07:58:41.341266,ExclusiveLock,transactionid,,true
IAM:XXX,sample,665320,1074158295,2022-10-09 07:59:38.388461,ExclusiveLock,transactionid,,true

# 実行中のクエリを確認
select pid, starttime, duration,
trim(user_name) as user,
trim (query) as querytxt
from stv_recents
where status = 'Running';

---
pid,starttime,duration,user,querytxt
1075274149,2022-10-09 07:58:41.205075,141644262,IAM:XXX,REFRESH MATERIALIZED VIEW report.m_kds_connect

image.png

Distkeyの見直し前後でIteratorAgeMillisecondsは変わらず。

項目 変更前  変更後 変化点
プロデューサー PutRecord.Success 0.99589393396 0.99534917425
WriteProvisionedThroughputExceeded 0.00410606604 0.00465082575
PutRecord.Latency 1,011,896 917,820
IncomingBytes 159.373077409 159.370078427
PutRecord.Bytes 159.373077409 159.370078427
IncomingRecords 158,380 151,095
コンシューマー GetRecords.Success 16 16
ReadProvisionedThroughputExceeded 0 0
GetRecords.IteratorAge 394 371
GetRecords.IteratorAgeMilliseconds 394,000 371,000 それほど、変わらない。
GetRecords.Latency 4,791 3,278
GetRecords.Bytes 24,894,547 23,357,850
GetRecords.Records 156,206 146,559

KDSのシャード数を見なおす

Redshiftのスライス数が24なので、同様のシャード数とする。

  1. シャード数を変更
    image.png
    image.png

  2. シャード見直しにより、IteratorAgeMillisecondsが、若干短縮された。
    3.71s → 2.76s

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1