Edited at

ServerlessFrameworkでIoT向けのデータ処理フロー(AWS)をデプロイできるようにした

More than 1 year has passed since last update.


はじめに

私がいる部署(チーム)では、IoT.kyotoというセンサー等から送信されてくるデータを、簡単なグラフで可視化するという(基本)無料サービスを持っています。

AWSに環境を構築し、そのデータを取得してグラフ化しているのですが、そのAWS環境をお客様でもできるように、AWS環境の構築の仕方的なものを用意しています。

手順に沿ってコンソール画面でぽちぽちしていけば環境の構築ができるのですが、「コマンド一発で構築できたらもっと楽じゃない?」と感じたので、ServerlessFrameworkを使って、 コマンド一発で環境を構築できるようにしました。

あとは、チームの人間がこういうのにも興味持って欲しいなーという願望も含まれていますが…w


筆者のマシンスペック

念のため

マシン : MacBook Pro

OS : macOS Sierra

プロセッサ : 2.3GHz Intel Core i5

メモリ : 16GB


前提条件


  • AWSのアカウントを持っている、またはIAMユーザーを作成することができる

  • AWS-CLIインストール済み (参考ページ)


完成形

KinesisDataStreams経由とAWSIoT経由の2本立てでお送りいたします。

スクリーンショット 2018-03-13 16.50.09.png


ソースコード

ここにあります。


ディレクトリ構成

ディレクトリはこんな感じです。

-- index.js

|
|- handlers
| |
| |- put_dynamo_form_kinesis.js
| - put_dynamo_form_iot.js
|
|- serverless.yml
|- node_modules
- package.json


デプロイで構築されるリソースたち


  • Lambda

  • DynamoDB

  • KinesisDataStreams

  • AWSIoT(ルールのみ)


Serverless Frameworkのインストール

正直ググれば出てきますが、npmで入れる方法は下記の通りです。

$ npm install -g severless


Credentialsの設定

デプロイするAWSのアカウントを作成して、そのKey情報を設定します。

$ aws configure

AWS Access Key ID [None]: IAMユーザーのアクセスキー
AWS Secret Access Key [None]: IAMユーザーのシークレットアクセスキー
Default region name [None]: ap-northeast-1
Default output format [None]:


必要なパッケージ類のインストール

package.jsonがなければinitして生成してください

$ npm init

$ npm install --save aws-sdk co


serverless.yml

どーん。

一応コメントで少し説明をしています。

また、serverless.yml 内の各種リソース名は適宜変更していただき、判別しやすいよう、運用しやすいようにしてください。


serverless.yml


service: iot-serverless-sample

# 基本設定
# accountIDは適宜修正してください
custom:
defaultStage: prod
defaultRegion: ap-northeast-1
accountID: xxxxxxxxxxxx

# LambdaのruntimeやiamRoleなどの設定
# iamRole
# DynamoDB TableName:iot-serverless-sampleに対してGet/Query/Put/Updateを許可
# Kinesis StreamName:iot-serverless-kinesisに対してDescribeStream/GetRecords/GetShardIterator/ListStreamsを許可
provider:
name: aws
runtime: nodejs6.10
stage: ${opt:stage, self:custom.defaultStage}
region: ${opt:region, self:custom.defaultRegion}
iamRoleStatements:
- Effect: "Allow"
Resource: "arn:aws:dynamodb:${self:provider.region}:*:table/iot-serverless-dynamo"
Action:
- "dynamodb:GetItem"
- "dynamodb:PutItem"
- "dynamodb:UpdateItem"
- "dynamodb:Query"
- Effect: "Allow"
Resource: "arn:aws:kinesis:${self:provider.region}:*:stream/iot-serverless-kinesis"
Action:
- "kinesis:DescribeStream"
- "kinesis:GetRecords"
- "kinesis:GetShardIterator"
- "kinesis:ListStreams"

# Lambdaのhandlerやevent(トリガー)の設定
# LambdaFunctionName:putDynamoDBfromKinesisはkinesis経由でDynamoDBにPUTするLambda
# LambdaFunctionName:putDynamoDBfromIoTはAWSIoT経由でDynamoDBにPUTするLambda
# KinesisのARNにアカウント番号が入っているので、アカウントごとに修正する必要があります ※なんとかして直接書かずに済む方法が模索中
# KinesisかAWSIoTのどっちかだけ必要な場合が多いと思うのでコメントアウトしてください
functions:
putDynamoDBfromKinesis:
handler: index.put_dynamo_form_kinesis
description: iot-serverless-sample LambdaFunction from Kinesis
events:
- stream:
arn: "arn:aws:kinesis:${self:provider.region}:${self:custom.accountID}:stream/iot-serverless-kinesis"
batchSize: 100
startingPosition: LATEST
putDynamoDBfromIoT:
handler: index.put_dynamo_from_iot
description: iot-serverless-sample LambdaFunction from IoT
events:
- iot:
name: LambdaInvokeRole
description: AWSIoTからLambdaをinvokeする
sql: "SELECT * FROM 'iot-serverless-topic'"
sqlVersion: beta

# その他各種サービスリソースの設定
# 初回デプロイ時にDynamoDBのテーブルとKinesisDataStreamsを作成している
# DynmoDB
# TableName: iot-serverless-dynamo
# HashKey: deviceID(string)
# RangeKey: record_time(string)
# Capacity: Read/Writeともに1 必要に応じて調整してください
# KinesisDataStreams
# StreamName: iot-serverless-kinesis
# Shard: 1
resources:
Resources:
DynamoDB:
Type: "AWS::DynamoDB::Table"
Properties:
TableName: iot-serverless-dynamo
AttributeDefinitions:
- AttributeName: deviceID
AttributeType: S
- AttributeName: record_time
AttributeType: S
KeySchema:
- AttributeName: deviceID
KeyType: HASH
- AttributeName: record_time
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
KinesisDataStreams:
Type: "AWS::Kinesis::Stream"
Properties:
Name: "iot-serverless-kinesis"
ShardCount: 1



Lambda

少し特殊な呼び方をしています。

serverless.ymlのhandlerには index.put_dynamo_form_kinesis と書いていますが、これは index.js 内でexportされている put_dynamo_form_kinesis をhandlerとして扱うという意味になります。

各handlerはhandlersというディレクトリにまとめてます。


index.js

"use strict";

module.exports = {
put_dynamo_form_kinesis: require("./handlers/put_dynamo_form_kinesis"),
put_dynamo_from_iot: require("./handlers/put_dynamo_form_iot")
};



handlers/put_dynamo_form_kinesis.js

"use strict";

const AWS = require("aws-sdk");
const dynamoDB = new AWS.DynamoDB.DocumentClient({
region: process.env.AWS_REGION
});

module.exports = (event, context, callback) => {
console.log(`event: ${JSON.stringify(event)}`);

co(function* () {
const items = event["Records"];
console.log(`items: ${JSON.stringify(items)}`);
const promises = items.map(item => {
const payload = new Buffer(item, "base64").toString();
const data = JSON.parse(payload);
const param = {
TableName: "iot-serverless-dynamo",
Item: data
};
return dynamoDB.put(param).promise();
});
return Promise.all(promises);
}).then(() => {
console.log("All done");
callback(null);
}).catch((err) => {
console.error(err);
callback(err);
});
};



handlers/put_dynamo_form_iot.js

"use strict";

const co = require("co");
const AWS = require("aws-sdk");
const dynamoDB = new AWS.DynamoDB.DocumentClient({
region: process.env.AWS_REGION
});

module.exports = (event, context, callback) => {
console.log(`event: ${JSON.stringify(event)}`);
const param = {
TableName: "iot-serverless-dynamo",
Item: event
};
return dynamoDB.put(param).promise()
.then(() => {
console.log("All done");
callback(null);
}).catch((err) => {
console.error(err);
callback(err);
});
};



あとはデプロイ

deployコマンドでデプロイが行われます。 v オプションをつけると詳細のログ情報が表示されるようになります。

$ sls deploy -v


あとは試すだけ♫

サンプルのJSONファイルを作成します。


sample.json

{"val_1": "testtesttest", "val_2": "1234567890", "val_3": "q2w3e4r5t6y7u8i", "deviceID": "sample0001", "record_time": "2018-03-13T00:00:00.000+0900"}


KinesisDataStreamsとAWSIoTにAWSCLI経由でテストデータを送信してみます。

KinesisDataStreams

$ aws kinesis put-record --stream-name iot-serverless-kinesis --data file://sample.json --partition-key "deviceID"

AWS IoT

$ aws iot-data publish --topic iot-serverless-topic --payload file://sample.json

無事DynamoDBまでデータがPUTされたらOKです。


後片付け

removeコマンドで一発です。

$ sls remove


まとめ

ServerlessFrameworkを使って、センサーデータがDynamoDBにPUTされる構成を簡単?に構築できました。

CloudFormationのラッパーということもあり、CloudFormationのymlファイルを書くよりかは、簡単に書くことができるのかなと実感しました。

ただ、CloudFormationでできることがServerlessFrameworkだとできないみたいなこともあるようなので、まだまだ模索中のところもあります。

まだ改善の余地は十分にあると思いますが、一旦まとめてチーム内に共有するものを作りました。

これでメンバーが興味持ってくれたらいいんですが…w

ではまた!