はじめに
以下のドキュメントを参考に、ユースケースとしてセンサーデータなどをWEB APIにGET もしくは POSTで送信しているリクエストをKinesis Streamに送ってみる方法を試しました。
Amazon Kinesis プロキシとして API Gateway API を作成する
いくつかの実装例が記載されていますが、具体的には以下のAPIを作成します。
API の /streams/{stream-name}/record リソースで HTTP GET/POST メソッドを公開し、このメソッドを Kinesis の PutRecord アクションと統合します。
これにより、クライアントは名前付きストリームに 1 つのデータレコードを追加できます。
ロールの作成
API GatewayがKinesisにアクセスするためのIAMロールを作成します。
- 使用するサービスで「API Gateway」を選択します。
- ここはデフォルトのままで
- タグ設定も任意でOKです
- ロール名は任意ですが、ここでは「apigateway-kinesis-stream-role」にしました。
- ロールを作成したあとに、Kinesisで読み取りおよび書き込みアクションを有効にするため、「AmazonKinesisFullAccess」ポリシーを付与します。
ロールの準備はこれで終わりです。
Amazon Kinesis Streamsの作成
Amazon Kinesis Streamsを作成します。
ストリームの名前は任意です。
API Gateway の作成
API Gateway を作成します。
プロトコルはREST
を選択し新しいAPIを作成します。
リソースの作成
/streams
リソースをAPI
のルートに追加します。
/streams
リソースの下に{stream-name}
パス変数を使用して、子リソースを作成します。
名前付きストリームリソース
/{stream-name}
にrecord
リソースを追加します。
GET
メソッドを追加し、以下を入力します。
項目 | 値 |
---|---|
統合タイプ | AWSサービス |
AWSリージョン | ap-northeast-1 |
AWSサービス | Kinesis |
AWSサブドメイン | (ブランク) |
HTTPメソッド | POST |
アクション | PutRecord |
実行ロール | 先程作成したロールをARNで入力 |
HTTPヘッダーセクションに以下を入力します。
項目 | 値 |
---|---|
Content-Type | 'x-amz-json-1.1' |
マッピングテンプレートセクションで以下を入力します。
- 「テンプレートが定義されていない場合(推奨)」を選択
- 【マッピングテンプレートの追加】を選択し、
Content-Type
にapplication/json
を入力する
- テンプレート入力欄に以下を入力します。
#set($allParams = $input.params())
#set($params = $allParams.get('querystring'))
#set($data = "{#foreach($paramName in $params.keySet())""$paramName"": ""$util.escapeJavaScript($params.get($paramName))""#if($foreach.hasNext),#end#end}")
{
"StreamName": "$input.params('stream-name')",
"Data": "$util.base64Encode($data)",
"PartitionKey": "$input.params('partitionkey')"
}
テンプレートでは、クエリ文字列からPutRecord
アクションに必要なStreamName
、Data
、PartitionKey
をマッピングしています。
API gatewayの設定は以上です。
テスト
ここまででテストをしてみます。
以下の情報を入力し、テストしてみましょう。
項目 | 値 |
---|---|
パス {stream-name} | iot-stream |
クエリ文字列 {record} | partitionkey=testkey¶m1=value1 |
画面右のように以下が返ってくれば成功です。
{
"SequenceNumber": "xxxxxxxxxxxxxxx",
"ShardId": "shardId-000000000000"
}
統合リクエストには以下のデータが渡されます。
{
"StreamName": "iot-stream",
"Data": "eyJwYXJ0aXRpb25rZXkiOiAidGVzdGtleSIsInBhcmFtMSI6ICJ2YWx1ZTEifQ==",
"PartitionKey": "testkey"
}
おまけ
Kinesis Streamをデータソースとしてpayload
をコンソール出力するLambdaです。
const util = require('util');
exports.handler = (event, context, callback) => {
console.log(`event: ${util.inspect(event, { depth: null })}`);
for (const record of event['Records']) {
console.log('test: ' + record.kinesis.data);
const dataDecode = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
console.log(`payload: ${dataDecode}`);
}
callback(null, 'Success');
}
まとめ
ユースケースとしてセンサーデータなどをWEB APIにGETで送信しているリクエストをKinesis Streamに送ってみる方法を試しました。
GETでリクエストすることがあるかどうかはおいといて、クエリ文字列をかき集めて統合リクエストにマッピングすることが出来ました。