GCPのPubSubで配信されているストリームデータを、AWSのKinesis Data Streams(以降、KDSと記述)に連携する方法を考える機会があったので、一番手っ取り早そうな方法として、PubSubからのpushをAPI Gatewayで受信してKDSにPutするやり方を試してみました。
以下の記事を参考にさせていただきました。
HTTP API と REST API
API GatewayでRESTfulなAPIを構築する場合、「HTTP API」と「REST API」のどちらかを選択する必要があります。もう1つ「WebSocket API」というものもありますが、今回のユースケースには合わないので言及しません。
AWSのドキュメントを見る限り、HTTP API が新世代、REST API が旧世代という位置付けのようです。HTTP API は REST API より低レイテンシーで低コストという特徴がある一方、機能の幅広さの面ではまだまだ REST API の方が対応可能な範囲が広いようです。詳細は公式ドキュメントをご参照ください。
今回は REST API と HTTP API の両方を試してみました。先に REST API の場合について記載し、次に HTTP API の場合の REST API との差異について記載します。個人的な感想としては、HTTP APIの方が必須の設定項目が少なく容易に構築できる印象でした。ユースケース的に HTTP API が対応している機能で十分な場合は、HTTP APIの方がラクかもしれません。
REST APIの場合
以下の通り、各パラメータを指定してリソースを構築していきます。特に記載のないパラメータはデフォルトのままです。
KDS
API GatewayからPutする先のKDSです。
- データストリーム名:
apigwtest
(任意の名前) - シャード数:
1
Consumer用のLambda
KDSへのデータ登録状況を確認するためのLambda関数です。KDSからストリームデータを受信してログ出力を行います。
kinesis-process-record
の設計図の使用して関数を作成します。
- 関数名:
KinesisConsumerTest
(任意の名前) - 実行ロール: 「基本的な Lambda アクセス権限で新しいロールを作成」
- 作成されたロールにKDSアクセス用ポリシーをアタッチ
- Kinesis ストリーム:
apigwtest
(上記で作成したストリーム名)
console.log('Loading function');
exports.handler = async (event, context) => {
//console.log('Received event:', JSON.stringify(event, null, 2));
for (const record of event.Records) {
// Kinesis data is base64 encoded so decode here
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
console.log('Decoded payload:', payload);
// PubSubのメッセージ本文もBASE64エンコードされているので、さらにデコードする
const decodedMessage = Buffer.from(payload, 'base64').toString('utf-8');
console.log("message", decodedMessage);
}
return `Successfully processed ${event.Records.length} records.`;
};
API Gateway(KDS統合まで)
まずはPOSTで受信したデータをKDSにPutする部分だけ構築します。
KDSへのPut用のロール
-
ロール名:
ApigwKinesisPutTestRole
(任意の名前) -
ポリシー:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "kinesis:PutRecord" ], "Resource": "arn:aws:kinesis:ap-northeast-1:<アカウントID>:stream/apigwtest" } ] }
-
信頼関係:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "apigateway.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
REST API
基本的には以下の公式チュートリアルに沿って行いました。
-
「REST API」 で「構築」を選択
- プロトコル:
REST
- 「新しいAPI」を選択
- API名:
RestKinesisPut
(任意の名前) - 「作成」を選択
- プロトコル:
-
リソース
-
/
を選択 - 「アクション」→「メソッドの作成」 →
POST
- 統合タイプ: AWSサービス
- AWSリージョン:
ap-northeast-1
- AWSサービス:
Kinesis
- AWSサブドメイン: (空白のまま)
- HTTPメソッド:
POST
- アクションの種類:
アクション名の使用
- アクション:
PutRecord
- 実行ロール: KDSへのPut用のロールのARNを設定
- コンテンツの処理:
パススルー
-
-
統合リクエスト
- PubSubから連携されるデータ形式に合わせてマッピングを調整します。
- https://cloud.google.com/pubsub/docs/push?hl=ja#receiving_messages
- マッピングテンプレートの追加
application/json
{ "StreamName": "apigwtest", "Data": "$util.base64Encode($input.json('$.message.data'))", "PartitionKey": "$input.path('$.message').messageId" }
- PubSubから連携されるデータ形式に合わせてマッピングを調整します。
messageId
がパーティションキーとして適切かは要確認
PubSub
PubSubを構築するプロジェクトを選択(任意だが今回は新規プロジェクトを作成して使用)して、トピックとサブスクリプションを作成します。
- トピック作成
- トピックID:
kinesistest
(任意の名前)
- トピックID:
- サブスクリプション作成(トピック作成時のデフォルトサブスクリプションでもよい)
- サブスクリプションID:
kinesistest-sub
(任意の名前) - トピック:
projects/<プロジェクト名>/topics/kinesistest
- 配信タイプ: push
- エンドポイントURL: APIGatewayのURL(リソースのパスまで)
- サブスクリプションID:
疎通確認
ここでいったんPubSub〜API Gateway〜KDS〜Lambdaの疎通確認を行います。
- PubSubで「トピック」 → 「MESSAGES」 → 「メッセージのパブリッシュ」
- メッセージの数:
1
- メッセージの本文:
hoge
(適当) - 「公開」を選択
- メッセージの数:
- Consumer用のLambdaのログを確認
問題なければ、次にPubSub〜API Gateway間の認証を追加します。
認証の追加
上記のままだと誰でもAPI GatewayにPOSTできてしまうので、認証処理を追加します。
PubSubの設定
- サービスアカウントの作成
- サービスアカウント名:
pubsub
(任意の名前) - 「サービスアカウントトークン作成者」権限を付与
- サービスアカウント名:
- サブスクリプションを編集
- 「認証を有効にする」
- サービスアカウント: 上記で作成したサービスアカウントを選択
- 対象:
pubsubtest
(任意の値)
- 「認証を有効にする」
オーソライザー用Lambda関数の作成
以下のドキュメントを参考に認証用のLambda関数を作成します。この関数をAPI Gatewayのオーソライザーとして使用します。
-
google-auth-library
をnpm install
したうえでzipアップロード
exports.handler = async (event) => {
// console.log("event", event);
const authorizationHeader = event.authorizationToken;
const isAuthorized = await validate(authorizationHeader);
if (isAuthorized) {
return generatePolicy('pubsub', 'Allow', event.methodArn);
} else {
return generatePolicy('pubsub', 'Deny', event.methodArn);
}
};
const {OAuth2Client} = require('google-auth-library');
const authClient = new OAuth2Client();
const validate = async (authorizationHeader) => {
// console.log("authorizationHeader", authorizationHeader);
try {
const [, token] = authorizationHeader.match(/Bearer (.*)/);
const ticket = await authClient.verifyIdToken({
idToken: token,
audience: '<PubSubで設定した「対象」の値>',
});
const claim = ticket.getPayload();
// console.log("claim", claim);
if(isValidEmail(claim)) {
console.log('authorized');
return true;
} else {
console.log('unauthorized, invalid email', claim.email);
return false;
};
} catch (e) {
console.log("unauthorized", e);
return false;
}
}
const isValidEmail = (claim) => {
return claim.email_verified && claim.email === '<PubSubで設定したサービスアカウントのメールアドレス>';
}
const generatePolicy = (principalId, effect, resource) => {
const authResponse = {};
authResponse.principalId = principalId;
if (effect && resource) {
const policyDocument = {};
policyDocument.Version = '2012-10-17';
policyDocument.Statement = [];
const statementOne = {};
statementOne.Action = 'execute-api:Invoke';
statementOne.Effect = effect;
statementOne.Resource = resource;
policyDocument.Statement[0] = statementOne;
authResponse.policyDocument = policyDocument;
}
return authResponse;
}
API Gatewayの設定
- オーソライザー作成
- 「新しいオーソライザーの作成」
- 名前:
pubsubAuthorizer
(任意の名前) - タイプ:
Lambda
- Lambda関数: 作成したオーソライザー用Lambda関数を指定
- Lambda呼び出しロール: (空白のまま)
- Lambdaイベントペイロード:
トークン
- トークンのソース:
Authorization
- トークンの検証: (空白のまま)
- API Gateway に、Lambda 関数を呼び出すアクセス許可を自動的に付与
- 認可の設定
- 「リソース」 →
/
→POST
→ 「メソッドリクエスト」 - 認可:
pubsubAuthorizer
(上記で作成したオーソライザー名)
- 「リソース」 →
認証ありで疎通確認
前述の疎通確認と同様に確認を行います。
また、以下の場合は認証エラーとなることを確認します。
- PubSubでなく別途curl等で、Authorizationヘッダを無しか不正なJWTにしてAPIリクエストした場合
- PubSubで設定する「対象」の値を別の値を変えた場合
- PubSubで設定する認証用サービスアカウントを別のサービスアカウントに変えた場合
CDKによる記述
上記のように、まずは試行錯誤しつつマネジメントコンソールで構築していましたが、一通り成功したのでCDKでも記述してみました。
準備
Cloud9上でコード記述やデプロイ等の操作を行いました。必要なライブラリがデフォルトでインストールされていて便利でした。以下のドキュメントに沿って実施しています。
# プロジェクト作成・準備
mkdir ~/environment/apigw-kds
cd ~/environment/apigw-kds
cdk init sample-app --language typescript
cdk bootstrap
コード記述の際は、以下の記事を参考にさせていただきました。
リソースごとに必要なCDKモジュールは適宜 npm install
しています。
{
"name": "apigw-kds",
"version": "0.1.0",
"bin": {
"apigw-kds": "bin/apigw-kds.js"
},
"scripts": {
"build": "tsc",
"watch": "tsc -w",
"test": "jest",
"cdk": "cdk"
},
"devDependencies": {
"aws-cdk": "1.119.0",
"@aws-cdk/assert": "1.119.0",
"@types/jest": "^26.0.10",
"@types/node": "10.17.27",
"jest": "^26.4.2",
"ts-jest": "^26.2.0",
"ts-node": "^9.0.0",
"typescript": "~3.9.7"
},
"dependencies": {
"@aws-cdk/aws-apigateway": "^1.119.0",
"@aws-cdk/aws-iam": "^1.119.0",
"@aws-cdk/aws-kinesis": "^1.119.0",
"@aws-cdk/aws-lambda": "^1.119.0",
"@aws-cdk/aws-lambda-event-sources": "^1.119.0",
"@aws-cdk/core": "1.119.0"
}
}
コード
# !/usr/bin/env node
import * as cdk from '@aws-cdk/core';
import { KdsStack } from '../lib/kds-stack';
import { ApigwToKdsStack } from '../lib/apigw-to-kds-stack';
import { ConsumerLambdaStack } from '../lib/consumer-lambda-stack';
const app = new cdk.App();
const kdsStack = new KdsStack(app, 'KdsStack');
new ConsumerLambdaStack(app, 'ConsumerLambdaStack', {
stream: kdsStack.stream
});
new ApigwToKdsStack(app, 'ApigwToKdsStack', {
streamName: kdsStack.stream.streamName
});
import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';
export class KdsStack extends cdk.Stack {
public readonly stream: kinesis.Stream;
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);
this.stream = new kinesis.Stream(this, 'Kinesis', {
streamName: 'apigwtest',
shardCount: 1,
});
}
}
import * as cdk from '@aws-cdk/core';
import * as lambda from '@aws-cdk/aws-lambda';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';
import { KinesisEventSource } from '@aws-cdk/aws-lambda-event-sources';
interface ConsumerLambdaStackProps extends cdk.StackProps {
stream: kinesis.Stream;
}
export class ConsumerLambdaStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props: ConsumerLambdaStackProps) {
super(scope, id, props);
const region = cdk.Stack.of(this).region;
const accountId = cdk.Stack.of(this).account;
const consumerFn = new lambda.Function(this, 'ConsumerFunction', {
runtime: lambda.Runtime.NODEJS_14_X,
handler: 'consumer.handler',
code: new lambda.AssetCode('lambda'),
});
consumerFn.role?.addToPolicy(
new iam.PolicyStatement({
actions: [
'kinesis:DescribeStream',
'kinesis:DescribeStreamSummary',
'kinesis:GetRecords',
'kinesis:GetShardIterator',
'kinesis:ListShards',
'kinesis:ListStreams',
'kinesis:SubscribeToShard'
],
effect: iam.Effect.ALLOW,
resources: [`arn:aws:kinesis:${region}:${accountId}:stream/${props.stream.streamName}`],
}),
);
consumerFn.addEventSource(new KinesisEventSource(props.stream, {
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON
}));
}
}
import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as apigateway from '@aws-cdk/aws-apigateway';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
interface ApigwToKdsStackProps extends cdk.StackProps {
streamName: string;
}
export class ApigwToKdsStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props: ApigwToKdsStackProps) {
super(scope, id, props);
const region = cdk.Stack.of(this).region;
const accountId = cdk.Stack.of(this).account;
// Kinesisアクセス用IAMロール
const apigwToKdsRole = new iam.Role(this, 'ApigwToKdsRole', {
assumedBy: new iam.ServicePrincipal('apigateway.amazonaws.com'),
});
apigwToKdsRole.addToPolicy(
new iam.PolicyStatement({
actions: ['kinesis:PutRecord'],
effect: iam.Effect.ALLOW,
resources: [`arn:aws:kinesis:${region}:${accountId}:stream/${props.streamName}`],
}),
);
// オーソライザー用Lambda関数
const authorizerFn = new lambda.Function(this, 'AuthorizerFunction', {
runtime: lambda.Runtime.NODEJS_14_X,
handler: 'authorizer.handler',
code: new lambda.AssetCode('lambda'),
});
// API Gateway
const restApi = new apigateway.RestApi(this, 'RestApi', {
restApiName: 'RestKdsPut',
deployOptions: {
stageName: 'test',
},
});
// オーソライザー設定
const authorizer = new apigateway.TokenAuthorizer(this, 'Authorizer', {
handler: authorizerFn,
});
// 統合リクエスト設定
const kinesisIntegration = new apigateway.AwsIntegration({
service: 'kinesis',
action: 'PutRecord',
options: {
credentialsRole: apigwToKdsRole,
passthroughBehavior: apigateway.PassthroughBehavior.WHEN_NO_TEMPLATES,
requestTemplates: {
'application/json': `{
"StreamName": "${props.streamName}",
"Data": "$util.base64Encode($input.json('$.message.data'))",
"PartitionKey": "$input.path('$.message').messageId"
}`,
},
integrationResponses: [
{
statusCode: '200',
responseTemplates: {
'application/json': '',
},
},
],
},
});
// メソッドリクエスト設定
restApi.root.addMethod('POST', kinesisIntegration, {
methodResponses: [
{
statusCode: '200',
},
],
authorizer,
});
}
}
なお。Lambdaのソースは前述のコードをそれぞれ lambda/consumer.js
, lambda/authorizer.js
として配置しています。( google-auth-library
を含むnode_modules
等も合わせてlambda
ディレクトリ配下に配置しています。)
構築
# ビルド
npm run build
cdk synth
# 構築(複数スタックを一括指定する場合)
cdk deploy '*'
# 削除(複数スタックを一括指定する場合)
cdk destroy '*'
HTTP APIの場合
HTTP API の場合について、REST API の場合との差異に絞って記載します。
KDS
REST APIの場合と同じ
Consumer用のLambda
REST APIの場合と同じ
API Gateway(KDS統合まで)
KDSへのPut用のロール
REST APIの場合と同じ
HTTP API
- 「HTTP API」で「構築」を選択
- API名:
HttpKinesisPut
(任意の名前) - 統合は後で追加する
- 「確認して作成」を選択
- API名:
- ルートの作成
- POST
/
- POST
- 統合の設定
- 「統合を作成してアタッチ」
- 統合タイプ:
Amazon Kinesis Data Streams
- 統合アクション:
PutRecord
- ストリーム名:
apigwtest
(作成したストリーム名) - データ:
$request.body.message.data
- パーティションキー:
$request.body.message.messageId
- 呼び出しロール: KDSへのPut用のロールのARNを設定
REST API のときは統合リクエストの設定でマッピングテンプレートの記述形式がよくわからずちょっとハマったので、上記のように簡単に設定できるのは簡単でよかったです。
PubSub
REST APIの場合と同じ
疎通確認
REST APIの場合と同じ
認証の追加
PubSubの設定
REST APIの場合と同じ
オーソライザー用Lambda関数の作成
HTTP APIの場合は、ペイロード形式バージョン(1.0 or 2.0)を選択する必要があり、バージョンによってペイロード形式が異なるのに加え、レスポンスの形式も変えることができます。
バージョン1.0の場合は、REST APIのLambdaオーソライザーと互換のようです。
バージョン2.0の場合は、IAMポリシーを返す代わりにシンプルに認証成否をブール値で返すことができます。
exports.handler = async (event) => {
// console.log("event", event);
// HTTP API ペイロード形式バージョン1.0 または REST APIの場合
// const authorizationHeader = event.authorizationToken;
// HTTP API ペイロード形式バージョン2.0 の場合
const authorizationHeader = event.headers.authorization;
const isAuthorized = await validate(authorizationHeader);
// HTTP API レスポンス形式1.0 または REST APIの場合
// if (isAuthorized) {
// return generatePolicy('user', 'Allow', event.methodArn);
// } else {
// return generatePolicy('user', 'Deny', event.methodArn);
// }
// HTTP API レスポンス形式2.0 の場合
return { isAuthorized };
};
// 以降はREST APIの場合と同じ
API Gatewayの設定
HTTP APIの場合は、JWTオーソライザーという機能で、JWTの検証を行うオーソライザーを簡単に作成することもできます。
JWTオーソライザーの作成
- 「認可」で「オーソライザーを作成してアタッチ」
- 名前:
pubsubjwt
(任意の名前) - IDソース:
$request.header.Authorization
- 発行者URL:
https://accounts.google.com
- 対象者: PubSubで設定した対象の値
- 名前:
ただし、JWTオーソライザーの場合、ドキュメント記載の検証内容を見る限り、発行者がGoogleであることと、対象(aud
)が設定値と一致するかの検証のみで、PubSubのドキュメントで推奨されているようなemail
の検証はできないように見えました。
そうなると、APIのURLと対象(aud
)の値さえわかれば、誰でもGCPの任意のプロジェクトからリクエストを投げれば認証が通ってしまうと思われます。
email
の検証も行うのであれば、やはりLambdaオーソライザーを使うことになるかと思います。
Lambdaオーソライザーの作成
- 「認可」で「オーソライザーを作成してアタッチ」
- オーソライザーのタイプ:
Lambda
- 名前:
pubsublambda
- Lambda関数: オーソライザー用Lambda関数を選択
- ペイロード形式のバージョン:
2.0
- レスポンスモード:
シンプル
- IDソース:
$request.header.Authorization
- 「API Gateway に、Lambda 関数を呼び出すアクセス許可を自動的に付与します。」をチェック
- オーソライザーのタイプ:
認証ありで疎通確認
REST APIの場合と同じ
CDK
HTTP APIの場合は@aws-sdk/aws-apigatewayv2
, @aws-sdk/aws-apigatewayv2-integration
あたりを使うのかと思ったのですが、本記事作成時点で最新のCDKバージョン1.119.0のドキュメントでは、AWSサービス統合に関する記述が見つけられませんでした。また、本バージョンでは apigatewayv2 の Higher Level Construct はまだ Experimental の状態でした。
CloudFormationのドキュメントでも、AWSサービス統合らしい設定を見つけられませんでした。IntegrationType
にAWS_PROXY
というものがありましたが、説明を見る限りこれはLambda統合用のように見えました。
HTTP APIの場合も、直接KDS統合でなくLambda経由でKDSにPutすれば、CDK/CloudFormationで構築できそうですが、今回はいったん断念しています。
他の対応案
PubSubからKDSに連携する方法としては、他にも以下の案などが考えられますが、これらの検証・比較も時間があれば別途実施したいと考えています。他によりよい方法があればコメントいただけるとありがたいです。
- GCPのDataflowやCloud FunctionsがPubSubから取得してKDSにPut
- GCP側で事前にストリームデータのフィルタができれば、AWSへの連携データ量を減らして通信コストを抑えられそう。
- DataFlowやCloud Functionsのデータ出力先をPubSubの別トピックにすれば、他の方法との組み合わせも可能と思われる。ただ、GCP側でのフィルタや加工が不要なら意味はない。
- GCPのVPC内のアプリケーション(GCE、GAE等)がPubSubから取得してKDSにPut(またはAPI Gateway経由でKDSにPut)
- Cloud NAT経由でAWS通信すれば、AWSへの送信元IPが固定できるので、AWS側で送信元IP制限が可能と思われる。
- AWSのVPC内のアプリケーション(EC2、ECS、Kinesis Data Analytics等)がPubSubからpullしてKDSにPut
- AWSリソースへのインバウンド通信(PubSubからのレスポンスを除く)を不要にできると思われる。