5
Help us understand the problem. What are the problem?

More than 3 years have passed since last update.

posted at

updated at

Organization

Kinesis Data Streams プロキシとしてのAPI Gatewayの利用

はじめに

以下のドキュメントを参考に、ユースケースとしてセンサーデータなどをWEB APIにGET もしくは POSTで送信しているリクエストをKinesis Streamに送ってみる方法を試しました。

Amazon Kinesis プロキシとして API Gateway API を作成する

いくつかの実装例が記載されていますが、具体的には以下のAPIを作成します。

API の /streams/{stream-name}/record リソースで HTTP GET/POST メソッドを公開し、このメソッドを Kinesis の PutRecord アクションと統合します。
これにより、クライアントは名前付きストリームに 1 つのデータレコードを追加できます。

ロールの作成

API GatewayがKinesisにアクセスするためのIAMロールを作成します。

  • 使用するサービスで「API Gateway」を選択します。

image.png

  • ここはデフォルトのままで

image.png

  • タグ設定も任意でOKです

image.png

  • ロール名は任意ですが、ここでは「apigateway-kinesis-stream-role」にしました。

image.png

  • ロールを作成したあとに、Kinesisで読み取りおよび書き込みアクションを有効にするため、「AmazonKinesisFullAccess」ポリシーを付与します。

image.png

ロールの準備はこれで終わりです。

Amazon Kinesis Streamsの作成

Amazon Kinesis Streamsを作成します。
ストリームの名前は任意です。

image.png

API Gateway の作成

API Gateway を作成します。
プロトコルはRESTを選択し新しいAPIを作成します。

image.png

リソースの作成

/streamsリソースをAPIのルートに追加します。

image.png

/streamsリソースの下に{stream-name}パス変数を使用して、子リソースを作成します。

image.png

名前付きストリームリソース/{stream-name}recordリソースを追加します。

image.png

GETメソッドを追加し、以下を入力します。

項目
統合タイプ AWSサービス
AWSリージョン ap-northeast-1
AWSサービス Kinesis
AWSサブドメイン (ブランク)
HTTPメソッド POST
アクション PutRecord
実行ロール 先程作成したロールをARNで入力

image.png

HTTPヘッダーセクションに以下を入力します。

項目
Content-Type 'x-amz-json-1.1'

image.png

マッピングテンプレートセクションで以下を入力します。

  • 「テンプレートが定義されていない場合(推奨)」を選択
  • 【マッピングテンプレートの追加】を選択し、Content-Typeapplication/jsonを入力する

image.png

  • テンプレート入力欄に以下を入力します。
#set($allParams = $input.params())
#set($params = $allParams.get('querystring'))
#set($data = "{#foreach($paramName in $params.keySet())""$paramName"": ""$util.escapeJavaScript($params.get($paramName))""#if($foreach.hasNext),#end#end}")
{
    "StreamName": "$input.params('stream-name')",
    "Data": "$util.base64Encode($data)",
    "PartitionKey": "$input.params('partitionkey')"
}

image.png

テンプレートでは、クエリ文字列からPutRecordアクションに必要なStreamNameDataPartitionKeyをマッピングしています。

API gatewayの設定は以上です。

テスト

ここまででテストをしてみます。
以下の情報を入力し、テストしてみましょう。

項目 
パス {stream-name} iot-stream
クエリ文字列 {record} partitionkey=testkey&param1=value1

image.png

画面右のように以下が返ってくれば成功です。

{
  "SequenceNumber": "xxxxxxxxxxxxxxx",
  "ShardId": "shardId-000000000000"
}

統合リクエストには以下のデータが渡されます。

{
    "StreamName": "iot-stream",
    "Data": "eyJwYXJ0aXRpb25rZXkiOiAidGVzdGtleSIsInBhcmFtMSI6ICJ2YWx1ZTEifQ==",
    "PartitionKey": "testkey"
}

おまけ

Kinesis Streamをデータソースとしてpayloadをコンソール出力するLambdaです。

const util = require('util');

exports.handler = (event, context, callback) => {
    console.log(`event: ${util.inspect(event, { depth: null })}`);

    for (const record of event['Records']) {
        console.log('test: ' + record.kinesis.data);
        const dataDecode = Buffer.from(record.kinesis.data, 'base64').toString('utf8');

        console.log(`payload: ${dataDecode}`);
    }

    callback(null, 'Success');
}

まとめ

ユースケースとしてセンサーデータなどをWEB APIにGETで送信しているリクエストをKinesis Streamに送ってみる方法を試しました。
GETでリクエストすることがあるかどうかはおいといて、クエリ文字列をかき集めて統合リクエストにマッピングすることが出来ました。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
5
Help us understand the problem. What are the problem?