data/sample にパブリッシュされたデータを
sample_kinesis_sep17 というKinesis データストリームに送信します。
次の5ステップになります。
- IAM でロールを作成
ロール名 role_sep17
- IAM でポリシーをロールにアタッチ
Kinesis にアクセスできるポリシーをアタッチ
- Kinesis でデータストリームを作成
ストリーム名 kinesis_sep17
- IOT でルールを作成
ルール名 rule_sep17
##ロールを作成##
ROLL_NAME="role_sep17"
#
aws iam create-role --role-name $ROLL_NAME --assume-role-policy-document file://iot-role-trust.json
iot-role-trust.json
{
"Version":"2012-10-17",
"Statement":[{
"Effect": "Allow",
"Principal": {
"Service": "iot.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
##ポリシーをロールにアタッチ##
ROLE_NAME=role_sep17
#
aws iam attach-role-policy --role-name $ROLE_NAME \
--policy-arn "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
##Kinesis データストリームを作成##
STREAM_NAME="kinesis_sep17"
aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 1
##IOT ルールを作成##
RULENAME="rule_sep17"
#
aws iot create-topic-rule --rule-name $RULENAME \
--topic-rule-payload file://iot_kinesis.json
iot_kinesis.json
{
"sql": "SELECT * FROM 'data/sample'",
"ruleDisabled": false,
"awsIotSqlVersion": "2016-03-23",
"actions": [
{
"kinesis": {
"roleArn": "arn:aws:iam::123495704257:role/role_sep17",
"streamName": "kinesis_sep17",
"partitionKey": "${topic()}"
}
}
]
}
##テスト##
data/sample にデータをパブリッシュします。
パブリッシュをするスクリプトのサンプルはこちら
AWSIOT: デバイスデータを DynamoDB に保存
ストリームにあるデータの数を数えます。
record_coount.py
#! /usr/bin/python
#
# record_count.py
#
# Sep/17/2021
#
import boto3
import sys
#
name_stream='kinesis_sep17'
#
client = boto3.client('kinesis')
response = client.get_shard_iterator(
StreamName=name_stream,
ShardId='shardId-000000000000',
ShardIteratorType='TRIM_HORIZON',
)
#
shared_iterator = response['ShardIterator']
#
response = client.get_records(
ShardIterator=shared_iterator,
Limit=10
)
#
llx = len(response['Records'])
sys.stderr.write("llx = %d\n" % llx)
#