Running kcl for python with Python alone

Overview

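The Kinesis Client Library (KCL) for Python runs under Java: the Java MultiLangDaemon starts the Python process and talks to it over stdin/stdout. That makes testing the Python part very tedious.
So I looked into how to run the Python part on its own.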

Setup

The repository is:
https://github.com/awslabs/amazon-kinesis-client-python.git

Clone
$ mkdir kcl-test
$ cd kcl-test

$ git clone https://github.com/awslabs/amazon-kinesis-client-python.git
$ cd amazon-kinesis-client-python
$ git checkout v2.0.1
$ cd ..
virtualenv setup
$ virtualenv .venv
$ . .venv/bin/activate
$ python -V
Python 3.7.2

The repository is packaged to be installed with pip, so install it.

install
$ pip install amazon-kinesis-client-python/
Processing ./amazon-kinesis-client-python
Collecting boto (from amazon-kclpy==2.0.1)
  Using cached https://files.pythonhosted.org/packages/23/10/c0b78c27298029e4454a472a1919bde20cb182dab1662cec7f2ca1dcc523/boto-2.49.0-py2.py3-none-any.whl
Collecting argparse (from amazon-kclpy==2.0.1)
  Using cached https://files.pythonhosted.org/packages/f2/94/3af39d34be01a24a6e65433d19e107099374224905f1e0cc6bbe1fd22a2f/argparse-1.4.0-py2.py3-none-any.whl
Building wheels for collected packages: amazon-kclpy
  Building wheel for amazon-kclpy (setup.py) ... done
  Created wheel for amazon-kclpy: filename=amazon_kclpy-2.0.1-cp37-none-any.whl size=20257392 sha256=7f26793030f6fa59e51bd73cccf2ac8a91f3a1e0a090021eb334612cfcd9ffce
  Stored in directory: /Users/uu147969/Library/Caches/pip/wheels/0e/31/54/e1def72cdc7c05e171e76e1886a58f5414d1f03e0728c4795f
Successfully built amazon-kclpy
Installing collected packages: boto, argparse, amazon-kclpy
Successfully installed amazon-kclpy-2.0.1 argparse-1.4.0 boto-2.49.0

Shaping the data

Data read from a Kinesis data stream looks like this (the shape of a GetRecords response). Save it as data.json.

cf. https://docs.aws.amazon.com/ja_jp/streams/latest/dev/fundamental-stream.html#put-record

{
  "Records": [
    {
      "Data": "dGVzdGRhdGE=",
      "PartitionKey": 123,
      "ApproximateArrivalTimestamp": 1441215410.867,
      "SequenceNumber": "49544985256907370027570885864065577703022652638596431874"
    }
  ],
  "MillisBehindLatest": 24000,
  "NextShardIterator": "AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
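The Data field holds the record payload base64-encoded. A quick check in Python confirms it decodes to the b'testdata' that shows up in the final output:

```python
import base64
import json

# The response above, trimmed to the fields used here
response = json.loads("""
{
  "Records": [
    {
      "Data": "dGVzdGRhdGE=",
      "PartitionKey": 123,
      "ApproximateArrivalTimestamp": 1441215410.867,
      "SequenceNumber": "49544985256907370027570885864065577703022652638596431874"
    }
  ]
}
""")

for record in response["Records"]:
    # "Data" is the raw payload, base64-encoded
    payload = base64.b64decode(record["Data"])
    print(record["PartitionKey"], payload)  # prints: 123 b'testdata'
```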

Convert this into the form the KCL passes to the Python side.

convert.jq
{
  action: "processRecords",
  records: .Records
    | map({
        data: .Data,
        partitionKey: .PartitionKey,
        sequenceNumber: .SequenceNumber,
        approximateArrivalTimestamp: .ApproximateArrivalTimestamp|floor,
        subSequenceNumber: 0,
        action: "record"
      }
    ),
  millisBehindLatest: .MillisBehindLatest
}
$ cat data.json | jq -f convert.jq
{
  "action": "processRecords",
  "records": [
    {
      "data": "dGVzdGRhdGE=",
      "partitionKey": 123,
      "sequenceNumber": "49544985256907370027570885864065577703022652638596431874",
      "approximateArrivalTimestamp": 1441215410,
      "subSequenceNumber": 0,
      "action": "record"
    }
  ],
  "millisBehindLatest": 24000
}
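If jq is not at hand, the same conversion can be done in Python. This is a sketch that mirrors convert.jq field by field. The file name convert.py and the function name to_process_records are my own hypothetical choices, not part of the library.

```python
import json
import math
import sys


def to_process_records(response):
    """Mirror convert.jq: map a GetRecords-style response to a processRecords message."""
    return {
        "action": "processRecords",
        "records": [
            {
                "data": r["Data"],  # left base64-encoded, as in the jq version
                "partitionKey": r["PartitionKey"],
                "sequenceNumber": r["SequenceNumber"],
                # same as jq's floor: drop the fractional seconds
                "approximateArrivalTimestamp": math.floor(r["ApproximateArrivalTimestamp"]),
                "subSequenceNumber": 0,
                "action": "record",
            }
            for r in response["Records"]
        ],
        "millisBehindLatest": response["MillisBehindLatest"],
    }


if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1]) as f:
        message = to_process_records(json.load(f))
    # One compact JSON message per line, like `jq -c`
    print(json.dumps(message, separators=(",", ":")))
```

Usage would then be `python convert.py data.json | python sample.py` in place of the jq step.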

Code

Implementing everything yourself is fairly tedious, so borrow the sample that ships with the repository.

$ mkdir tests
$ cd tests

$ cp ../amazon-kinesis-client-python/samples/sample_kclpy_app.py .

I'll skip the details; if you're curious, read the source.

sample.py
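import json

from amazon_kclpy import kcl
from amazon_kclpy.messages import InitializeInput
import sample_kclpy_app

# The initialize message the KCL would normally send first; built by hand here
initialize_input_json = """
{
  "shardId": "shardId-000000000000",
  "sequenceNumber": "TRIM_HORIZON",
  "subSequenceNumber": 0,
  "action": "initialize"
}
"""


class RecordProcessor(sample_kclpy_app.RecordProcessor):
    def __init__(self):
        super().__init__()

    def initialize(self, initialize_input):
        # Remember the shard ID, then let the sample do its own setup
        self._shard_id = initialize_input.shard_id
        super().initialize(initialize_input)

    @property
    def shard_id(self):
        return self._shard_id

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        # Override the sample's per-record handler to just dump its arguments.
        # Printing to stdout is fine for this local test, but under the real
        # MultiLangDaemon stdout carries the protocol messages.
        print(locals())


if __name__ == "__main__":
    # stdin will only carry processRecords, so initialize manually
    initial_input = InitializeInput(json.loads(initialize_input_json))
    proc = RecordProcessor()
    proc.initialize(initial_input)

    # Read messages from stdin, dispatch them to proc, write responses to stdout
    kcl_process = kcl.KCLProcess(proc)
    kcl_process.run()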
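sample.py calls proc.initialize(...) by hand because stdin only carries the processRecords message. Another option, assuming KCLProcess.run() dispatches an initialize line to the processor's initialize() the same way it dispatches processRecords (not verified against v2.0.1), is to put the initialize message on stdin first. A minimal sketch of a hypothetical helper, make_input.py, that writes messages as newline-delimited JSON:

```python
import json
import sys

# Same content as initialize_input_json in sample.py
INITIALIZE = {
    "shardId": "shardId-000000000000",
    "sequenceNumber": "TRIM_HORIZON",
    "subSequenceNumber": 0,
    "action": "initialize",
}


def kcl_input(*messages):
    """Serialize each message as compact JSON on its own line."""
    return "".join(json.dumps(m, separators=(",", ":")) + "\n" for m in messages)


if __name__ == "__main__":
    # Emit only the initialize line; the processRecords line comes from jq
    sys.stdout.write(kcl_input(INITIALIZE))
```

It would be combined with the jq step as `{ python make_input.py; jq -c -f convert.jq data.json; } | python sample.py`. If the assumption holds, the manual proc.initialize(...) call becomes unnecessary.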

Run it

The files should now be laid out like this,

$ tree .. --charset unicode
..
|-- amazon-kinesis-client-python
    (..snip..)
`-- tests
    |-- convert.jq
    |-- data.json
    |-- sample.py
    `-- sample_kclpy_app.py

so run it like this:

$ cat data.json | jq -c -f convert.jq | python sample.py
{'self': <__main__.RecordProcessor object at 0x109f4d390>, 'data': b'testdata', 'partition_key': 123, 'sequence_number': 49544985256907370027570885864065577703022652638596431874, 'sub_sequence_number': 0}

{"action": "status", "responseFor": "processRecords"}
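Since the point was to test the Python side, the same run can be wrapped in an automated check. A sketch, assuming sample.py sits in the current directory: run it as a subprocess with the processRecords message from the jq output above, then keep only stdout lines that parse as JSON status messages, which skips the print(locals()) debug line. The helper names status_responses and run_sample are hypothetical.

```python
import json
import os
import subprocess
import sys

# The processRecords message produced by `jq -c -f convert.jq` above
PROCESS_RECORDS = {
    "action": "processRecords",
    "records": [{
        "data": "dGVzdGRhdGE=",
        "partitionKey": 123,
        "sequenceNumber": "49544985256907370027570885864065577703022652638596431874",
        "approximateArrivalTimestamp": 1441215410,
        "subSequenceNumber": 0,
        "action": "record",
    }],
    "millisBehindLatest": 24000,
}


def status_responses(stdout_text):
    """Return the JSON status messages found in stdout, ignoring other output."""
    responses = []
    for line in stdout_text.splitlines():
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # blank lines, or the dict repr printed by process_record
        if isinstance(message, dict) and message.get("action") == "status":
            responses.append(message)
    return responses


def run_sample(messages, script="sample.py"):
    """Feed messages to the script as newline-delimited JSON; return its stdout."""
    stdin_text = "".join(json.dumps(m) + "\n" for m in messages)
    result = subprocess.run([sys.executable, script], input=stdin_text,
                            capture_output=True, text=True, check=True)
    return result.stdout


if __name__ == "__main__" and os.path.exists("sample.py"):
    responses = status_responses(run_sample([PROCESS_RECORDS]))
    assert responses == [{"action": "status", "responseFor": "processRecords"}], responses
    print("ok")
```

capture_output needs Python 3.7 or later, which matches the 3.7.2 virtualenv used here.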

Done.
