Running KCL for Python with Python alone

Posted at 2019-08-26

Overview

The Kinesis Client Library (KCL) for Python is driven from Java, which makes the Python side very tedious to test.
So I looked into how to run the Python side on its own.

Setup

The repository is here:
https://github.com/awslabs/amazon-kinesis-client-python.git

clone
$ mkdir kcl-test
$ cd kcl-test

$ git clone https://github.com/awslabs/amazon-kinesis-client-python.git
$ git -C amazon-kinesis-client-python checkout v2.0.1

virtualenv setup
$ virtualenv .venv
$ . .venv/bin/activate
$ python -V
Python 3.7.2

The repository is packaged so that pip can install it, so install it.

install
$ pip install amazon-kinesis-client-python/
Processing ./amazon-kinesis-client-python
Collecting boto (from amazon-kclpy==2.0.1)
  Using cached https://files.pythonhosted.org/packages/23/10/c0b78c27298029e4454a472a1919bde20cb182dab1662cec7f2ca1dcc523/boto-2.49.0-py2.py3-none-any.whl
Collecting argparse (from amazon-kclpy==2.0.1)
  Using cached https://files.pythonhosted.org/packages/f2/94/3af39d34be01a24a6e65433d19e107099374224905f1e0cc6bbe1fd22a2f/argparse-1.4.0-py2.py3-none-any.whl
Building wheels for collected packages: amazon-kclpy
  Building wheel for amazon-kclpy (setup.py) ... done
  Created wheel for amazon-kclpy: filename=amazon_kclpy-2.0.1-cp37-none-any.whl size=20257392 sha256=7f26793030f6fa59e51bd73cccf2ac8a91f3a1e0a090021eb334612cfcd9ffce
  Stored in directory: /Users/uu147969/Library/Caches/pip/wheels/0e/31/54/e1def72cdc7c05e171e76e1886a58f5414d1f03e0728c4795f
Successfully built amazon-kclpy
Installing collected packages: boto, argparse, amazon-kclpy
Successfully installed amazon-kclpy-2.0.1 argparse-1.4.0 boto-2.49.0

Shaping the data

Records read from a Kinesis stream come back in this form.

cf. https://docs.aws.amazon.com/ja_jp/streams/latest/dev/fundamental-stream.html#put-record

{
  "Records": [
    {
      "Data": "dGVzdGRhdGE=",
      "PartitionKey": 123,
      "ApproximateArrivalTimestamp": 1441215410.867,
      "SequenceNumber": "49544985256907370027570885864065577703022652638596431874"
    }
  ],
  "MillisBehindLatest": 24000,
  "NextShardIterator": "AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}

Convert that response into the form KCL passes to the Python process.

convert.jq
{
  action: "processRecords",
  records: .Records
    | map({
        data: .Data,
        partitionKey: .PartitionKey,
        sequenceNumber: .SequenceNumber,
        approximateArrivalTimestamp: .ApproximateArrivalTimestamp|floor,
        subSequenceNumber: 0,
        action: "record"
      }
    ),
  millisBehindLatest: .MillisBehindLatest
}
$ cat data.json | jq -f convert.jq
{
  "action": "processRecords",
  "records": [
    {
      "data": "dGVzdGRhdGE=",
      "partitionKey": 123,
      "sequenceNumber": "49544985256907370027570885864065577703022652638596431874",
      "approximateArrivalTimestamp": 1441215410,
      "subSequenceNumber": 0,
      "action": "record"
    }
  ],
  "millisBehindLatest": 24000
}
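
If jq is not available, the same conversion can be sketched in Python. This is only a rough equivalent of convert.jq above; the function name `to_process_records` and the inline sample dict are illustrative assumptions, not part of amazon_kclpy.

```python
import json
import math


def to_process_records(get_records_response):
    """Convert a GetRecords-style dict into a processRecords message (mirrors convert.jq)."""
    return {
        "action": "processRecords",
        "records": [
            {
                "data": r["Data"],
                "partitionKey": r["PartitionKey"],
                "sequenceNumber": r["SequenceNumber"],
                # Same truncation as jq's floor on the fractional timestamp.
                "approximateArrivalTimestamp": math.floor(r["ApproximateArrivalTimestamp"]),
                "subSequenceNumber": 0,
                "action": "record",
            }
            for r in get_records_response["Records"]
        ],
        "millisBehindLatest": get_records_response["MillisBehindLatest"],
    }


# Sample input copied from the response shown earlier (NextShardIterator omitted).
sample = {
    "Records": [
        {
            "Data": "dGVzdGRhdGE=",
            "PartitionKey": 123,
            "ApproximateArrivalTimestamp": 1441215410.867,
            "SequenceNumber": "49544985256907370027570885864065577703022652638596431874",
        }
    ],
    "MillisBehindLatest": 24000,
}

# Emit the message on a single line, like `jq -c`.
print(json.dumps(to_process_records(sample), separators=(",", ":")))
```

Printing the message on one line matters because the result is meant to be piped into the Python process as a single message.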

Code

Implementing everything from scratch is fairly tedious, so borrow the bundled sample.

$ mkdir tests
$ cd tests

$ cp ../amazon-kinesis-client-python/samples/sample_kclpy_app.py .

I will skip the detailed explanation; read the source if you are curious.

sample.py
import json

from amazon_kclpy import kcl
from amazon_kclpy.messages import InitializeInput
import sample_kclpy_app

initialize_input_json = """
{
  "shardId": "shardId-000000000000",
  "sequenceNumber": "TRIM_HORIZON",
  "subSequenceNumber": 0,
  "action": "initialize"
}
"""

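# Subclass the bundled sample so that only process_record needs to be replaced.
class RecordProcessor(sample_kclpy_app.RecordProcessor):
    def __init__(self):
        super(RecordProcessor, self).__init__()

    def initialize(self, initialize_input):
        # Remember the shard ID, then let the sample do its own initialization.
        self._shard_id = initialize_input.shard_id
        super().initialize(initialize_input)

    @property
    def shard_id(self):
        return self._shard_id

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        # Dump the arguments so the decoded record is visible on stdout.
        print(locals())


if __name__ == "__main__":
    # Initialize the processor directly instead of waiting for the Java side to do it.
    initial_input = InitializeInput(json.loads(initialize_input_json))
    proc = RecordProcessor()
    proc.initialize(initial_input)

    # KCLProcess reads JSON messages from stdin and writes status replies to stdout.
    kcl_process = kcl.KCLProcess(proc)
    kcl_process.run()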

Running it

The files should now be laid out like this:

$ tree .. --charset unicode
..
|-- amazon-kinesis-client-python
    (..snip..)
`-- tests
    |-- convert.jq
    |-- data.json
    |-- sample.py
    `-- sample_kclpy_app.py

Then run it like this:

$ cat data.json | jq -c -f convert.jq | python sample.py
{'self': <__main__.RecordProcessor object at 0x109f4d390>, 'data': b'testdata', 'partition_key': 123, 'sequence_number': 49544985256907370027570885864065577703022652638596431874, 'sub_sequence_number': 0}

{"action": "status", "responseFor": "processRecords"}
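
The output above suggests what happens on the Python side: a JSON message is read from stdin, the base64 `data` field is handed to the processor as bytes, and a status reply naming the message's action is written back. The following is a simplified stand-in for that loop, written only to illustrate the exchange. It is not amazon_kclpy's actual code, and `run_stand_in` and `on_record` are hypothetical names; the real library also handles other message types.

```python
import base64
import io
import json


def run_stand_in(input_file, output_file, on_record):
    """Simplified stand-in for the message loop; not amazon_kclpy's actual code."""
    for line in input_file:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        if message["action"] == "processRecords":
            for record in message["records"]:
                # "data" travels base64-encoded and is handed over as bytes.
                on_record(base64.b64decode(record["data"]), record["partitionKey"])
        # Acknowledge the message with a status reply naming its action.
        reply = {"action": "status", "responseFor": message["action"]}
        output_file.write(json.dumps(reply) + "\n")


# In-memory files stand in for the stdin/stdout pipes.
seen = []
fake_in = io.StringIO(
    '{"action":"processRecords","records":[{"data":"dGVzdGRhdGE=","partitionKey":123}]}\n'
)
fake_out = io.StringIO()
run_stand_in(fake_in, fake_out, lambda data, key: seen.append((data, key)))

print(seen)
print(fake_out.getvalue().strip())
```

Swapping the pipes for in-memory file objects like this is one way to keep such a check inside a single Python test, without a shell pipeline.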

That's all.
