Pythonと boto3
によるAmazon Kinesis Streamへのレコードの書き込みを、対話型シェル上でステップバイステップで実行してみました。
前提
- APIアクセス用の認証情報がAWS CLIでデフォルトプロファイルまたは名前付きプロファイルに格納されていること
- Pythonがインストールされていること(ここではAnaconda 2020.02 for Windows Installerを使用しました)
- boto3がインストールされていること
手順
- Boto3(AWS SDK for Python)のインポート
- Kinesisのクライアントの生成
- Streamの取得
- Streamにレコード書込み
- 結果確認
実行
boto3
をインポートしKinesisクライアントを生成します。デフォルトプロファイルを使うときは、2行目は client = boto3.client("kinesis")
でいいはず。
>>> import boto3
>>> client = boto3.session.Session(profile_name="プロファイル名").client("kinesis")
レコードを書込む際にはストリーム名を指定します。まずlist_streams()
でストリームの取得とストリーム名の確認します。ストリーム名がリストで格納されているので、先頭のものを確認することにします。
>>> streams = client.list_streams()
>>> streams["StreamNames"][0]
レコードの書込みにはput_record()
を使います。必須の引数として、以下を指定します。
-
StreamName
… 書込み先のストリーム名を指定します。ここでは上で取得したストリーム名を指定します。 -
Data
… 格納したいデータをBLOB形式で指定します。ここでは変数data
に入っている任意のデータをJSON
形式で指定することにします。 -
PartitionKey
… 任意の文字列を指定します。詳細は「パーティションキー」を参照ください。ここではランダム値を指定することにします。
JSON形式への返還とランダム値の取得のため、 json
と random
モジュールもインポートしました。
>>> import json
>>> import random
>>> result = client.put_record(StreamName=streams["StreamNames"][0], PartitionKey=str(random.randrange(0,100)), Data=json.dumps(data))
成功していれば result
にはレコードが書き込まれた先のシャードIDとシーケンス番号が設定されています。
>>> result['ShardId'], result['SequenceNumber']
これでレコードを書込むことができました。
複数レコードをまとめて書込む
put_record()のかわりにputs_record()を使うと、一度に最大500件までのレコードを書込むことができます。手順は次のようになります。
- Boto3(AWS SDK for Python)のインポート
- Kinesisのクライアントの生成
- Streamの取得
- レコードのリストの生成
- Streamにレコード書込み
- 結果確認
boto3
のインポート、Kinesisクライアントの生成、ストリームの取得までは上と同じ手順です。また今回も json
random
モジュールも使用するので、ここでインポートしておくことにします。
>>> import json
>>> import random
>>> import boto3
>>> client = boto3.session.Session(profile_name="プロファイル名").client("kinesis")
>>> streams = client.list_streams()
>>> streams["StreamNames"][0]
puts_record()
では複数の書込み対象をリストで指定するので、あらかじめ形式を整えたリストを作成しておくことにします。上記同様の Data
と PartitionKey
を持つ辞書のリストですが、 StreamName
は不要です。
以下ではデータのリスト datas
変数から records
に変換することにします。変換をまとめて行うため、変換内容をlambdaで無名関数として定義しておき、map()
関数で datas
全体に適用し、レコード数と1レコード目の内容を表示してみています。
>>> converter = lambda data: { "PartitionKey": str(random.randrange(0,100)), "Data": json.dumps(data) }
>>> records = list(map(converter, datas))
>>> len(records, records[0])
レコードの書込みにはput_records()
を使います。引数として StreamName
で書き込み先ストリーム名、 Records
で書込むレコードのリストを指定します。
>>> results = client.put_records(StreamName=streams["StreamNames"][0], Records=records)
results['FailedRecordCount']
を確認します。失敗件数が格納されるので、全件成功していれば 0
が入っています。
>>> results['FailedRecordCount']
1以上の値が入っている時は、その件数分だけ失敗したレコードがあります。この時はレコードのリスト records
と結果のリスト results['Records']
を突き合わせて、特に結果に含まれる ErrorCode
が ProvisionedThroughputExceededException
か InternalFailure
かを確認するとよいようです。
>>> for record, result in zip(records, results['Records']):
... if result['ShardId']:
... continue
... print(record, result)
...
まとめ
「スクリプト化する前にステップバイステップでboto3
でのアクセス手順を確認してみたい」というタイミングがあったので、Pythonの対話型シェルでのKinesisのレコード読出し手順を確認、まとめてみました。各段階でどんなデータが取得できているかを確認するのにも役立つと思います。
参考
主に以下を参考にしました。
- AWS SDK for Python | AWS
- Quickstart — Boto 3 Docs 1.12.20 documentation
- Kinesis — Boto 3 Docs 1.12.20 documentation
- Amazon Kinesis Data Streams の用語と概念 - Amazon Kinesis Data Streams - パーティションキー
本ページ内容は筆者が参照の便のためにある時点でまとめた個人的なメモです。内容を保証するものではなく、また筆者の所属組織等とは一切かかわりがありません。