Overview
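The Kinesis Client Library (KCL) for Python is driven from Java (the MultiLangDaemon talks to the Python process over stdin/stdout), so testing the Python part is a real pain.
This post looks at how to run just the Python part on its own.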
Preparation
The repository is here:
https://github.com/awslabs/amazon-kinesis-client-python.git
Clone it
$ mkdir kcl-test
$ cd kcl-test
$ git clone https://github.com/awslabs/amazon-kinesis-client-python.git
$ git -C amazon-kinesis-client-python checkout v2.0.1
Set up a virtualenv
$ virtualenv .venv
$ . .venv/bin/activate
$ python -V
Python 3.7.2
The library is packaged for pip, so install it from the local clone.
install
$ pip install amazon-kinesis-client-python/
Processing ./amazon-kinesis-client-python
Collecting boto (from amazon-kclpy==2.0.1)
Using cached https://files.pythonhosted.org/packages/23/10/c0b78c27298029e4454a472a1919bde20cb182dab1662cec7f2ca1dcc523/boto-2.49.0-py2.py3-none-any.whl
Collecting argparse (from amazon-kclpy==2.0.1)
Using cached https://files.pythonhosted.org/packages/f2/94/3af39d34be01a24a6e65433d19e107099374224905f1e0cc6bbe1fd22a2f/argparse-1.4.0-py2.py3-none-any.whl
Building wheels for collected packages: amazon-kclpy
Building wheel for amazon-kclpy (setup.py) ... done
Created wheel for amazon-kclpy: filename=amazon_kclpy-2.0.1-cp37-none-any.whl size=20257392 sha256=7f26793030f6fa59e51bd73cccf2ac8a91f3a1e0a090021eb334612cfcd9ffce
Stored in directory: /Users/uu147969/Library/Caches/pip/wheels/0e/31/54/e1def72cdc7c05e171e76e1886a58f5414d1f03e0728c4795f
Successfully built amazon-kclpy
Installing collected packages: boto, argparse, amazon-kclpy
Successfully installed amazon-kclpy-2.0.1 argparse-1.4.0 boto-2.49.0
Shaping the data
The data in a Kinesis stream looks like this (this is the shape of a GetRecords response).
cf. https://docs.aws.amazon.com/ja_jp/streams/latest/dev/fundamental-stream.html#put-record
{
  "Records": [
    {
      "Data": "dGVzdGRhdGE=",
      "PartitionKey": 123,
      "ApproximateArrivalTimestamp": 1441215410.867,
      "SequenceNumber": "49544985256907370027570885864065577703022652638596431874"
    }
  ],
  "MillisBehindLatest": 24000,
  "NextShardIterator": "AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
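Writing data.json by hand means base64-encoding every payload yourself. The following is a minimal sketch for generating a GetRecords-shaped fixture with only the standard library. The function name make_get_records_response, the sequence-number scheme, and the NextShardIterator value are made up for testing and are not real Kinesis values. In the real API PartitionKey is a string, so this sketch uses "123" rather than the number 123 used above.

```python
import base64
import json
import time

# Arbitrary starting sequence number (copied from the example above; test data only)
BASE_SEQUENCE_NUMBER = 49544985256907370027570885864065577703022652638596431874


def make_get_records_response(payloads, partition_key="123", millis_behind_latest=0):
    """Build a dict shaped like a Kinesis GetRecords response.

    Hypothetical test helper: sequence numbers and the shard iterator are dummies.
    """
    now = time.time()
    records = []
    for i, payload in enumerate(payloads):
        records.append({
            # Kinesis hands the payload back base64-encoded
            "Data": base64.b64encode(payload).decode("ascii"),
            "PartitionKey": partition_key,
            "ApproximateArrivalTimestamp": now,
            # Dummy, increasing sequence numbers, serialized as strings
            "SequenceNumber": str(BASE_SEQUENCE_NUMBER + i),
        })
    return {
        "Records": records,
        "MillisBehindLatest": millis_behind_latest,
        "NextShardIterator": "dummy-iterator",
    }


if __name__ == "__main__":
    # Print the fixture; redirect it to a file to use it
    print(json.dumps(make_get_records_response([b"testdata", b"more data"]), indent=2))
```

Saved as a script (say, the hypothetical make_fixture.py), it can be used as python make_fixture.py > data.json.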
Save this as data.json and convert it into the shape KCL passes to the Python record processor.
convert.jq
{
  action: "processRecords",
  records: .Records
    | map({
        data: .Data,
        partitionKey: .PartitionKey,
        sequenceNumber: .SequenceNumber,
        approximateArrivalTimestamp: .ApproximateArrivalTimestamp|floor,
        subSequenceNumber: 0,
        action: "record"
      }),
  millisBehindLatest: .MillisBehindLatest
}
$ cat data.json | jq -f convert.jq
{
  "action": "processRecords",
  "records": [
    {
      "data": "dGVzdGRhdGE=",
      "partitionKey": 123,
      "sequenceNumber": "49544985256907370027570885864065577703022652638596431874",
      "approximateArrivalTimestamp": 1441215410,
      "subSequenceNumber": 0,
      "action": "record"
    }
  ],
  "millisBehindLatest": 24000
}
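If jq isn't available, the same conversion can be done in Python. This is a minimal sketch that mirrors convert.jq field by field; the script name convert.py and the function to_process_records_message are hypothetical. It prints the message on a single line, like jq -c.

```python
import json
import math
import sys


def to_process_records_message(get_records_response):
    """Mirror convert.jq: GetRecords response -> KCL processRecords message."""
    return {
        "action": "processRecords",
        "records": [
            {
                "data": r["Data"],
                "partitionKey": r["PartitionKey"],
                "sequenceNumber": r["SequenceNumber"],
                # Same as jq's floor on the epoch-seconds timestamp
                "approximateArrivalTimestamp": math.floor(r["ApproximateArrivalTimestamp"]),
                "subSequenceNumber": 0,
                "action": "record",
            }
            for r in get_records_response["Records"]
        ],
        "millisBehindLatest": get_records_response["MillisBehindLatest"],
    }


def main(path):
    with open(path) as f:
        message = to_process_records_message(json.load(f))
    # json.dumps without indent emits the whole object on one line
    print(json.dumps(message))


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv[1])
```

Usage would then be python convert.py data.json | python sample.py.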
Code
Implementing a record processor from scratch is fairly tedious, so borrow the sample bundled with the repository.
$ mkdir tests
$ cd tests
$ cp ../amazon-kinesis-client-python/samples/sample_kclpy_app.py .
I'll skip the details here; read the sample's source if you're curious.
sample.py
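import json

from amazon_kclpy import kcl
from amazon_kclpy.messages import InitializeInput

import sample_kclpy_app

# The initialize message the MultiLangDaemon would normally send first
initialize_input_json = """
{
    "shardId": "shardId-000000000000",
    "sequenceNumber": "TRIM_HORIZON",
    "subSequenceNumber": 0,
    "action": "initialize"
}
"""


class RecordProcessor(sample_kclpy_app.RecordProcessor):
    def __init__(self):
        super().__init__()

    def initialize(self, initialize_input):
        # Remember the shard ID, then let the sample do its own setup
        self._shard_id = initialize_input.shard_id
        super().initialize(initialize_input)

    @property
    def shard_id(self):
        return self._shard_id

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        # Dump what the sample passes in for each record.
        # print() writes to stdout: fine for this local test, but under the real
        # MultiLangDaemon stdout carries the protocol, so log to stderr there.
        print(locals())


if __name__ == "__main__":
    # Call initialize directly instead of receiving it from the daemon
    initial_input = InitializeInput(json.loads(initialize_input_json))
    proc = RecordProcessor()
    proc.initialize(initial_input)
    # Read messages from stdin and reply with status messages on stdout
    kcl_process = kcl.KCLProcess(proc)
    kcl_process.run()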
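The manual proc.initialize(...) call stands in for the initialize message that the MultiLangDaemon normally sends before any records. An alternative, assuming KCLProcess.run() dispatches an initialize line from stdin the same way it dispatches processRecords (which is how the daemon drives it), is to drop the manual call and prepend that message to the input instead. This is a minimal sketch that builds such an input stream; make_input.py, records.jsonl and build_input_lines are hypothetical names.

```python
import json
import sys

# Same initialize message that sample.py builds by hand
INITIALIZE_MESSAGE = {
    "action": "initialize",
    "shardId": "shardId-000000000000",
    "sequenceNumber": "TRIM_HORIZON",
    "subSequenceNumber": 0,
}


def build_input_lines(process_records_messages):
    """Return stdin text for the record processor: one JSON message per line,
    initialize first, then the processRecords messages in order."""
    messages = [INITIALIZE_MESSAGE, *process_records_messages]
    return "".join(json.dumps(m) + "\n" for m in messages)


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hypothetical usage:
    #   jq -c -f convert.jq data.json > records.jsonl
    #   python make_input.py records.jsonl | python sample.py
    with open(sys.argv[1]) as f:
        records = [json.loads(line) for line in f if line.strip()]
    sys.stdout.write(build_input_lines(records))
```

With this approach the processor would also reply with a status line for initialize, in addition to the one for processRecords.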
Running it
With the files laid out like this:
$ tree .. --charset unicode
..
|-- amazon-kinesis-client-python
(..snip..)
`-- tests
    |-- convert.jq
    |-- data.json
    |-- sample.py
    `-- sample_kclpy_app.py
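run it like this. The -c flag matters: KCLProcess reads one JSON message per line from stdin, so each message has to be on a single line.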
$ cat data.json | jq -c -f convert.jq | python sample.py
{'self': <__main__.RecordProcessor object at 0x109f4d390>, 'data': b'testdata', 'partition_key': 123, 'sequence_number': 49544985256907370027570885864065577703022652638596431874, 'sub_sequence_number': 0}
{"action": "status", "responseFor": "processRecords"}
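To turn the manual check above into an automated test, one option is to run the same pipeline from Python and look for the status reply. This is a minimal sketch, assuming jq is on PATH and that it runs inside tests/; run_pipeline and status_responses are hypothetical names. If I read the sample correctly, its process_records catches exceptions and only logs them to stderr, so a status line alone does not prove the records were handled; checking stderr or the printed records as well is safer.

```python
import json
import subprocess
import sys


def status_responses(stdout_text):
    """Collect the responseFor values of status messages in the processor's stdout.

    Lines that are not JSON objects (e.g. the print(locals()) output) are skipped.
    """
    responses = []
    for line in stdout_text.splitlines():
        try:
            message = json.loads(line)
        except ValueError:
            continue  # not a protocol message
        if isinstance(message, dict) and message.get("action") == "status":
            responses.append(message.get("responseFor"))
    return responses


def run_pipeline(data_json="data.json", workdir="."):
    """Run the equivalent of: jq -c -f convert.jq data.json | python sample.py"""
    converted = subprocess.run(
        ["jq", "-c", "-f", "convert.jq", data_json],
        cwd=workdir, capture_output=True, text=True, check=True,
    ).stdout
    return subprocess.run(
        [sys.executable, "sample.py"],
        cwd=workdir, input=converted, capture_output=True, text=True, check=True,
    )
```

In a test this could be used as result = run_pipeline() followed by assert status_responses(result.stdout) == ["processRecords"], with result.stderr inspected for logged exceptions.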
That's it.