次のページを参考にしました。
Boto3 documentation -> Kinesis
ストリームを作成
create_stream.py
#! /usr/bin/python
#
# create_stream.py
#
# Sep/16/2021
#
import boto3
#
name_stream='sample_sep16'
#
client = boto3.client('kinesis')
response = client.create_stream(
StreamName=name_stream,
ShardCount=1
)
#
ストリームをリスト
list_streams.py
#! /usr/bin/python
#
# list_streams.py
#
# Sep/16/2021
#
import boto3
import sys
#
client = boto3.client('kinesis')
streams = client.list_streams()
llx = len(streams["StreamNames"])
sys.stderr.write("llx = %d\n" % llx)
for it in range(llx):
print(streams["StreamNames"][it])
詳細を表示
describe_stream.py
#! /usr/bin/python
#
# describe_stream.py
#
# Sep/16/2021
#
import boto3
#
name_stream='sample_sep16'
#
client = boto3.client('kinesis')
response = client.describe_stream_summary(
StreamName=name_stream
)
#
print(response)
レコードを入れる
put_record.py
#! /usr/bin/python
#
# put_record.py
#
# Sep/17/2021
#
import boto3
#
name_stream='sample_sep16'
#
client = boto3.client('kinesis')
response = client.put_record(
StreamName=name_stream,
Data=b'testdata-aa',
PartitionKey='123'
)
print(response)
#
レコードを取り出す
get_record.py
#! /usr/bin/python
#
# get_record.py
#
# Sep/17/2021
#
import boto3
#
name_stream='sample_sep16'
#
client = boto3.client('kinesis')
response = client.get_shard_iterator(
StreamName=name_stream,
ShardId='shardId-000000000000',
ShardIteratorType='TRIM_HORIZON',
)
print(response)
print()
#
shared_iterator = response['ShardIterator']
#
response = client.get_records(
ShardIterator=shared_iterator,
Limit=10
)
#
print(response)
print()
#
ストリームの削除
delete_stream.py
#! /usr/bin/python
#
# delete_stream.py
#
# Sep/16/2021
#
import boto3
#
name_stream='sample_sep16'
#
client = boto3.client('kinesis')
response = client.delete_stream(
StreamName=name_stream,
EnforceConsumerDeletion=True
)
#