0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Python3の対話シェル上でKinesis Streams上のレコードを読みだす

Last updated at Posted at 2020-03-14

Pythonと boto3 によるAmazon Kinesis Streamからのレコードの読みだしを、対話型シェル上でステップバイステップで実行してみました。現在のレコードをちょっと確認してみたいと思って確認した際のメモです。

前提

手順

  1. Boto3(AWS SDK for Python)のインポート
  2. Kinesisのクライアントの生成
  3. Streamの取得
  4. StreamのShardの取得
  5. StreamのShardのイテレータの取得
  6. イテレータからのレコード取得

実行

boto3 をインポートしKinesisクライアントを生成します。デフォルトプロファイルを使うときは、2行目は client = boto3.client("kinesis") でいいはず。

>>> import boto3
>>> client = boto3.session.Session(profile_name="プロファイル名").client("kinesis")

レコードを取得する際にはイテレータが必要、イテレータを取得するためにはストリーム名、シャードID、等が必要、ということで順に取得していきます。

まずlist_streams()でストリームの取得とストリーム名の確認。ストリーム名がリストで格納されているので、先頭のものを確認することにします。

>>> streams = client.list_streams()
>>> streams["StreamNames"][0]

次にlist_shards()を使ってシャードの取得とシャードIDの確認。引数としてストリーム名を指定するので、先ほど取得したストリーム名の最初のものを使用します。シャードがリストで格納されているので、先頭のもののシャードIDを確認することにします。

>>> shards = client.list_shards(StreamName=streams["StreamNames"][0])
>>> shards["Shards"][0]["ShardId"]

これでget_shard_iterator()に必要な情報が集まりました。レコードを読み取るためのイテレータを取得します。ShardIteratorTypeについては公式ドキュメント(現時点で英語版のみ)またはDevelopers.IOの記事などをご参照。

>>> iter = client.get_shard_iterator(StreamName=streams["StreamNames"][0], ShardId=shards["Shards"][0]["ShardId"], ShardIteratorType='TRIM_HORIZON')

これでget_records()、このイテレータからレコードを読みだせます。レコードがリストで格納されているので、レコード数と先頭レコードの内容を確認することにします。

>>> records = client.get_records(ShardIterator=iter['ShardIterator'])
>>> len(records["Records"])
(件数が表示される)
>>> records["Records"][0]
(レコードの内容が表示される)

これでレコードを読みだすことができました。

イテレータの有効期間

イテレータの有効期間は取得後5分間です。

A shard iterator expires 5 minutes after it is returned to the requester.
(GetShardIterator - Amazon Kinesis Data Streams Service)

5分超が経ってから get_records で使おうとすると botocore.errorfactory.ExpiredIteratorException の例外が発生して失敗します。取得後はお早めにお使いください。

まとめ

「Kinesisのレコードをちょっと確認してみたい」「スクリプト化する前にステップバイステップでboto3でのアクセス手順を確認してみたい」というタイミングがあったので、Pythonの対話型シェルでのKinesisのレコード読出し手順を確認、まとめてみました。上の説明では、ストリーム名やシャードIDのような手順上必要な情報だけを確認していますが、各段階でどんなデータが取得できているかを確認するのにも役立つと思います。

参考

主に以下を参考にしました。


本ページ内容は筆者が参照の便のためにある時点でまとめた個人的なメモです。内容を保証するものではなく、また筆者の所属組織等とは一切かかわりがありません。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?