Node.js
AWS
cloud9
DynamoDB
lambda

エヌシーアイ総合システムのmizu-ponです。

主に金融業務系システムの開発をしています。
AWSは勉強中ですが、知り得た知識などを載せていければと思います。

今回はLambdaからDynamoDBの操作(参照/更新)を行うまでの話です。

Lambdaって?

AWS

AWSが提供するサービスのひとつ。
ソースコード実行のためにサーバを立てる必要がなく色々と楽。(サーバレスアーキテクチャ)
幾つかの言語(Java,node.js(JavaScript),C#,Pythonなど)に対応している。

DynamoDBって?

AWS

AWSが提供するNoSQLデータベース。
NoSQLは一般的に高速かつスケールアウトに強く、IoT/ビッグデータ向きといわれている。
データをキーとバリュー(値)の組み合わせで管理する(Key-Value型)

AWSコンソール画面

こんな感じです。リージョン(拠点)の指定や各種サービスの呼出しができます。

AWS

使用するテーブルの定義

今回はIoTセンサーデータをイメージした以下のテーブルを使用します。
DynamoDBのテーブルは、主キーとデータ部からなるシンプルな構成しか許容しません。
主キーにはパーティションキー(HASH)とソートキー(RANGE)を指定できます。
NoSQLなのでSQLでのデータ操作はできず(当たり前か…)、操作コマンド(メソッド)を使ってデータ操作します。

Field Type Key
datetime String HASH
sensor_mac String RANGE
info JSON

更に、infoには以下のようなデータが含まれます。

gw_mac rssi fix battery status reserve user fix2 reserve2 timestamp exist

開発環境(Cloud9)

開発環境にはAWSの統合開発環境サービスであるCloud9を利用します。

Cloud9

データの更新

更新用のソースコードは以下のリンクからダウンロードできます。ひとつずつ解説していきます。
Markdown: JavaScript

初めの部分でrequireでライブラリ参照しています。
csvtojson:csv形式をjsonに変換
date-utils:日時のフォーマットを行う

console.log('Loading function');

const csv = require('csvtojson');
require('date-utils');

次にAWSやDynamoDBに接続するためにaws-sdkを読み込んでいます。
AWS.DynamoDB.DocumentClientはDynamoDBのレコード操作を行うクラスです。

const AWS = require("aws-sdk");

AWS.config.update({
    region: "us-west-2"
});

const docClient = new AWS.DynamoDB.DocumentClient();

exports.handlerはLambdaの関数定義の書式です。
eventには実行時の引数が入ります。

exports.handler = (event, context, callback) => {

    console.log('readdata start.');

    let sensor_data = event.payload;

今回はセンサーデータとして以下の形式のデータを指定して渡します。

{
    "payload": "$GPRP,24718902B258,FE284D4457BC,-68,02010612FF0D0082BC350100FFFFFFFF000001000000"
}

次のコードでは、引数の形式からDynamoDBに格納する形式への変換を実施しています。

    // "$GPRP" is not need
    var sensor_data_non_gprp = sensor_data.replace(/^(\$GPRP,)/g, "");

    let payload = sensor_data_non_gprp.match(/^(.{12},.{12},.*,)(.*)/);
    let split = payload[2].match(/^(02010612FF0D0082BC)(.{4})(.{2})(.{8})(.{4})(.{2})(.{6})/); 

    // split payload.
    let fix = split[1];
    let battery = split[2];
    let status = split[3];
    let reserve = split[4];
    let user = split[5];
    let fix2 = split[6];
    let reserve2 = split[7];

    // make existFlg
    var existFlg = 0;
    if (status !== "00") {
        existFlg = 1;
    }

    let now = new Date();
    var sensor_data_split_payload = payload[1]
                        + fix + "," + battery + ","
                        + status + "," + reserve + ","
                        + user + "," + fix2+ ","
                        + reserve2 + "," + now.toFormat('YYYY-MM-DD_HH24:MI:SS.') + now.getTime() + ","
                        + existFlg;

さらに次のコードでcsv形式からDynamoDBに登録するJSON形式への変換を実施しています。

    csv({
        noheader: true,
        headers: [
            'sensor_mac',
            'gw_mac',
            'rssi',
            'fix',
            'battery',
            'status',
            'reserve',
            'user',
            'fix2',
            'reserve2',
            'timestamp',
            'exist'],
        colParser:{
            "exist":"number"
        }
    }).fromString(sensor_data_split_payload)
        .on('json',(jsonObj)=>{

        // Insert DynamoDB
        var params = {
            TableName:table,
            Item:{
                "datetime": jsonObj.timestamp,
                "sensor_mac": jsonObj.sensor_mac,
                "info":{
                    "gw_mac": jsonObj.gw_mac,
                    "rssi": jsonObj.rssi,
                    "fix": jsonObj.fix,
                    "battery": jsonObj.battery,
                    "status": jsonObj.status,
                    "reserve": jsonObj.reserve,
                    "user": jsonObj.user,
                    "fix2": jsonObj.fix2,
                    "reserve2": jsonObj.reserve2,
                    "timestamp": jsonObj.timestamp,
                    "exist": jsonObj.exist
                }
            }
        };

docClient.putメソッドで先ほど作成したparamsをDynamoDBへレコード挿入します。

        console.log("Adding a new item...");
        docClient.put(params, function(err, data) {

            if (err) {
                console.error("Unable to add item. Error JSON:", JSON.stringify(err, null, 2));
            } else {
                console.log("Added item:", JSON.stringify(data, null, 2));
            }
        });

[コードの実行]

Cloud9上で、"Run"を押下することでコードの実行ができます。

Cloud9Exec

データの参照

検索用のソースコードは以下のリンクからダウンロードできます。
Markdown: JavaScript

引数に検索キーを指定し、docClient.scanメソッドで範囲検索します。

    var params = {
        TableName: table,
        FilterExpression: "#sm = :key1 AND begins_with(#dt, :key2)",
        ExpressionAttributeNames: {
            "#sm": "sensor_mac",
            "#dt": "datetime"
        },
        ExpressionAttributeValues: {
            ":key1": event.key1,
            ":key2": event.key2,
        }
    };

    console.log("Scanning Sensordata table.");
    docClient.scan(params, onScan);

[コードの実行]
Cloud9上の実行結果

Cloud9Exec

Lambda関数のデプロイ

Cloud9の左側に"AWS Resources"というメニューがあり、ここからデプロイします。

Cloud9Deploy

デプロイされると、AWS上にLambda関数として登録され、他から呼出すことが可能です。

Cloud9Deploy

以上、終わりです。