背景・目的
Kinesis Data Streamsのボトルネック調査のため、確認した内容を整理します。
今回は、拡張ファンアウトはオフの状態で確認します。
まとめ
以下の順番で確認をする。
- 書き込み、読み込みの成否
- PutRecord.Success
- GetRecords.Success
- WriteProvisionedThroughputExceeded
- ReadProvisionedThroughputExceeded
- 遅延状態
- IteratorAgeMilliseconds
- 性能
- PutRecord.Latency
- GetRecords.Latency
概要
大きく、ストリームレベルとシャードレベルの2種類があります。
ストリームレベル(基本) | シャードレベル(拡張) | |
---|---|---|
収集サイクル | 1分 | 1分 |
料金 | なし | 追加料金あり |
有効化 | デフォルト | EnableEnhanceMonitoringで明示的に有効化する |
ストリームレベル
主に、以下のメトリクスが確認できます。
分類 | 目的 | メトリクス | 単位 | 説明 | 判断基準 | |
---|---|---|---|---|---|---|
プロデューサー | 処理の成功/失敗 | PutRecord | Success | カウント | 指定された期間に測定された、Kinesis ストリームごとの成功した PutRecord オペレーションの数。 Average はストリームへの書き込み成功率を反映 |
|
SuccessfulRecords | カウント | 指定された期間に測定された、Kinesis Data Streams ごとの PutRecords オペレーションの正常なレコード数。 TotalRecordsに対して、SuccessfulRecordsを確認して、成功率がわかる。 失敗したレコードの有効な再試行ロジックを確認。 |
||||
FailedRecords | カウント | 指定された期間に測定された、Kinesis Data Streams ごとに PutRecords オペレーションで内部障害のために拒否されたレコードの数。 時折内部障害が予想されるため、再試行する必要がある。 |
0が正常 | |||
ThrottledRecords | カウント | 指定された期間に測定された、Kinesis Data Streams ごとに PutRecords オペレーションでスロットリングのために拒否されたレコードの数。 | 0が正常 | |||
WriteProvisionedThroughputExceeded | カウント | 指定された期間にストリームのスロットリングにより拒否されたレコードの数。 このメトリクスには、PutRecord および PutRecords オペレーションのスロットリングが含まれる。 |
0が正常 | |||
性能 | PutRecord | Latency | ミリ秒 | 指定された期間に測定された PutRecords オペレーションごとにかかった時間。 | 0に近いほどよい | |
流量 | PutRecord | TotalRecords | カウント | 指定された期間に測定された、Kinesis Data Streams ごとに PutRecords オペレーションで送信されたレコードの総数。 | ||
Bytes | バイト | 指定された期間に PutRecords オペレーションを使用して Kinesis ストリームに送信されたバイト数。 | ||||
--- | IncomingRecords | カウント | 指定された期間に、Kinesis ストリームに正常に送信されたレコードの数。 このメトリクスには、PutRecord および PutRecords オペレーションのレコード数が含まれる。 |
|||
--- | IncomingBytes | バイト | 指定された期間に、Kinesis ストリームに正常に送信されたバイト数 このメトリクスには、PutRecord および PutRecords オペレーションのバイト数が含まれる。 |
|||
コンシューマー | 処理の成功/失敗 | GetRecords | Success | カウント | 指定された期間に測定された、ストリームごとの成功した GetRecords オペレーションの数。 | |
SubscribeToShard | Success | カウント | サブスクリプションが正常に確立されたかどうかを記録 | |||
RateExceeded | % | 同じコンシューマーによるアクティブなサブスクリプションがすでにあるため新しいサブスクリプションが失敗したとき、またはこのオペレーションで許可される 1 秒あたりの呼び出し回数を超えた場合に出力 | 0が正常 | |||
--- | ReadProvisionedThroughputExceeded | % | Kinesis Data Streams のサービスまたはシャードの制限を超えて、所定の期間にスロットリングされた GetRecords 呼び出しの数 | 0が正常 | ||
性能 | GetRecords | Latency | ミリ秒 | 指定された期間に測定された GetRecords オペレーションごとにかかった時間。 | 0に近いほどよい | |
IteratorAgeMilliseconds | ミリ秒 | すべての GetRecords リクエストについて、ストリーム内の最後のレコードの経過時間をミリ秒単位で測定 | 0に近いほどよい | |||
SubscribeToShard | MillisBehindLatest | ミリ秒 | 現在の時刻と最後のレコードが記録されている時刻の差 | |||
流量 | GetRecords | Bytes | バイト | 指定された期間に測定された、Kinesis ストリームから取得したバイト数 | ||
GetRecords | Records | カウント | 指定された期間に測定された、シャードから取得したレコード数 | |||
SubscribeToShard | Bytes | バイト | 指定された期間に測定された、シャードから受信したバイト数。 | |||
SubscribeToShard | Records | カウント | 指定された期間に測定された、シャードから受信したレコード数。 |
実践
メトリクスの確認
前提
Kinesis Data Streams
sh-4.2$ aws kinesis enable-enhanced-monitoring --stream-name XXXX --shard-level-metrics ALL --region XXX
{
"StreamName": "XXXXXX",
"CurrentShardLevelMetrics": [],
"DesiredShardLevelMetrics": [
"IncomingBytes",
"OutgoingRecords",
"IteratorAgeMilliseconds",
"IncomingRecords",
"ReadProvisionedThroughputExceeded",
"WriteProvisionedThroughputExceeded",
"OutgoingBytes"
]
}
Redshift Streaming Injection
1.Kinesis専用の外部スキーマの作成
CREATE EXTERNAL SCHEMA ext_kinesis
FROM KINESIS
iam_role 'arn:aws:iam::{AWSアカウント}:role/XXXXX';
1.マテビュー専用のスキーマを作成
create schema report;
1.マテビューを作成
CREATE MATERIALIZED VIEW report.m_kds_connect AS
SELECT ApproximateArrivalTimestamp
,partitionkey
,shardid
,sequencenumber
,FROM_VARBYTE(data, 'utf-8')::SUPER AS payload
FROM ext_kinesis."Kinesis Data Stream名"
WHERE is_utf8(data) AND is_valid_json(from_varbyte(data, 'utf-8'));
テストデータ生成
- EC2上で、テストデータを生成するスクリプトを動かします。
$ cat random_put_to_kds.py
import json
import boto3
import datetime
import random
from datetime import timedelta
kdsname='xxxxx'
region='xxxxx'
clientkinesis= boto3.client('kinesis', region_name=region)
i=0
print("Start.")
while True:
i = int(i) + 1
id = 'id-' + str(random.randint(1665586, 8888888))
execution_time = datetime.datetime.now().isoformat()
int_value0 = random.randint(1, 65535)
int_value1 = random.randint(1, 200000)
int_value2 = random.randint(1, 100000)
int_value3 = random.randint(20, 120)
new_dict={}
new_dict["id"] = id
new_dict["execution_time"] = execution_time
new_dict["int_value0"] = int_value0
new_dict["int_value1"] = int_value1
new_dict["int_value2"] = int_value2
new_dict["int_value3"] = int_value3
try:
response=clientkinesis.put_record(StreamName=kdsname, Data=json.dumps(new_dict), PartitionKey=id)
print("ingested:"+str(i) +",ReqID:"+ response['ResponseMetadata']['RequestId'] + ",HTTPStatusCode:"+ str(response['ResponseMetadata']['HTTPStatusCode']))
except Exception as e:
print(e)
$
- スクリプトを実行します。
$ python3 random_put_to_kds.py &
Start.
ingested:1,ReqID:f98c9196-97b7-884f-a237-1253501a2a51,HTTPStatusCode:200
ingested:2,ReqID:f1d55f5d-d967-a428-aa6e-dc9b1eca0636,HTTPStatusCode:200
・・・・
WriteProvisionedThroughputExceededの対応
WriteProvisionedThroughputExceededが発生しました。
スクリプトにエラーが返された。
botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation (reached max retries: 4): Rate exceeded for shard shardId-000000000000 in stream XXXXXX under account XXXXXXX.
Kinesisのシャードを増やして数値を確認する
項目 | 変更前 | 変更後 | 変化点 | |
---|---|---|---|---|
プロデューサー | PutRecord.Success | 0.97897671097 | 1 | 成功率が上がった。 |
WriteProvisionedThroughputExceeded | 0.01524925066 | 0 | ThroughputExceededがなくなった(改善された) | |
PutRecord.Latency | 453,479 | 262,582 | レイテンシーも改善 | |
IncomingBytes | 159.371873817 | 159.367549593 | ||
PutRecord.Bytes | 159.371873817 | 159.367549593 | ||
IncomingRecords | 71,293 | 43,605 | バイト数は変わらないが、レコード数は減っている。 | |
コンシューマー | GetRecords.Success | 7 | 10 | |
ReadProvisionedThroughputExceeded | 0 | 0 | ||
GetRecords.IteratorAge | 135 | 194 | 滞留時間が延びた | |
GetRecords.IteratorAgeMilliseconds | 135,000 | 194,000 | 滞留時間が延びた | |
GetRecords.Latency | 2,550 | 3,287 | ||
GetRecords.Bytes | 11,052,698 | 6,956,263 | ||
GetRecords.Records | 69,351 | 43,649 |
IteratorAgeMillisecondsの対応①
スクリプト実行後、マテビューRedreshを実行する。
refresh materialized view report.m_kds_connect;
---
経過時間: 06 分 53 秒
select count(1) from report.m_kds_connect;
---
2591194
マテビュー Refresh前後で、IteratorAgeMillisecondsが減少していることがわかる。
さらにマテビュー Refreshを定期実行し確認する
1.Scheduleをクリックします。
定期的に実行することで、GetRecords.IteratorAgeMillisecondsが132秒になった。IncomingRecordsが70,053、GetRecords.Recordsが63,874。インプットのほうがやや多いため、時間の経過とともに増える可能性がある。
IteratorAgeMillisecondsの対応②
- KDSのシャードをさらに増やします。(2→4)
KDSのシャード数の変更は、現在のシャード数の1/2〜2倍の範囲で変更が可能です。
- 実行数を更に増やす
# 実行前
ps -ef | grep random_put_to_kds.py
ssm-user 6830 6751 14 00:22 pts/5 00:02:24 python3 random_put_to_kds.py
ssm-user 6835 6751 14 00:22 pts/5 00:02:25 python3 random_put_to_kds.py
ssm-user 6853 6751 14 00:22 pts/5 00:02:25 python3 random_put_to_kds.py
ssm-user 6856 6751 14 00:22 pts/5 00:02:27 python3 random_put_to_kds.py
ssm-user 6869 6751 14 00:22 pts/5 00:02:26 python3 random_put_to_kds.py
ssm-user 6870 6751 14 00:22 pts/5 00:02:26 python3 random_put_to_kds.py
ssm-user 6877 6751 14 00:22 pts/5 00:02:25 python3 random_put_to_kds.py
ssm-user 6879 6751 14 00:22 pts/5 00:02:25 python3 random_put_to_kds.py
ssm-user 6902 6751 14 00:22 pts/5 00:02:27 python3 random_put_to_kds.py
ssm-user 6911 6751 14 00:22 pts/5 00:02:27 python3 random_put_to_kds.py
ssm-user 6912 6751 14 00:22 pts/5 00:02:26 python3 random_put_to_kds.py
ssm-user 6919 6751 14 00:22 pts/5 00:02:29 python3 random_put_to_kds.py
ssm-user 6929 6751 14 00:22 pts/5 00:02:26 python3 random_put_to_kds.py
ssm-user 6953 6751 14 00:22 pts/5 00:02:30 python3 random_put_to_kds.py
ssm-user 7225 6751 14 00:24 pts/5 00:02:09 python3 random_put_to_kds.py
ssm-user 7226 6751 15 00:24 pts/5 00:02:10 python3 random_put_to_kds.py
ssm-user 7228 6751 14 00:24 pts/5 00:02:06 python3 random_put_to_kds.py
ssm-user 7231 6751 14 00:24 pts/5 00:02:10 python3 random_put_to_kds.py
ssm-user 7232 6751 15 00:24 pts/5 00:02:10 python3 random_put_to_kds.py
ssm-user 7351 6751 0 00:39 pts/5 00:00:00 grep random_put_to_kds.py
$
$ ps -ef | grep random_put_to_kds.py | wc -l
20
# 実行後
## 42まで増やしました.
$ ps -ef | grep random_put_to_kds.py | wc -l
42
$
RedshiftのDistkeyを見直す
- 成功率が下がった。(プロデューサーの失敗が増えた)
- 流入量は、バイト数は増えていないが、レコード数は増えた。
- 滞留時間は増え、コンシューマの性能が追いついていないことがわかる。
項目 | 変更前 | 変更後 | 変化点 | |
---|---|---|---|---|
プロデューサー | PutRecord.Success | 1 | 0.99589393396 | 成功率は下がった。 |
WriteProvisionedThroughputExceeded | 0 | 0.00410606604 | 閾値エラーが増えた。 | |
PutRecord.Latency | 188,079 | 1,011,896 | ||
IncomingBytes | 159.369530033 | 159.373077409 | 流入量(バイト数)は、ほぼ変わらない。 | |
PutRecord.Bytes | 159.369530033 | 159.373077409 | 流入量(バイト数)は、ほぼ変わらない。 | |
IncomingRecords | 32,747 | 158,380 | 流入量(レコード)は増えた。 | |
コンシューマー | GetRecords.Success | 6 | 16 | 流量が増えて成功数も増えた。 (つまり、変更前はコンシューマーに余裕があった) |
ReadProvisionedThroughputExceeded | 0 | 0 | ||
GetRecords.IteratorAge | 104 | 394 | ||
GetRecords.IteratorAgeMilliseconds | 104,000 | 394,000 | 滞留時間が増えた(4倍程度) | |
GetRecords.Latency | 2,131 | 4,791 | 取得時間が延びた | |
GetRecords.Bytes | 5,230,379 | 24,894,547 | 流出量が増えた | |
GetRecords.Records | 32,819 | 156,206 | 流出量が増えた |
3.コンシューマ(Redshift)の並列度(スライス数)を増やしてIteratorAgeMillisecondsを改善します。まずは、クラスタのノード数とノードタイプを確認します。
現在、以下のノード数とノードタイプから、スライス数は、
ra3.4xlargeのvCPU(12)*ノード数が2なので、=24スライスと考える。
- ノードタイプは、ra3.4xlarge
- ノード数が2
こう考えると、Kinesisのパーティション数を24まで増やしても良いかもしれない。
4.現在のテーブルのDISTKEYやSORTKEYの状況を確認します。
select * from svv_table_info where schema='report'
---
database,schema,table_id,table,encoded,diststyle,sortkey1,max_varchar,sortkey1_enc,sortkey_num,size,pct_used,empty,unsorted,stats_off,tbl_rows,skew_sortkey1,skew_rows,estimated_visible_rows,risk_event,vacuum_sort_benefit
sample,report,110106,mv_tbl__m_kds_connect__0,Y,KEY(num_rec),,256,,0,5624,0.0021,0,,5.00,84772425,,100.00,84772448,,
- num_recというKEYでKEY分散されている。
- num_recはマテビューの内部的なカラムだろうか??
- SORTKEYは指定されていない
- 現在のサイズは、5.6GB程度、行数は84,772, 425件。
5.マテビューの更新を停止します。(スケジュールを停止します。)
6.マテビューでは、alter文でdistkeyとsortkeywを設定できないようなので、再作成して様子を見ます。
- 現distkeyは、num_recというキーでしたが、24スライス以上に分散されるキー(sequencenumber)を指定してみます。
# Drop
DROP MATERIALIZED VIEW report.m_kds_connect
# Create(再作成)
CREATE MATERIALIZED VIEW report.m_kds_connect
DISTSTYLE KEY
DISTKEY(4)
SORTKEY(1)
AS
SELECT ApproximateArrivalTimestamp
,partitionkey
,shardid
,sequencenumber
,FROM_VARBYTE(data, 'utf-8')::SUPER AS payload
FROM ext_kinesis."Kinesis Data Stream名"
WHERE is_utf8(data) AND is_valid_json(from_varbyte(data, 'utf-8'));
7.マテビューをリフレッシュします。
REFRESH MATERIALIZED VIEW report.m_kds_connect
# ロック状態を確認
select * from SVV_TRANSACTIONS where lock_mode='ExclusiveLock' and granted='true'
---
txn_owner,txn_db,xid,pid,txn_start,lock_mode,lockable_object_type,relation,granted
IAM:XXX,sample,665275,1075274149,2022-10-09 07:58:41.341266,ExclusiveLock,transactionid,,true
IAM:XXX,sample,665320,1074158295,2022-10-09 07:59:38.388461,ExclusiveLock,transactionid,,true
# 実行中のクエリを確認
select pid, starttime, duration,
trim(user_name) as user,
trim (query) as querytxt
from stv_recents
where status = 'Running';
---
pid,starttime,duration,user,querytxt
1075274149,2022-10-09 07:58:41.205075,141644262,IAM:XXX,REFRESH MATERIALIZED VIEW report.m_kds_connect
Distkeyの見直し前後でIteratorAgeMillisecondsは変わらず。
項目 | 変更前 | 変更後 | 変化点 | |
---|---|---|---|---|
プロデューサー | PutRecord.Success | 0.99589393396 | 0.99534917425 | |
WriteProvisionedThroughputExceeded | 0.00410606604 | 0.00465082575 | ||
PutRecord.Latency | 1,011,896 | 917,820 | ||
IncomingBytes | 159.373077409 | 159.370078427 | ||
PutRecord.Bytes | 159.373077409 | 159.370078427 | ||
IncomingRecords | 158,380 | 151,095 | ||
コンシューマー | GetRecords.Success | 16 | 16 | |
ReadProvisionedThroughputExceeded | 0 | 0 | ||
GetRecords.IteratorAge | 394 | 371 | ||
GetRecords.IteratorAgeMilliseconds | 394,000 | 371,000 | それほど、変わらない。 | |
GetRecords.Latency | 4,791 | 3,278 | ||
GetRecords.Bytes | 24,894,547 | 23,357,850 | ||
GetRecords.Records | 156,206 | 146,559 |
KDSのシャード数を見なおす
Redshiftのスライス数が24なので、同様のシャード数とする。
参考