Edited at

Kinesis Data Streams プロキシとしてのAPI Gatewayの利用


はじめに

以下のドキュメントを参考に、ユースケースとしてセンサーデータなどをWEB APIにGET もしくは POSTで送信しているリクエストをKinesis Streamに送ってみる方法を試しました。

Amazon Kinesis プロキシとして API Gateway API を作成する

いくつかの実装例が記載されていますが、具体的には以下のAPIを作成します。

API の /streams/{stream-name}/record リソースで HTTP GET/POST メソッドを公開し、このメソッドを Kinesis の PutRecord アクションと統合します。

これにより、クライアントは名前付きストリームに 1 つのデータレコードを追加できます。


ロールの作成

API GatewayがKinesisにアクセスするためのIAMロールを作成します。


  • 使用するサービスで「API Gateway」を選択します。

image.png


  • ここはデフォルトのままで

image.png


  • タグ設定も任意でOKです

image.png


  • ロール名は任意ですが、ここでは「apigateway-kinesis-stream-role」にしました。

image.png


  • ロールを作成したあとに、Kinesisで読み取りおよび書き込みアクションを有効にするため、「AmazonKinesisFullAccess」ポリシーを付与します。

image.png

ロールの準備はこれで終わりです。


Amazon Kinesis Streamsの作成

Amazon Kinesis Streamsを作成します。

ストリームの名前は任意です。

image.png


API Gateway の作成

API Gateway を作成します。

プロトコルはRESTを選択し新しいAPIを作成します。

image.png


リソースの作成


/streamsリソースをAPIのルートに追加します。


image.png


/streamsリソースの下に{stream-name}パス変数を使用して、子リソースを作成します。


image.png


名前付きストリームリソース/{stream-name}recordリソースを追加します。


image.png


GETメソッドを追加し、以下を入力します。


項目

統合タイプ
AWSサービス

AWSリージョン
ap-northeast-1

AWSサービス
Kinesis

AWSサブドメイン
(ブランク)

HTTPメソッド
POST

アクション
PutRecord

実行ロール
先程作成したロールをARNで入力

image.png


HTTPヘッダーセクションに以下を入力します。


項目

Content-Type
'x-amz-json-1.1'

image.png


マッピングテンプレートセクションで以下を入力します。



  • 「テンプレートが定義されていない場合(推奨)」を選択

  • 【マッピングテンプレートの追加】を選択し、Content-Typeapplication/jsonを入力する

image.png


  • テンプレート入力欄に以下を入力します。

#set($allParams = $input.params())

#set($params = $allParams.get('querystring'))
#set($data = "{#foreach($paramName in $params.keySet())""$paramName"": ""$util.escapeJavaScript($params.get($paramName))""#if($foreach.hasNext),#end#end}")
{
"StreamName": "$input.params('stream-name')",
"Data": "$util.base64Encode($data)",
"PartitionKey": "$input.params('partitionkey')"
}

image.png

テンプレートでは、クエリ文字列からPutRecordアクションに必要なStreamNameDataPartitionKeyをマッピングしています。

API gatewayの設定は以上です。


テスト

ここまででテストをしてみます。

以下の情報を入力し、テストしてみましょう。

項目 

パス {stream-name}
iot-stream

クエリ文字列 {record}
partitionkey=testkey&param1=value1

image.png

画面右のように以下が返ってくれば成功です。

{

"SequenceNumber": "xxxxxxxxxxxxxxx",
"ShardId": "shardId-000000000000"
}

統合リクエストには以下のデータが渡されます。

{

"StreamName": "iot-stream",
"Data": "eyJwYXJ0aXRpb25rZXkiOiAidGVzdGtleSIsInBhcmFtMSI6ICJ2YWx1ZTEifQ==",
"PartitionKey": "testkey"
}


おまけ

Kinesis Streamをデータソースとしてpayloadをコンソール出力するLambdaです。

const util = require('util');

exports.handler = (event, context, callback) => {
console.log(`event: ${util.inspect(event, { depth: null })}`);

for (const record of event['Records']) {
console.log('test: ' + record.kinesis.data);
const dataDecode = Buffer.from(record.kinesis.data, 'base64').toString('utf8');

console.log(`payload: ${dataDecode}`);
}

callback(null, 'Success');
}


まとめ

ユースケースとしてセンサーデータなどをWEB APIにGETで送信しているリクエストをKinesis Streamに送ってみる方法を試しました。

GETでリクエストすることがあるかどうかはおいといて、クエリ文字列をかき集めて統合リクエストにマッピングすることが出来ました。