AWSのリアルタイムデータストリームといえばKinesisということなので、PythonのAWSライブラリboto
でこれとしゃべってみる。
例によってDockerを使ってみる。
Dockerである必要はないが、開発環境でミニマムな手順まで試行錯誤するのに便利なので、勉強がてら使ってみている。
Dockerfile
Dockerfile
FROM centos:centos7
RUN yum -y install epel-release
RUN yum -y install python-boto
RUN useradd -m aws
COPY files/* /home/aws/
RUN chown -R aws:aws /home/aws/
USER aws
ENV HOME /home/aws/
WORKDIR $HOME
-
Pythonモジュール管理は
pyenv
やvirtualenv
などがよいかもしれない。 -
python-boto 2.34.0-4.el7
を含む新しいEPELを思い切ってとりこんだ。 -
CentOS7の最新で入る
python-boto 2.25.0-2.el7.centos
だと、なぜかKinesisはnot found in endpoints
といわれてしまった(importはできる)
ほかのファイル
コンテナ内で必要になるファイルをCOPY
でコンテナ内にコピーする。
- 解凍が必要なら
ADD
になるが、そうでなければCOPY
で。 - ボリュームなどを使うほうがよいかも。
$ tree -a
.
├── Dockerfile
└── files
├── .boto
└── kinesis-sample.py
IAMで発行したクレデンシャル。
.boto
[Credentials]
aws_access_key_id = YOUR-ACCESS-KEY-ID
aws_secret_access_key = YOUR-SECRET-ACCESS-KEY
Kinesisに接続するスクリプト本体。
- ストリームに適当にputしたデータを10秒後にgetする。
- 適当な名前をつけたストリームをAPIもしくはGUIで先に作っておく必要がある。
kinesis-sample.py
from boto import kinesis
import time
stream_name = 'iot'
conn = kinesis.connect_to_region(region_name = "ap-northeast-1")
shard = conn.put_record(
stream_name,
data = 'DATA',
partition_key = 'partition_key'
)
time.sleep(10)
shard_iterator = conn.get_shard_iterator(
stream_name,
shard_id = shard['ShardId'],
shard_iterator_type = 'AT_SEQUENCE_NUMBER',
starting_sequence_number = shard['SequenceNumber']
)['ShardIterator']
print conn.get_records(shard_iterator)['Records'][0]['Data']
コンテナ作成
docker attach
だと同一コンテナに対して複数セッション(?)貼れないのでdocker exec
を使う。
$ docker build .
..
Successfully built 4132428e3b84
$ docker run -t -d -i 4132428e3b84 bash
20be91d870f7ce4c0712954689981962af6e92f6aeb23efe1fe3acf31f3e6338
$ docker exec -it 20be91d870f7ce4c0712954689981962af6e92f6aeb23efe1fe3acf31f3e6338 bash
[aws@20be91d870f7 aws]$ ls -a
. .. .bash_logout .bash_profile .bashrc .boto kinesis-sample.py
動作確認
[aws@20be91d870f7 aws]$ python kinesis-sample.py
DATA
とりあえず、putしたレコードがとれた。