Pythonと boto3
によるAmazon Kinesis Streamからのレコードの読みだしを、対話型シェル上でステップバイステップで実行してみました。現在のレコードをちょっと確認してみたいと思って確認した際のメモです。
前提
- APIアクセス用の認証情報がAWS CLIでデフォルトプロファイルまたは名前付きプロファイルに格納されていること
- Pythonがインストールされていること(ここではAnaconda 2020.02 for Windows Installerを使用しました)
- boto3がインストールされていること
手順
- Boto3(AWS SDK for Python)のインポート
- Kinesisのクライアントの生成
- Streamの取得
- StreamのShardの取得
- StreamのShardのイテレータの取得
- イテレータからのレコード取得
実行
boto3
をインポートしKinesisクライアントを生成します。デフォルトプロファイルを使うときは、2行目は client = boto3.client("kinesis")
でいいはず。
>>> import boto3
>>> client = boto3.session.Session(profile_name="プロファイル名").client("kinesis")
レコードを取得する際にはイテレータが必要、イテレータを取得するためにはストリーム名、シャードID、等が必要、ということで順に取得していきます。
まずlist_streams()
でストリームの取得とストリーム名の確認。ストリーム名がリストで格納されているので、先頭のものを確認することにします。
>>> streams = client.list_streams()
>>> streams["StreamNames"][0]
次にlist_shards()
を使ってシャードの取得とシャードIDの確認。引数としてストリーム名を指定するので、先ほど取得したストリーム名の最初のものを使用します。シャードがリストで格納されているので、先頭のもののシャードIDを確認することにします。
>>> shards = client.list_shards(StreamName=streams["StreamNames"][0])
>>> shards["Shards"][0]["ShardId"]
これでget_shard_iterator()
に必要な情報が集まりました。レコードを読み取るためのイテレータを取得します。ShardIteratorType
については公式ドキュメント(現時点で英語版のみ)またはDevelopers.IOの記事などをご参照。
>>> iter = client.get_shard_iterator(StreamName=streams["StreamNames"][0], ShardId=shards["Shards"][0]["ShardId"], ShardIteratorType='TRIM_HORIZON')
これでget_records()
、このイテレータからレコードを読みだせます。レコードがリストで格納されているので、レコード数と先頭レコードの内容を確認することにします。
>>> records = client.get_records(ShardIterator=iter['ShardIterator'])
>>> len(records["Records"])
(件数が表示される)
>>> records["Records"][0]
(レコードの内容が表示される)
これでレコードを読みだすことができました。
イテレータの有効期間
イテレータの有効期間は取得後5分間です。
A shard iterator expires 5 minutes after it is returned to the requester.
(GetShardIterator - Amazon Kinesis Data Streams Service)
5分超が経ってから get_records
で使おうとすると botocore.errorfactory.ExpiredIteratorException
の例外が発生して失敗します。取得後はお早めにお使いください。
まとめ
「Kinesisのレコードをちょっと確認してみたい」「スクリプト化する前にステップバイステップでboto3
でのアクセス手順を確認してみたい」というタイミングがあったので、Pythonの対話型シェルでのKinesisのレコード読出し手順を確認、まとめてみました。上の説明では、ストリーム名やシャードIDのような手順上必要な情報だけを確認していますが、各段階でどんなデータが取得できているかを確認するのにも役立つと思います。
参考
主に以下を参考にしました。
- AWS SDK for Python | AWS
- Quickstart — Boto 3 Docs 1.12.20 documentation
- Kinesis — Boto 3 Docs 1.12.20 documentation
- GetShardIterator - Amazon Kinesis Data Streams Service
- [新機能]Kinesis Streamsが時刻ベースのイテレーターに対応しました | Developers.IO
本ページ内容は筆者が参照の便のためにある時点でまとめた個人的なメモです。内容を保証するものではなく、また筆者の所属組織等とは一切かかわりがありません。