LoginSignup
0
0

More than 1 year has passed since last update.

【AWS DVA学習】Amazon KinesisとLambdaを使用するチュートリアルをやってみた

Last updated at Posted at 2021-07-22

はじめに

AWS DVA取得のためのアウトプットが目的です。

対象チュートリアル

公式ドキュメントのチュートリアル: Amazon Kinesis で AWS Lambda を使用する

やってみた

チュートリアルがCLIで書かれていたので、最後までコンソールではなくCLIで実行しています。

AWS CLIのバージョンが2のため、バイナリ関係で追加オプションを記載しています。

# aws --version
aws-cli/2.1.3 Python/3.7.4 Darwin/20.3.0 exe/x86_64

Lambdaの実行ロール作成

CloudWatch Losgへの書き込み権限と、Kinesisからの読み込み権限を持つIAMロールを作成します。

まず、Lambdaに対して信頼関係のある、IAMロールの信頼ポリシーをJSON形式で作成します。

# vi lambda.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "1",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "lambda.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

作成した信頼ポリシーが書かれたJSONファイルを使用し、IAMロールを作成します。

# export ROLE_NAME="lambda-kinesis-role"

# aws iam create-role --role-name $ROLE_NAME \
--assume-role-policy-document file://lambda.json
{
    "Role": {
        "Path": "/",
        "RoleName": "lambda-kinesis-role",
        "RoleId": "AROAUMCLE6NZSYX7HX4HH",
        "Arn": "arn:aws:iam::111111111111:role/lambda-kinesis-role",
        "CreateDate": "2021-07-22T04:36:41+00:00",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "1",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": [
                            "lambda.amazonaws.com"
                        ]
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}

IAMロールにアタッチするAWSマネージドポリシーであるAWSLambdaKinesisExecutionRoleのARNを取得します。

# export POLICY_ARN=$(aws iam list-policies --scope AWS \
--policy-usage-filter PermissionsPolicy \
--query 'Policies[?PolicyName==`AWSLambdaKinesisExecutionRole`].Arn' \
--output text) 

# echo $POLICY_ARN
arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole

このマネージドポリシーは、以下の通りCloudWatch Logsへの書き込みとKinesisの読み取り権限を持っています。

# export POLICY_VERSION=$(aws iam get-policy \
--policy-arn $POLICY_ARN \
--query 'Policy.DefaultVersionId' \
--output text)

# echo $POLICY_VERSION 
v2

# aws iam get-policy-version \
--version-id $POLICY_VERSION \
--policy-arn $POLICY_ARN 
{
    "PolicyVersion": {
        "Document": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:DescribeStream",
                        "kinesis:DescribeStreamSummary",
                        "kinesis:GetRecords",
                        "kinesis:GetShardIterator",
                        "kinesis:ListShards",
                        "kinesis:ListStreams",
                        "kinesis:SubscribeToShard",
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ],
                    "Resource": "*"
                }
            ]
        },
        "VersionId": "v2",
        "IsDefaultVersion": true,
        "CreateDate": "2018-11-19T20:09:24+00:00"
    }
}

IAMロールにAWSマネージドポリシーであるAWSLambdaKinesisExecutionRoleをアタッチします。

# aws iam attach-role-policy \
--role-name $ROLE_NAME \                       
--policy-arn $POLICY_ARN                          

アタッチされたことを確認します。
list-attached-role-policiesは、ロールにアタッチされたマネージドポリシーを表示します。
なお、インラインポリシーを表示するのは別のコマンドです。

# aws iam list-attached-role-policies \
--role-name $ROLE_NAME 
{
    "AttachedPolicies": [
        {
            "PolicyName": "AWSLambdaKinesisExecutionRole",
            "PolicyArn": "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole"
        }
    ]
}

Lambda作成

まず、以下のサンプルコードをコピペしてコードを作成します。
この後Lambdaのテストを行う際、Lambdaの成功を分かりやすくするために、returnを追加します。
チュートリアルではNode.jsでしたが、私は普段Pythonを使用しているのでPythonを選択しました。

# vi lambda_function.py 

from __future__ import print_function
#import json
import base64
def lambda_handler(event, context):
    for record in event['Records']:
       #Kinesis data is base64 encoded so decode here
       payload=base64.b64decode(record["kinesis"]["data"])
       print("Decoded payload: " + str(payload))
       return str(payload)

作成したコードをZip化します。

zip function.zip lambda_function.py 

Lambdaを作成します。
先ほど作成したIAMロールのARNを指定しなければならないので、IAMロールのARNを取得してから作成しています。
handlerでは、Lambdaが呼び出されると実行する関数を指定します。
今回、Lambda_function.pyというファイル内のLambda_handler関数を実行したいので、handlerをlambda_function.lambda_handlerとしてLambdaを作成しています。

# export ROLE_ARN=$(aws iam get-role \
--role-name $ROLE_NAME \
--query 'Role.Arn' \
--output text)

# echo $ROLE_ARN
arn:aws:iam::111111111111:role/lambda-kinesis-role

# export LAMBDA_NAME="ProcessKinesisRecords"

# aws lambda create-function --function-name $LAMBDA_NAME \
--zip-file fileb://function.zip --handler lambda_function.lambda_handler --runtime python3.7 \
--role $ROLE_ARN 
{
    "FunctionName": "ProcessKinesisRecords",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
    "Runtime": "python3.7",
    "Role": "arn:aws:iam::111111111111:role/lambda-kinesis-role",
    "Handler": "index.handler",
    "CodeSize": 360,
    "Description": "",
    "Timeout": 3,
    "MemorySize": 128,
    "LastModified": "2021-07-22T05:29:11.313+0000",
    "CodeSha256": "dDwoJP20Yk7Dpk936dXZL4LPURp+gG2XmGfkzpOfKno=",
    "Version": "$LATEST",
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "RevisionId": "7fa1afa5-b052-435a-8621-598724bc048a",
    "State": "Active",
    "LastUpdateStatus": "Successful"
}

Lambdaテスト

Kinesisで実行する前に、Lambdaでテストします。
以下のチュートリアル提供のJSONを使用します。

# vi input.txt
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

テストを行います。
応答がout.txtに保存されるので、確認します。

# aws lambda invoke \
--function-name $LAMBDA_NAME \
--payload file://input.txt out.txt \
--cli-binary-format raw-in-base64-out
{
    "StatusCode": 200,
    "ExecutedVersion": "$LATEST"
} 

# cat out.txt
"b'Hello, this is a test.'"%                       

returnで、input.txtのRecords[0].kinesis.dataのデコードされた文字が返却されているので、成功したことがわかります。

エラーの場合は次のように表示されました。

{
    "StatusCode": 200,
    "FunctionError": "Unhandled",
    "ExecutedVersion": "$LATEST"
}

# cat out.txt 
{"errorMessage": "Syntax error in module 'lambda_function': inconsistent use of tabs and spaces in indentation (lambda_function.py, line 9)", "errorType": "Runtime.UserCodeSyntaxError", "stackTrace": ["  File \"/var/task/lambda_function.py\" Line 9\n    \t\t\t return payload\n"]}%                                                   

Kinesisストリームを作成

Kinesisストリームを作成し、そのARNを取得します。

# export STREAM_NAME="lambda-stream" 

# aws kinesis create-stream \
--stream-name $STREAM_NAME \
--shard-count 1

# export STREAM_ARN=$(aws kinesis describe-stream \
--stream-name $STREAM_NAME \
--query 'StreamDescription.StreamARN' \
--output text)

# echo $STREAM_ARN 
arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream

Lambdaにイベントソースを追加

LambdaのイベントソースにKinesisを追加します。
後ほど使用するため、マッピングIDを取得(チュートリアルの指示ですが、後ほど使用することはありませんでした。ただしこの記事では削除の時に使用します。)。

# aws lambda create-event-source-mapping --function-name $LAMBDA_NAME \
> --event-source-arn $STREAM_ARN \
> --batch-size 100 \
> --starting-position LATEST
{
    "UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
    "StartingPosition": "LATEST",
    "BatchSize": 100,
    "MaximumBatchingWindowInSeconds": 0,
    "ParallelizationFactor": 1,
    "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
    "LastModified": "2021-07-22T15:52:29.098000+09:00",
    "LastProcessingResult": "No records processed",
    "State": "Creating",
    "StateTransitionReason": "User action",
    "DestinationConfig": {
        "OnFailure": {}
    },
    "MaximumRecordAgeInSeconds": -1,
    "BisectBatchOnFunctionError": false,
    "MaximumRetryAttempts": -1
}


# aws lambda list-event-source-mappings \
--function-name $LAMBDA_NAME \         
--event-source-arn $STREAM_ARN 
{
    "EventSourceMappings": [
        {
            "UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
            "StartingPosition": "LATEST",
            "BatchSize": 100,
            "MaximumBatchingWindowInSeconds": 0,
            "ParallelizationFactor": 1,
            "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
            "FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
            "LastModified": "2021-07-22T15:53:00+09:00",
            "LastProcessingResult": "No records processed",
            "State": "Enabled",
            "StateTransitionReason": "User action",
            "DestinationConfig": {
                "OnFailure": {}
            },
            "MaximumRecordAgeInSeconds": -1,
            "BisectBatchOnFunctionError": false,
            "MaximumRetryAttempts": -1
        }
    ]
}

# export MAPPING_ID=$(aws lambda list-event-source-mappings \
--function-name $LAMBDA_NAME \
--event-source-arn $STREAM_ARN \
--query 'EventSourceMappings[0].UUID' \
--output text)

# echo $MAPPING_ID 
e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3

セットアップをテスト

Kinesisストリームにイベントレコードを追加し、Lambdaで処理されるか確認します。
Kinesisストリームにイベントレコード追加。

# aws kinesis put-record \
--stream-name $STREAM_NAME \
--partition-key 1 \
--data "Hello, First." \
--cli-binary-format raw-in-base64-out
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49620367105734911944180226173654955033153918722975465474"
}

Lambdaが実行されたか確認するため、CloudWatch Logsのログストリームを確認します。
まず、ロググループの最新のログストリーム名を、クエリのソートにより取得します。

# export LOG_GROUP_NAME="/aws/lambda/$LAMBDA_NAME"

# export LATEST_EVENT=$(aws logs describe-log-streams \
--log-group-name $LOG_GROUP_NAME \
--query 'sort_by(logStreams, &lastEventTimestamp)[].{logStreamName: logStreamName,lastEventTimestamp:lastEventTimestamp}[-1].logStreamName' \
--output text)

# echo $LATEST_EVENT 
2021/07/22/[$LATEST]9e1599c0f1cd4a6c90fb1966fe7695be

describe-log-streamsでログストリームが表示されますが、その順番はおそらく数値・アルファベット順であり、ログが送信された時刻とは関係ありません。

そのため、lastEventTimestampの表示が最新のログストリームが、最後にLambdaが実行したログだと考えられます。

lastEventTimestampはエポック秒のため、ログストリームの中でその値が一番大きなログストリーム名を取得しました。

以下のLambdaのロググループ内のログストリーム一覧を見ると、最新版として取得したログストリーム2021/07/22/[$LATEST]9e1599c0f1cd4a6c90fb1966fe7695beが一番下にあり、ログストリームの中で最もエポック秒が大きいことが分かります。

# aws logs describe-log-streams \
--log-group-name $LOG_GROUP_NAME \
--query 'sort_by(logStreams, &lastEventTimestamp)[].{logStreamName: logStreamName,lastEventTimestamp:lastEventTimestamp}'
[
    {
        "logStreamName": "2021/07/22/[$LATEST]939c96448db643468102506b5ac6b506",
        "lastEventTimestamp": 1626932290474
    },
    {
        "logStreamName": "2021/07/22/[$LATEST]b73cd4b6ff084f1fa67bffc50455cdb4",
        "lastEventTimestamp": 1626932533447
    },
    {
        "logStreamName": "2021/07/22/[$LATEST]66b34064a27340a9b33b4dbd09f11cd3",
        "lastEventTimestamp": 1626932697007
    },
    {
        "logStreamName": "2021/07/22/[$LATEST]7fbfd3b7fbe444ff9ced5a1fde5fa1a4",
        "lastEventTimestamp": 1626933502267
    },
    {
        "logStreamName": "2021/07/22/[$LATEST]137462ac70dc418eb196b7597fbebfc1",
        "lastEventTimestamp": 1626933926801
    },
    {
        "logStreamName": "2021/07/22/[$LATEST]9e1599c0f1cd4a6c90fb1966fe7695be",
        "lastEventTimestamp": 1626937651674
    }
]

取得したログストリーム名を使用して、LambdaがKinesisストリームに追加されたレコードを処理したか確認します。

# aws logs get-log-events \
--log-group-name $LOG_GROUP_NAME \
--log-stream-name $LATEST_EVENT 
{
    "events": [
        {
            "timestamp": 1626937651672,
            "message": "START RequestId: af71154e-1723-4020-b110-c41069d35475 Version: $LATEST\n",
            "ingestionTime": 1626937657154
        },
        {
            "timestamp": 1626937651674,
            "message": "Decoded payload: b'Hello, First.'\n",
            "ingestionTime": 1626937657154
        },
        {
            "timestamp": 1626937651674,
            "message": "END RequestId: af71154e-1723-4020-b110-c41069d35475\n",
            "ingestionTime": 1626937657154
        },
        {
            "timestamp": 1626937651674,
            "message": "REPORT RequestId: af71154e-1723-4020-b110-c41069d35475\tDuration: 1.55 ms\tBilled Duration: 2 ms\tMemory Size: 128 MB\tMax Memory Used: 48 MB\tInit Duration: 120.37 ms\t\n",
            "ingestionTime": 1626937657154
        }
    ],
    "nextForwardToken": "f/36281922023877642931232848957132370540239295426765062147",
    "nextBackwardToken": "b/36281922023833041440835787710849299103693998703753101312"
}

Decoded payload: b'Hello, First.'\nと表示されているので、無事Lambdaが実行されたことが分かりました。

念のため、もう一度Kinesisストリームにレコードを追加して、Lambdaが処理するか確認します。

# aws kinesis put-record \
--stream-name $STREAM_NAME \
--partition-key 1 \
--data "Hello, Second." \
--cli-binary-format raw-in-base64-out
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49620367105734911944180226173656163958973622275153068034"
}

# export LATEST_EVENT=$(aws logs describe-log-streams \
--log-group-name $LOG_GROUP_NAME \
--query 'sort_by(logStreams, &lastEventTimestamp)[].{logStreamName: logStreamName,lastEventTimestamp:lastEventTimestamp}[-1].logStreamName' --output text)

# echo $LATEST_EVENT
2021/07/22/[$LATEST]c7f063e18a6542e594df5ede54c28a52

# aws logs get-log-events \
--log-group-name $LOG_GROUP_NAME \
--log-stream-name $LATEST_EVENT
{
    "events": [
        {
            "timestamp": 1626938946056,
            "message": "START RequestId: 8d7be5ff-085c-472f-9d4b-c700809661f1 Version: $LATEST\n",
            "ingestionTime": 1626938949676
        },
        {
            "timestamp": 1626938946057,
            "message": "Decoded payload: b'Hello, Second.'\n",
            "ingestionTime": 1626938949676
        },
        {
            "timestamp": 1626938946059,
            "message": "END RequestId: 8d7be5ff-085c-472f-9d4b-c700809661f1\n",
            "ingestionTime": 1626938949676
        },
        {
            "timestamp": 1626938946059,
            "message": "REPORT RequestId: 8d7be5ff-085c-472f-9d4b-c700809661f1\tDuration: 1.86 ms\tBilled Duration: 2 ms\tMemory Size: 128 MB\tMax Memory Used: 48 MB\tInit Duration: 118.07 ms\t\n",
            "ingestionTime": 1626938949676
        }
    ],
    "nextForwardToken": "f/36281950889627716731293485576406154355201954809020153859",
    "nextBackwardToken": "b/36281950889560814495697893706981547200384009724502212608"
}

Decoded payload: b'Hello, Second.'\nが確認できたので、Lambdaが実行できていたことが分かりました。

これでチュートリアルは終了です。
最後に、作成したリソースの削除を行います。

リソースの削除

Kinesis削除・確認
# aws kinesis delete-stream \
--stream-name $STREAM_NAME 

# aws kinesis describe-stream \
--stream-name $STREAM_NAME 

An error occurred (ResourceNotFoundException) when calling the DescribeStream operation: Stream lambda-stream under account 111111111111 not found.

# aws kinesis list-streams
{
    "StreamNames": []
}

Lambda削除・確認
# aws lambda delete-function \            
--function-name $LAMBDA_NAME 

# aws lambda get-function \
--function-name $LAMBDA_NAME 

An error occurred (ResourceNotFoundException) when calling the GetFunction operation: Function not found: arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords

# aws lambda list-functions --query 'Functions[*].FunctionName'
[
    "aaa",
    "bbb",
    "ccc",
    "ddd"
]

CloudWatchLogsロググループ削除・確認
# aws logs delete-log-group \
--log-group-name $LOG_GROUP_NAME

# aws logs describe-log-groups \
--log-group-name-prefix "/aws/lambda/" \
--query 'logGroups[*].logGroupName'
[
    "/aws/lambda/aaa",
    "/aws/lambda/bbb",
    "/aws/lambda/ccc",
    "/aws/lambda/ddd",
    "/aws/lambda/eee"
]


Lambdaイベントソースマッピングが残っていたので削除・確認
# aws lambda list-event-source-mappings 
{
    "EventSourceMappings": [
        {
            "UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
            "StartingPosition": "TRIM_HORIZON",
            "BatchSize": 100,
            "MaximumBatchingWindowInSeconds": 0,
            "ParallelizationFactor": 1,
            "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
            "FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
            "LastModified": "2021-07-22T17:04:00+09:00",
            "LastProcessingResult": "PROBLEM: Stream not found. Recreate the stream and re-enable the event source mapping to resume processing from TRIM_HORIZON.",
            "State": "Disabled",
            "StateTransitionReason": "Unrecoverable error",
            "DestinationConfig": {
                "OnFailure": {}
            },
            "MaximumRecordAgeInSeconds": -1,
            "BisectBatchOnFunctionError": false,
            "MaximumRetryAttempts": -1
        }
    ]
}

# aws lambda delete-event-source-mapping --uuid $MAPPING_ID 
{
    "UUID": "e8ae25a5-6bd5-43a5-aa60-aca8fd8a91c3",
    "StartingPosition": "TRIM_HORIZON",
    "BatchSize": 100,
    "MaximumBatchingWindowInSeconds": 0,
    "ParallelizationFactor": 1,
    "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:111111111111:stream/lambda-stream",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:111111111111:function:ProcessKinesisRecords",
    "LastModified": "2021-07-22T17:04:00+09:00",
    "LastProcessingResult": "PROBLEM: Stream not found. Recreate the stream and re-enable the event source mapping to resume processing from TRIM_HORIZON.",
    "State": "Deleting",
    "StateTransitionReason": "Unrecoverable error",
    "DestinationConfig": {
        "OnFailure": {}
    },
    "MaximumRecordAgeInSeconds": -1,
    "BisectBatchOnFunctionError": false,
    "MaximumRetryAttempts": -1
}

# aws lambda list-event-source-mappings                    
{
    "EventSourceMappings": []
}

IAMロール削除・確認
# aws iam detach-role-policy \
--role-name $ROLE_NAME \
--policy-arn $POLICY_ARN 

#  aws iam list-attached-role-policies \
--role-name $ROLE_NAME
{
    "AttachedPolicies": []
}

# aws iam delete-role --role-name $ROLE_NAME                                

# aws iam get-role --role-name $ROLE_NAME 

An error occurred (NoSuchEntity) when calling the GetRole operation: The role with name lambda-kinesis-role cannot be found.

おわりに

元々は触ったことのないKinesisを理解するために始めたチュートリアルでしたが、結果的にAWS CLIの勉強になった気がします。
CLIでリソースを作成すると、いろいろな発見があって面白いですね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0