LoginSignup
5
1

More than 3 years have passed since last update.

AWS Kinesis Analyticsでレコードの事前処理(Lambda Python 3.7)

Last updated at Posted at 2019-09-21

はじめに

Amazon Kinesis Data Analyticsでは、Record pre-processingEnabledにすることで、Lambdaでレコードの事前処理ができます。

Lambdaのサンプルコードはドキュメントによるとblueprintでテンプレートが用意されているはずなのですが、利用できない状態になっています。
(コンソールからリンクをクリックしても何も出てこない。。)

今回、Lambdaの事前処理についてTwitterのツイートの頻出単語ウインドウ集計をテーマにしてサンプルを作成してみたのでまとめておきます。

構成

以下の順番で処理をしていきます。

  1. LogstashでTwitter Streaming APIからツイートを取得。
  2. LogstashからKinesis Data Streamsにツイートを転送。
  3. Kinesis Data AnalyticsのData SourceをData Streamsとし、LambdaでRecord pre-processingする
  4. Lambdaはツイートを単語間のスペースで分割してData Analyticsに返す
  5. Data Analyticsは単語の出現数をウインドウ集計

image.png

Logstash

Logstashのconfファイルのサンプルです。Twitter APIからツイートを取得し、必要なフィールドのみをKinesis Data Streamsにアウトプットします。

input {
  twitter {
    consumer_key => ""
    consumer_secret => ""
    oauth_token => ""
    oauth_token_secret => ""
    languages => ["en"]
    full_tweet => true
    keywords => ["trump"]
    ignore_retweets => true
    codec => "json"
  }
}
filter {
  # "text"と"@timestamp"フィールドのみを残す
  ruby {
    code => "
      wanted_fields = ['text', '@timestamp']
      event.to_hash.keys.each { |k|
        event.remove(k) unless wanted_fields.include? k
      }
    "
  }
}
output {
   kinesis {
     stream_name => ""
     region => "ap-northeast-1"
     access_key => ""
     secret_key => ""
   }
}

Record pre-processing

Record pre-processingEnabledにして、対象のLambdaを設定します。

image.png

Lambda

Kinesis Streamsからレコードを受け取り、受け取ったレコードのTextを単語間のスペースで分割していきます。

ポイントは以下です。

dataフィールドはbase64エンコードが必要

受け取る時はデコード、アウトプットはエンコードしなければなりません。

また、Kinesisをデータソースとした場合に限った話ではありませんが、Lambdaハンドラーのeventは辞書型に変換が必要です。(以下の記事参照)

lambda_function.py
import json
import base64
import logging
import re
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    response = []
    for record in event['records']:
        wordlist = []

        try:
            # Base64で送られてきたレコードをデコードする
            data = base64.b64decode(record['data']).decode(encoding='utf-8', errors='replace')
            # dataを辞書型に変換
            data_json = json.loads(data)

            logger.info(data_json)

            # ツイートの分割
            splited_words = re.split(" +", data_json['text'])
            for word in splited_words:
                wordlist.append({"word" : word})

            logger.info(wordlist)

            # Base64にエンコード
            wordlist_json = json.dumps(wordlist)
            wordlist_byte = wordlist_json.encode('utf-8')
            wordlist_base64 = base64.b64encode(wordlist_byte)
            wordlist_base64_str = wordlist_base64.decode()

            # 戻り値のdataの生成
            response.append(
                {
                    "recordId" : record['recordId'],
                    "result" : "Ok",
                    "data": wordlist_base64_str
                }
            ) 
        # 処理に失敗した場合の戻り値の生成
        except:
            response.append(
                {
                    "recordId" : record['recordId'],
                    "result" : "Dropped"
                }
            )

    return_dict = {
        "records": response
    }

    logger.info(return_dict)
    # 戻り値は辞書型(JSONではない点に注意!)
    return return_dict

Kinesis Analytics

SQL

"word"フィールドを10秒毎に集計するSQLです。


CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (word VARCHAR(32), word_count INTEGER);

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

SELECT STREAM "word", COUNT(*) AS "word_count"
FROM "SOURCE_SQL_STREAM_001"

GROUP BY "word", FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

集計結果

集計結果が確認できます。
"WORD_COUNT"列に集計した数が表示されています。

image.png

最後に

Lambdaの事前処理を紹介しました。
Base64の処理のところに手こずるかもしれませんが、事前処理がLambdaで手軽にできるのは便利かと思います。

誰かのお役に立てれば幸いです!

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1