1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Python3の対話シェル上でKinesis Streamsにレコードを書き込む

Last updated at Posted at 2020-03-14

Pythonと boto3 によるAmazon Kinesis Streamへのレコードの書き込みを、対話型シェル上でステップバイステップで実行してみました。

前提

手順

  1. Boto3(AWS SDK for Python)のインポート
  2. Kinesisのクライアントの生成
  3. Streamの取得
  4. Streamにレコード書込み
  5. 結果確認

実行

boto3 をインポートしKinesisクライアントを生成します。デフォルトプロファイルを使うときは、2行目は client = boto3.client("kinesis") でいいはず。

>>> import boto3
>>> client = boto3.session.Session(profile_name="プロファイル名").client("kinesis")

レコードを書込む際にはストリーム名を指定します。まずlist_streams()でストリームの取得とストリーム名の確認します。ストリーム名がリストで格納されているので、先頭のものを確認することにします。

>>> streams = client.list_streams()
>>> streams["StreamNames"][0]

レコードの書込みにはput_record()を使います。必須の引数として、以下を指定します。

  • StreamName … 書込み先のストリーム名を指定します。ここでは上で取得したストリーム名を指定します。
  • Data … 格納したいデータをBLOB形式で指定します。ここでは変数 data に入っている任意のデータを JSON 形式で指定することにします。
  • PartitionKey … 任意の文字列を指定します。詳細は「パーティションキー」を参照ください。ここではランダム値を指定することにします。

JSON形式への返還とランダム値の取得のため、 jsonrandom モジュールもインポートしました。

>>> import json
>>> import random
>>> result = client.put_record(StreamName=streams["StreamNames"][0], PartitionKey=str(random.randrange(0,100)), Data=json.dumps(data))

成功していれば result にはレコードが書き込まれた先のシャードIDとシーケンス番号が設定されています。

>>> result['ShardId'], result['SequenceNumber']

これでレコードを書込むことができました。

複数レコードをまとめて書込む

put_record()のかわりにputs_record()を使うと、一度に最大500件までのレコードを書込むことができます。手順は次のようになります。

  1. Boto3(AWS SDK for Python)のインポート
  2. Kinesisのクライアントの生成
  3. Streamの取得
  4. レコードのリストの生成
  5. Streamにレコード書込み
  6. 結果確認

boto3のインポート、Kinesisクライアントの生成、ストリームの取得までは上と同じ手順です。また今回も json random モジュールも使用するので、ここでインポートしておくことにします。

>>> import json
>>> import random
>>> import boto3
>>> client = boto3.session.Session(profile_name="プロファイル名").client("kinesis")
>>> streams = client.list_streams()
>>> streams["StreamNames"][0]

puts_record() では複数の書込み対象をリストで指定するので、あらかじめ形式を整えたリストを作成しておくことにします。上記同様の DataPartitionKey を持つ辞書のリストですが、 StreamName は不要です。

以下ではデータのリスト datas 変数から records に変換することにします。変換をまとめて行うため、変換内容をlambdaで無名関数として定義しておき、map() 関数で datas 全体に適用し、レコード数と1レコード目の内容を表示してみています。

>>> converter = lambda data: { "PartitionKey": str(random.randrange(0,100)), "Data": json.dumps(data) }
>>> records = list(map(converter, datas))
>>> len(records, records[0])

レコードの書込みにはput_records()を使います。引数として StreamName で書き込み先ストリーム名、 Records で書込むレコードのリストを指定します。

>>> results = client.put_records(StreamName=streams["StreamNames"][0], Records=records)

results['FailedRecordCount'] を確認します。失敗件数が格納されるので、全件成功していれば 0 が入っています。

>>> results['FailedRecordCount']

1以上の値が入っている時は、その件数分だけ失敗したレコードがあります。この時はレコードのリスト records と結果のリスト results['Records'] を突き合わせて、特に結果に含まれる ErrorCodeProvisionedThroughputExceededExceptionInternalFailure かを確認するとよいようです。

>>> for record, result in zip(records, results['Records']):
...   if result['ShardId']:
...     continue
...   print(record, result)
...   

まとめ

「スクリプト化する前にステップバイステップでboto3でのアクセス手順を確認してみたい」というタイミングがあったので、Pythonの対話型シェルでのKinesisのレコード読出し手順を確認、まとめてみました。各段階でどんなデータが取得できているかを確認するのにも役立つと思います。

参考

主に以下を参考にしました。


本ページ内容は筆者が参照の便のためにある時点でまとめた個人的なメモです。内容を保証するものではなく、また筆者の所属組織等とは一切かかわりがありません。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?