はじめに
この記事は下記記事の続きとなっていますので先にこちらをご覧ください。
本記事ではAWS IoT Coreに送信されたデータを時系列データとして蓄えることができるようにします。
データを蓄える基盤を作る
構成
データを蓄えるためのテーブルとしてDynamoDBを、デバイスシャドウから最新データを取得しDynamoDBにアイテムを作成する機能をLambda関数を追加し図のような構成にします。
SAMを使用してDynamoDBとLambda関数を構築する
DynamoDB及びLambda関数はSAM(Serverless Application Model)を使用して構築します。
サンプルアプリケーションの初期化
- 今回使用する
sam cli
のバージョンは1.129.0でした$ sam --version SAM CLI, version 1.129.0
-
sam init
コマンドで対話しながらアプリケーションを作成します$ sam init
- 「Which template source would you like to use?」は楽をするため「1」を選択
Which template source would you like to use? 1 - AWS Quick Start Templates 2 - Custom Template Location Choice: 1
- 「Choose an AWS Quick Start application template」は最小限の「1 - Hello World Example」を選択
Choose an AWS Quick Start application template 1 - Hello World Example 2 - Data processing 3 - Hello World Example with Powertools for AWS Lambda 4 - Multi-step workflow 5 - Scheduled task 6 - Standalone function 7 - Serverless API 8 - Infrastructure event management 9 - Lambda Response Streaming 10 - Serverless Connector Hello World Example 11 - Multi-step workflow with Connectors 12 - GraphQLApi Hello World Example 13 - Full Stack 14 - Lambda EFS example 15 - DynamoDB Example 16 - Machine Learning Template: 1
- 「Use the most popular runtime and package type? (python3.13 and zip) 」は「Enter(N)」を選択し、「runtime」は「14 - python3.9」を「package type」は「1 - Zip」を選択
※今回の環境内のPythonが3.9のためUse the most popular runtime and package type? (python3.13 and zip) [y/N]: Which runtime would you like to use? 1 - dotnet8 2 - dotnet6 3 - go (provided.al2) 4 - go (provided.al2023) 5 - graalvm.java11 (provided.al2) 6 - graalvm.java17 (provided.al2) 7 - java21 8 - java17 9 - java11 10 - java8.al2 11 - nodejs20.x 12 - nodejs18.x 13 - nodejs16.x 14 - python3.9 15 - python3.8 16 - python3.13 17 - python3.12 18 - python3.11 19 - python3.10 20 - ruby3.3 21 - ruby3.2 22 - rust (provided.al2) 23 - rust (provided.al2023) Runtime: 14 What package type would you like to use? 1 - Zip 2 - Image Package type: 1
- アプリケーションの分析やロギング関連のオプションはすべて「Enter(N)」
Would you like to enable X-Ray tracing on the function(s) in your application? [y/N]: Would you like to enable monitoring using CloudWatch Application Insights? For more info, please view https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch-application-insights.html [y/N]: Would you like to set Structured Logging in JSON format on your Lambda functions? [y/N]:
- 「Project name」は「EnvDataStack」としてサンプルアプリケーションの初期化は完了です
Project name [sam-app]: EnvDataStack ----------------------- Generating application: ----------------------- Name: EnvDataStack Runtime: python3.9 Architectures: x86_64 Dependency Manager: pip Application Template: hello-world Output Directory: . Configuration file: EnvDataStack/samconfig.toml Next steps can be found in the README file at EnvDataStack/README.md Commands you can use next ========================= [*] Create pipeline: cd EnvDataStack && sam pipeline init --bootstrap [*] Validate SAM template: cd EnvDataStack && sam validate [*] Test Function in the Cloud: cd EnvDataStack && sam sync --stack-name {stack-name} --watch
ソースコードの追加
IoT Coreの「shadow/update/accepted」イベントを受信してDynamoDBに受信したセンサーデータを格納するLambda関数のコードをこのアプリケーションに追加します。
-
src
フォルダの追加[EnvDataStack]$ mkdir src
-
lambda_function.py
の追加
vi
コマンドで次のPythonスクリプトを追加します。[EnvDataStack]$ vi src/lambda_function.py
lambda_function.pyimport os import sys from typing import Literal from decimal import Decimal from datetime import datetime, timezone from dateutil.relativedelta import relativedelta from logging import Logger, getLogger, Formatter, StreamHandler import boto3 from botocore.config import Config from boto3.dynamodb.types import TypeSerializer class EnvDataPusher: LOGGING_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = os.environ.get("LOGGING_LEVEL", "WARNING") TABLE_ARN: str = os.environ["TABLE_ARN"] def __init__(self) -> None: self._create_logger() self._create_dynamodb_client() def _create_logger(self) -> None: self._logger: Logger = getLogger(self.__class__.__name__) self._logger.propagate = False for handler in self._logger.handlers: self._logger.removeHandler(handler) handler: StreamHandler = StreamHandler(sys.stdout) handler.setFormatter(Formatter("%(asctime)s [%(levelname)s] %(funcName)s %(message)s")) self._logger.addHandler(handler) self._logger.setLevel(self.LOGGING_LEVEL) def _create_dynamodb_client(self) -> None: self._dynamodb_client = boto3.client( "dynamodb", config=Config( retries={ "max_attempts": 5, "mode": "standard" } ) ) def _put_item_to_table(self, type: str, value: Decimal) -> bool: timestamp: datetime = datetime.now(timezone.utc) ttl: datetime = timestamp + relativedelta(days=1) try: self._dynamodb_client.put_item( TableName=self.TABLE_ARN, Item={ k: TypeSerializer().serialize(v) for k, v in { "Type": type, "Timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"), "Value": value, "TTL": Decimal(str(ttl.timestamp())) }.items() } ) except Exception as e: self._logger.error(f"Failed put_item, reason:\n{e}") return False return True def execute(self, event: dict) -> bool: temperature: Decimal = Decimal(str(event["state"]["reported"]["temperature"])) humidity: Decimal = Decimal(str(event["state"]["reported"]["humidity"])) pressure: Decimal = Decimal(str(event["state"]["reported"]["pressure"])) concentration: Decimal = Decimal(str(event["state"]["reported"]["concentration"])) self._logger.debug(f"{temperature=}") self._logger.debug(f"{humidity=}") self._logger.debug(f"{pressure=}") self._logger.debug(f"{concentration=}") is_succeeded: bool = True if not self._put_item_to_table("temperature", temperature): is_succeeded = False if not self._put_item_to_table("humidity", humidity): is_succeeded = False if not self._put_item_to_table("pressure", pressure): is_succeeded = False if not self._put_item_to_table("concentration", concentration): is_succeeded = False return is_succeeded pusher: EnvDataPusher = EnvDataPusher() def lambda_handler(event: dict, context: dict) -> None: is_succeeded: bool = pusher.execute(event) if not is_succeeded: raise Exception("データ格納処理に失敗")
Pythonスクリプト自体は受け取ったイベントデータを元にDynamoDBにタイムスタンプ付きでデータを格納する簡単なものにしています。
テンプレートの修正
DynamoDBテーブル、Lambda関数及びLambda関数をトリガーするIoT Ruleを構築することができるようにSAMテンプレートtemplate.yaml
を修正します。
[EnvDataStack]$ vi template.yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: "AWS::Serverless-2016-10-31"
Description: "EnvDataStack"
Resources:
EnvDataTable: # データ格納用DynamoDBテーブル
Type: "AWS::DynamoDB::Table"
Properties:
AttributeDefinitions:
- AttributeName: "Type"
AttributeType: "S"
- AttributeName: "Timestamp"
AttributeType: "S"
BillingMode: "PROVISIONED"
DeletionProtectionEnabled: true
KeySchema:
- AttributeName: "Type"
KeyType: "HASH"
- AttributeName: "Timestamp"
KeyType: "RANGE"
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TableName: "EnvDataTable"
TimeToLiveSpecification:
AttributeName: "TTL"
Enabled: true
EnvDataPusher: # データ格納用Lambda関数
Type: "AWS::Serverless::Function"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Action:
- "sts:AssumeRole"
AutoPublishAlias: "Alias"
CodeUri: "src/"
Description: "iot-env-data-collectorのデバイスシャドウ更新に応じて最新のデータをDynamoDBにプッシュする関数"
Environment:
Variables:
TABLE_ARN: !GetAtt "EnvDataTable.Arn"
Events:
ShadowUpdateAccepted:
Type: "IoTRule"
Properties:
Sql: "SELECT * FROM \"$aws/things/iot-env-data-collector/shadow/update/accepted\""
FunctionName: "EnvDataPusher"
Handler: "lambda_function.lambda_handler"
MemorySize: 128
Policies:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action:
- "logs:CreateLogGroup"
- "logs:CreateLogStream"
- "logs:PutLogEvents"
Resource: "*"
- Effect: "Allow"
Action:
- "dynamodb:PutItem"
Resource: !GetAtt "EnvDataTable.Arn"
Runtime: "python3.9"
Timeout: 30
sam build
コマンドでデプロイの準備をします
Build Succeeded
となればおkです。
[EnvDataStack]$ sam build
Starting Build use cache
Building codeuri: /tmp/EnvDataStack/src runtime: python3.9 architecture: x86_64 functions: EnvDataPusher
requirements.txt file not found. Continuing the build without dependencies.
Running PythonPipBuilder:CopySource
Build Succeeded
Built Artifacts : .aws-sam/build
Built Template : .aws-sam/build/template.yaml
Commands you can use next
=========================
[*] Validate SAM template: sam validate
[*] Invoke Function: sam local invoke
[*] Test Function in the Cloud: sam sync --stack-name {{stack-name}} --watch
[*] Deploy: sam deploy --guided
アプリケーションをデプロイします
アプリケーションをAWSにデプロイしていきます。
-
sam deploy --guided
コマンドで対話的に実施します[EnvDataStack]$ sam deploy --guided
- 「Stack Name」はそのままでいいので「Enter」
Stack Name [EnvDataStack]:
- 「AWS Region」は東京リージョンにデプロイするので「Enter」
AWS Region [ap-northeast-1]:
- 「Confirm changes before deploy」はデプロイの度確認不要なので「n」
#Shows you resources changes to be deployed and require a 'Y' to initiate deploy Confirm changes before deploy [Y/n]: n
- 「Allow SAM CLI IAM role creation」は自動的に必要なロールを作ってもらって問題ないので「Enter(Y)」
#SAM needs permission to be able to create roles to connect to the resources in your template Allow SAM CLI IAM role creation [Y/n]:
- 「Disable rollback」はデプロイに失敗したらロールバックしてほしいので「Enter(N)」
#Preserves the state of previously provisioned resources when an operation fails Disable rollback [y/N]:
- 今まで答えた事を保存してくれるのですべてそのまま「Enter」
Save arguments to configuration file [Y/n]: SAM configuration file [samconfig.toml]: SAM configuration environment [default]:
- すべての質問に回答し終わるとリソースの構築が開始される
定期的に表示が更新されSuccessfully created/updated stack
と表示されればアプリケーションのデプロイは完了です。CloudFormation events from stack operations (refresh every 5.0 seconds) ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ResourceStatus ResourceType LogicalResourceId ResourceStatusReason ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- CREATE_IN_PROGRESS AWS::CloudFormation::Stack EnvDataStack User Initiated CREATE_IN_PROGRESS AWS::DynamoDB::Table EnvDataTable - CREATE_IN_PROGRESS AWS::DynamoDB::Table EnvDataTable Resource creation Initiated CREATE_COMPLETE AWS::DynamoDB::Table EnvDataTable - CREATE_IN_PROGRESS AWS::IAM::Role EnvDataPusherRole - CREATE_IN_PROGRESS AWS::IAM::Role EnvDataPusherRole Resource creation Initiated CREATE_COMPLETE AWS::IAM::Role EnvDataPusherRole - CREATE_IN_PROGRESS AWS::Lambda::Function EnvDataPusher - CREATE_IN_PROGRESS AWS::Lambda::Function EnvDataPusher Resource creation Initiated CREATE_COMPLETE AWS::Lambda::Function EnvDataPusher - CREATE_IN_PROGRESS AWS::Lambda::Version EnvDataPusherVersion68a151ade3 - CREATE_IN_PROGRESS AWS::Lambda::Version EnvDataPusherVersion68a151ade3 Resource creation Initiated CREATE_COMPLETE AWS::Lambda::Version EnvDataPusherVersion68a151ade3 - CREATE_IN_PROGRESS AWS::Lambda::Alias EnvDataPusherAliasAlias - CREATE_IN_PROGRESS AWS::Lambda::Alias EnvDataPusherAliasAlias Resource creation Initiated CREATE_COMPLETE AWS::Lambda::Alias EnvDataPusherAliasAlias - CREATE_IN_PROGRESS AWS::IoT::TopicRule EnvDataPusherShadowUpdateAccepted - CREATE_IN_PROGRESS AWS::IoT::TopicRule EnvDataPusherShadowUpdateAccepted Resource creation Initiated CREATE_COMPLETE AWS::IoT::TopicRule EnvDataPusherShadowUpdateAccepted - CREATE_IN_PROGRESS AWS::Lambda::Permission EnvDataPusherShadowUpdateAcceptedPermission - CREATE_IN_PROGRESS AWS::Lambda::Permission EnvDataPusherShadowUpdateAcceptedPermission Resource creation Initiated CREATE_COMPLETE AWS::Lambda::Permission EnvDataPusherShadowUpdateAcceptedPermission - CREATE_COMPLETE AWS::CloudFormation::Stack EnvDataStack - ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Successfully created/updated stack - EnvDataStack in ap-northeast-1
収集したセンサーデータを確認する
SAMはCloudFormationの拡張したものであるため、今回構築したリソースやスタックのステータスなどはCloudFormationのマネコンからでも確認することができます。
マネコンで「CloudFormation > スタック > EnvDataStack > リソースタブ」を開くと作成したリソースが表示されています。
DynamoDBテーブルである「EnvDataTable」を開き「項目の探索」を行うと1分間隔で各種センサーデータが蓄えられていることが確認できます。
さいごに
IoT Coreに送信されたセンサーデータをDynamoDBに時系列データとして蓄えることができるようになりました。
しかしセンサ―データを時系列データとして蓄えることができるようになったことで、欲が出てきてグラフィカルに表示させたくなりますよね。
次の記事では各センサーデータをグラフィカルに表示するWEBアプリを作成して〆たいと思います。