0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWSIOT: デバイスデータを Kinesis データストリームに送信

Last updated at Posted at 2021-09-17

data/sample にパブリッシュされたデータを
sample_kinesis_sep17 というKinesis データストリームに送信します。

次の5ステップになります。

  1. IAM でロールを作成

ロール名 role_sep17

  1. IAM でポリシーをロールにアタッチ

Kinesis にアクセスできるポリシーをアタッチ

  1. Kinesis でデータストリームを作成

ストリーム名 kinesis_sep17

  1. IOT でルールを作成

ルール名 rule_sep17

##ロールを作成##

ROLL_NAME="role_sep17"
#
aws iam create-role --role-name $ROLL_NAME --assume-role-policy-document file://iot-role-trust.json
iot-role-trust.json
{
  "Version":"2012-10-17",
  "Statement":[{
      "Effect": "Allow",
      "Principal": {
        "Service": "iot.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
  }]
}

##ポリシーをロールにアタッチ##

ROLE_NAME=role_sep17
#
aws iam attach-role-policy --role-name $ROLE_NAME \
	--policy-arn "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"

##Kinesis データストリームを作成##

STREAM_NAME="kinesis_sep17"
aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 1

##IOT ルールを作成##

RULENAME="rule_sep17"
#
aws iot create-topic-rule --rule-name $RULENAME \
	--topic-rule-payload file://iot_kinesis.json
iot_kinesis.json
{
  "sql": "SELECT * FROM 'data/sample'",
  "ruleDisabled": false,
  "awsIotSqlVersion": "2016-03-23",
  "actions": [
    {
      "kinesis": {
        "roleArn": "arn:aws:iam::123495704257:role/role_sep17",
        "streamName": "kinesis_sep17",
        "partitionKey": "${topic()}"
      }
    }
  ]
}

##テスト##

data/sample にデータをパブリッシュします。
パブリッシュをするスクリプトのサンプルはこちら
AWSIOT: デバイスデータを DynamoDB に保存

ストリームにあるデータの数を数えます。

record_coount.py
#! /usr/bin/python
#
#	record_count.py
#
#					Sep/17/2021
#
import boto3
import sys
#
name_stream='kinesis_sep17'
#
client = boto3.client('kinesis')
response = client.get_shard_iterator(
    StreamName=name_stream,
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON',
)

#
shared_iterator = response['ShardIterator']
#
response = client.get_records(
    ShardIterator=shared_iterator,
    Limit=10
)
#
llx = len(response['Records'])
sys.stderr.write("llx = %d\n" % llx)
#
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?