はじめに
AWS DVA取得のためのアウトプットが目的です。
対象チュートリアル
公式ドキュメントのチュートリアル: Amazon Kinesis で AWS Lambda を使用する
やってみた
チュートリアルがCLIで書かれていたので、最後までコンソールではなくCLIで実行しています。
AWS CLIのバージョンが2のため、バイナリ関係で追加オプションを記載しています。
# aws --version
aws-cli/2.1.3 Python/3.7.4 Darwin/20.3.0 exe/x86_64
Lambdaの実行ロール作成
CloudWatch Losgへの書き込み権限と、Kinesisからの読み込み権限を持つIAMロールを作成します。
まず、Lambdaに対して信頼関係のある、IAMロールの信頼ポリシーをJSON形式で作成します。
# vi lambda.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "1",
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
作成した信頼ポリシーが書かれたJSONファイルを使用し、IAMロールを作成します。
# export ROLE_NAME="lambda-kinesis-role"
# aws iam create-role --role-name $ROLE_NAME \
--assume-role-policy-document file://lambda.json
{
"Role": {
"Path": "/",
"RoleName": "lambda-kinesis-role",
"RoleId": "AROAUMCLE6NZSYX7HX4HH",
"Arn": "arn:aws:iam::111111111111:role/lambda-kinesis-role",
"CreateDate": "2021-07-22T04:36:41+00:00",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "1",
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
}
}
IAMロールにアタッチするAWSマネージドポリシーであるAWSLambdaKinesisExecutionRole
のARNを取得します。
# export POLICY_ARN=$(aws iam list-policies --scope AWS \
--policy-usage-filter PermissionsPolicy \
--query 'Policies[?PolicyName==`AWSLambdaKinesisExecutionRole`].Arn' \
--output text)
# echo $POLICY_ARN
arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
このマネージドポリシーは、以下の通りCloudWatch Logsへの書き込みとKinesisの読み取り権限を持っています。
# export POLICY_VERSION=$(aws iam get-policy \
--policy-arn $POLICY_ARN \
--query 'Policy.DefaultVersionId' \
--output text)
# echo $POLICY_VERSION
v2
# aws iam get-policy-version \
--version-id $POLICY_VERSION \
--policy-arn $POLICY_ARN
{
"PolicyVersion": {
"Document": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:SubscribeToShard",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
},
"VersionId": "v2",
"IsDefaultVersion": true,
"CreateDate": "2018-11-19T20:09:24+00:00"
}
}
IAMロールにAWSマネージドポリシーであるAWSLambdaKinesisExecutionRole
をアタッチします。
# aws iam attach-role-policy \
--role-name $ROLE_NAME \
--policy-arn $POLICY_ARN
アタッチされたことを確認します。
list-attached-role-policies
は、ロールにアタッチされたマネージドポリシーを表示します。
なお、インラインポリシーを表示するのは別のコマンドです。
# aws iam list-attached-role-policies \
--role-name $ROLE_NAME
{
"AttachedPolicies": [
{
"PolicyName": "AWSLambdaKinesisExecutionRole",
"PolicyArn": "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole"
}
]
}
Lambda作成
まず、以下のサンプルコードをコピペしてコードを作成します。
この後Lambdaのテストを行う際、Lambdaの成功を分かりやすくするために、returnを追加します。
チュートリアルではNode.jsでしたが、私は普段Pythonを使用しているのでPythonを選択しました。
# vi lambda_function.py
from __future__ import print_function
# import json
import base64
def lambda_handler(event, context):
for record in event['Records']:
#Kinesis data is base64 encoded so decode here
payload=base64.b64decode(record["kinesis"]["data"])
print("Decoded payload: " + str(payload))
return str(payload)
作成したコードをZip化します。
zip function.zip lambda_function.py
Lambdaを作成します。
先ほど作成したIAMロールのARNを指定しなければならないので、IAMロールのARNを取得してから作成しています。
handlerでは、Lambdaが呼び出されると実行する関数を指定します。
今回、Lambda_function.pyというファイル内のLambda_handler関数を実行したいので、handlerをlambda_function.lambda_handler
としてLambdaを作成しています。
# export ROLE_ARN=$(aws iam get-role \
--role-name $ROLE_NAME \
--query 'Role.Arn' \
--output text)
# echo $ROLE_ARN
arn:aws:iam::111111111111:role/lambda-kinesis-role
# export LAMBDA_NAME="ProcessKinesisRecords"
# aws lambda create-function --function-name $LAMBDA_NAME \
--zip-file fileb://function.zip --handler lambda_function.lambda_handler --runtime python3.7 \
--role $ROLE_ARN
{
"FunctionName": "ProcessKinesisRecords",
"FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
"Runtime": "python3.7",
"Role": "arn:aws:iam::111111111111:role/lambda-kinesis-role",
"Handler": "index.handler",
"CodeSize": 360,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2021-07-22T05:29:11.313+0000",
"CodeSha256": "dDwoJP20Yk7Dpk936dXZL4LPURp+gG2XmGfkzpOfKno=",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": "7fa1afa5-b052-435a-8621-598724bc048a",
"State": "Active",
"LastUpdateStatus": "Successful"
}
Lambdaテスト
Kinesisで実行する前に、Lambdaでテストします。
以下のチュートリアル提供のJSONを使用します。
# vi input.txt
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
}
テストを行います。
応答がout.txtに保存されるので、確認します。
# aws lambda invoke \
--function-name $LAMBDA_NAME \
--payload file://input.txt out.txt \
--cli-binary-format raw-in-base64-out
{
"StatusCode": 200,
"ExecutedVersion": "$LATEST"
}
# cat out.txt
"b'Hello, this is a test.'"%
returnで、input.txtのRecords[0].kinesis.dataのデコードされた文字が返却されているので、成功したことがわかります。
エラーの場合は次のように表示されました。
{
"StatusCode": 200,
"FunctionError": "Unhandled",
"ExecutedVersion": "$LATEST"
}
# cat out.txt
{"errorMessage": "Syntax error in module 'lambda_function': inconsistent use of tabs and spaces in indentation (lambda_function.py, line 9)", "errorType": "Runtime.UserCodeSyntaxError", "stackTrace": [" File \"/var/task/lambda_function.py\" Line 9\n \t\t\t return payload\n"]}%
Kinesisストリームを作成
Kinesisストリームを作成し、そのARNを取得します。
# export STREAM_NAME="lambda-stream"
# aws kinesis create-stream \
--stream-name $STREAM_NAME \
--shard-count 1
# export STREAM_ARN=$(aws kinesis describe-stream \
--stream-name $STREAM_NAME \
--query 'StreamDescription.StreamARN' \
--output text)
# echo $STREAM_ARN
arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream
Lambdaにイベントソースを追加
LambdaのイベントソースにKinesisを追加します。
後ほど使用するため、マッピングIDを取得(チュートリアルの指示ですが、後ほど使用することはありませんでした。ただしこの記事では削除の時に使用します。)。
# aws lambda create-event-source-mapping --function-name $LAMBDA_NAME \
> --event-source-arn $STREAM_ARN \
> --batch-size 100 \
> --starting-position LATEST
{
"UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
"StartingPosition": "LATEST",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 0,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
"FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
"LastModified": "2021-07-22T15:52:29.098000+09:00",
"LastProcessingResult": "No records processed",
"State": "Creating",
"StateTransitionReason": "User action",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1
}
# aws lambda list-event-source-mappings \
--function-name $LAMBDA_NAME \
--event-source-arn $STREAM_ARN
{
"EventSourceMappings": [
{
"UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
"StartingPosition": "LATEST",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 0,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
"FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
"LastModified": "2021-07-22T15:53:00+09:00",
"LastProcessingResult": "No records processed",
"State": "Enabled",
"StateTransitionReason": "User action",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1
}
]
}
# export MAPPING_ID=$(aws lambda list-event-source-mappings \
--function-name $LAMBDA_NAME \
--event-source-arn $STREAM_ARN \
--query 'EventSourceMappings[0].UUID' \
--output text)
# echo $MAPPING_ID
e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3
セットアップをテスト
Kinesisストリームにイベントレコードを追加し、Lambdaで処理されるか確認します。
Kinesisストリームにイベントレコード追加。
# aws kinesis put-record \
--stream-name $STREAM_NAME \
--partition-key 1 \
--data "Hello, First." \
--cli-binary-format raw-in-base64-out
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49620367105734911944180226173654955033153918722975465474"
}
Lambdaが実行されたか確認するため、CloudWatch Logsのログストリームを確認します。
まず、ロググループの最新のログストリーム名を、クエリのソートにより取得します。
# export LOG_GROUP_NAME="/aws/lambda/$LAMBDA_NAME"
# export LATEST_EVENT=$(aws logs describe-log-streams \
--log-group-name $LOG_GROUP_NAME \
--query 'sort_by(logStreams, &lastEventTimestamp)[].{logStreamName: logStreamName,lastEventTimestamp:lastEventTimestamp}[-1].logStreamName' \
--output text)
# echo $LATEST_EVENT
2021/07/22/[$LATEST]9e1599c0f1cd4a6c90fb1966fe7695be
describe-log-streams
でログストリームが表示されますが、その順番はおそらく数値・アルファベット順であり、ログが送信された時刻とは関係ありません。
そのため、lastEventTimestamp
の表示が最新のログストリームが、最後にLambdaが実行したログだと考えられます。
lastEventTimestamp
はエポック秒のため、ログストリームの中でその値が一番大きなログストリーム名を取得しました。
以下のLambdaのロググループ内のログストリーム一覧を見ると、最新版として取得したログストリーム2021/07/22/[$LATEST]9e1599c0f1cd4a6c90fb1966fe7695be
が一番下にあり、ログストリームの中で最もエポック秒が大きいことが分かります。
# aws logs describe-log-streams \
--log-group-name $LOG_GROUP_NAME \
--query 'sort_by(logStreams, &lastEventTimestamp)[].{logStreamName: logStreamName,lastEventTimestamp:lastEventTimestamp}'
[
{
"logStreamName": "2021/07/22/[$LATEST]939c96448db643468102506b5ac6b506",
"lastEventTimestamp": 1626932290474
},
{
"logStreamName": "2021/07/22/[$LATEST]b73cd4b6ff084f1fa67bffc50455cdb4",
"lastEventTimestamp": 1626932533447
},
{
"logStreamName": "2021/07/22/[$LATEST]66b34064a27340a9b33b4dbd09f11cd3",
"lastEventTimestamp": 1626932697007
},
{
"logStreamName": "2021/07/22/[$LATEST]7fbfd3b7fbe444ff9ced5a1fde5fa1a4",
"lastEventTimestamp": 1626933502267
},
{
"logStreamName": "2021/07/22/[$LATEST]137462ac70dc418eb196b7597fbebfc1",
"lastEventTimestamp": 1626933926801
},
{
"logStreamName": "2021/07/22/[$LATEST]9e1599c0f1cd4a6c90fb1966fe7695be",
"lastEventTimestamp": 1626937651674
}
]
取得したログストリーム名を使用して、LambdaがKinesisストリームに追加されたレコードを処理したか確認します。
# aws logs get-log-events \
--log-group-name $LOG_GROUP_NAME \
--log-stream-name $LATEST_EVENT
{
"events": [
{
"timestamp": 1626937651672,
"message": "START RequestId: af71154e-1723-4020-b110-c41069d35475 Version: $LATEST\n",
"ingestionTime": 1626937657154
},
{
"timestamp": 1626937651674,
"message": "Decoded payload: b'Hello, First.'\n",
"ingestionTime": 1626937657154
},
{
"timestamp": 1626937651674,
"message": "END RequestId: af71154e-1723-4020-b110-c41069d35475\n",
"ingestionTime": 1626937657154
},
{
"timestamp": 1626937651674,
"message": "REPORT RequestId: af71154e-1723-4020-b110-c41069d35475\tDuration: 1.55 ms\tBilled Duration: 2 ms\tMemory Size: 128 MB\tMax Memory Used: 48 MB\tInit Duration: 120.37 ms\t\n",
"ingestionTime": 1626937657154
}
],
"nextForwardToken": "f/36281922023877642931232848957132370540239295426765062147",
"nextBackwardToken": "b/36281922023833041440835787710849299103693998703753101312"
}
Decoded payload: b'Hello, First.'\n
と表示されているので、無事Lambdaが実行されたことが分かりました。
念のため、もう一度Kinesisストリームにレコードを追加して、Lambdaが処理するか確認します。
# aws kinesis put-record \
--stream-name $STREAM_NAME \
--partition-key 1 \
--data "Hello, Second." \
--cli-binary-format raw-in-base64-out
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49620367105734911944180226173656163958973622275153068034"
}
# export LATEST_EVENT=$(aws logs describe-log-streams \
--log-group-name $LOG_GROUP_NAME \
--query 'sort_by(logStreams, &lastEventTimestamp)[].{logStreamName: logStreamName,lastEventTimestamp:lastEventTimestamp}[-1].logStreamName' --output text)
# echo $LATEST_EVENT
2021/07/22/[$LATEST]c7f063e18a6542e594df5ede54c28a52
# aws logs get-log-events \
--log-group-name $LOG_GROUP_NAME \
--log-stream-name $LATEST_EVENT
{
"events": [
{
"timestamp": 1626938946056,
"message": "START RequestId: 8d7be5ff-085c-472f-9d4b-c700809661f1 Version: $LATEST\n",
"ingestionTime": 1626938949676
},
{
"timestamp": 1626938946057,
"message": "Decoded payload: b'Hello, Second.'\n",
"ingestionTime": 1626938949676
},
{
"timestamp": 1626938946059,
"message": "END RequestId: 8d7be5ff-085c-472f-9d4b-c700809661f1\n",
"ingestionTime": 1626938949676
},
{
"timestamp": 1626938946059,
"message": "REPORT RequestId: 8d7be5ff-085c-472f-9d4b-c700809661f1\tDuration: 1.86 ms\tBilled Duration: 2 ms\tMemory Size: 128 MB\tMax Memory Used: 48 MB\tInit Duration: 118.07 ms\t\n",
"ingestionTime": 1626938949676
}
],
"nextForwardToken": "f/36281950889627716731293485576406154355201954809020153859",
"nextBackwardToken": "b/36281950889560814495697893706981547200384009724502212608"
}
Decoded payload: b'Hello, Second.'\n
が確認できたので、Lambdaが実行できていたことが分かりました。
これでチュートリアルは終了です。
最後に、作成したリソースの削除を行います。
リソースの削除
Kinesis削除・確認
# aws kinesis delete-stream \
--stream-name $STREAM_NAME
# aws kinesis describe-stream \
--stream-name $STREAM_NAME
An error occurred (ResourceNotFoundException) when calling the DescribeStream operation: Stream lambda-stream under account 111111111111 not found.
# aws kinesis list-streams
{
"StreamNames": []
}
Lambda削除・確認
# aws lambda delete-function \
--function-name $LAMBDA_NAME
# aws lambda get-function \
--function-name $LAMBDA_NAME
An error occurred (ResourceNotFoundException) when calling the GetFunction operation: Function not found: arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords
# aws lambda list-functions --query 'Functions[*].FunctionName'
[
"aaa",
"bbb",
"ccc",
"ddd"
]
CloudWatchLogsロググループ削除・確認
# aws logs delete-log-group \
--log-group-name $LOG_GROUP_NAME
# aws logs describe-log-groups \
--log-group-name-prefix "/aws/lambda/" \
--query 'logGroups[*].logGroupName'
[
"/aws/lambda/aaa",
"/aws/lambda/bbb",
"/aws/lambda/ccc",
"/aws/lambda/ddd",
"/aws/lambda/eee"
]
Lambdaイベントソースマッピングが残っていたので削除・確認
# aws lambda list-event-source-mappings
{
"EventSourceMappings": [
{
"UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
"StartingPosition": "TRIM_HORIZON",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 0,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
"FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
"LastModified": "2021-07-22T17:04:00+09:00",
"LastProcessingResult": "PROBLEM: Stream not found. Recreate the stream and re-enable the event source mapping to resume processing from TRIM_HORIZON.",
"State": "Disabled",
"StateTransitionReason": "Unrecoverable error",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1
}
]
}
# aws lambda delete-event-source-mapping --uuid $MAPPING_ID
{
"UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
"StartingPosition": "TRIM_HORIZON",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 0,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
"FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
"LastModified": "2021-07-22T17:04:00+09:00",
"LastProcessingResult": "PROBLEM: Stream not found. Recreate the stream and re-enable the event source mapping to resume processing from TRIM_HORIZON.",
"State": "Deleting",
"StateTransitionReason": "Unrecoverable error",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1
}
# aws lambda list-event-source-mappings
{
"EventSourceMappings": []
}
IAMロール削除・確認
# aws iam detach-role-policy \
--role-name $ROLE_NAME \
--policy-arn $POLICY_ARN
# aws iam list-attached-role-policies \
--role-name $ROLE_NAME
{
"AttachedPolicies": []
}
# aws iam delete-role --role-name $ROLE_NAME
# aws iam get-role --role-name $ROLE_NAME
An error occurred (NoSuchEntity) when calling the GetRole operation: The role with name lambda-kinesis-role cannot be found.
おわりに
元々は触ったことのないKinesisを理解するために始めたチュートリアルでしたが、結果的にAWS CLIの勉強になった気がします。
CLIでリソースを作成すると、いろいろな発見があって面白いですね。