AWS
S3
DynamoDB
Kinesis
SORACOM

SORACOM Funnelを利用してKinesis FirehoseにJSONを送信してS3にPUTする(ついでにDynamoDBに書き込む)

はじめに

今頃Kinesis Firehoseとか今更感がありますがちゃんと使ってみたかったってのと、FunnelからFirehoseにJSONを投げたいという要望が舞い込んできたので、あんまりニーズはないと思いますが記事にしてみた

処理フロー

※点線部分はおまけ

funnel_firehose.PNG

Create S3

FirehoseがPUTするためのS3を作成しておきます。(作成する方法は省略します)

Create Kinesis Firehose

コンソールからFirehoseの画面へ移動して「Create delivery stream」からFirehoseを作成します
まだ東京リージョンで利用することはできないので、オレゴンリージョンで作成しました

  1. 「Delivery stream name」に任意の文字列を入力し、そのまま「next」をクリック
  2. FirehoseからLambdaをキックすることはしないのでそのまま「next」をクリック
  3. 「Destination」でS3を選択し(おそらくデフォルトで選択されています)、PUTするバケットを選択
  4. 「S3 buffer conditions」の「Buffer interval」を最小の「60」に設定。これで60秒待ってからS3にPUTします
  5. その他のデータを圧縮するか、暗号化するかなどのを任意に設定して、IAMroleを作成します
  6. 新しいIAMroleを作成すると今までの設定で必要な権限がついているIAMroleが作成されます
  7. 「Status」が「ACTIVE」になれば作成完了です

これでFirehoseにPUTRecordされたデータはS3にPUTObjectされます

※AWS-CLIでテストを行う

aws firehose put-record --delivery-stream-name deliveryStreamName --record Data="test"

SORACOM Funnelの設定

新しくSIMグループを作成し、Funnelの設定を行っていきます

  1. 「転送先サービス」から「Amazon Kinesis Firehose」を選択
  2. 「転送先URL」に「https://firehose.[region].amazonaws.com/[deliveryStreamName] 」を入力
  3. 「認証情報」には、Kinesis Firehoseの実行権限があるKey情報を持ったものを作成し、適用します
  4. 「送信データ形式」には「JSON」を選択します

これで、「http://funnel.soracom.io」にリクエストを送るとFirehoseにデータが送信されます

さぁテストの時間だ

先ほど設定したSIMグループのSIMが刺さったデバイスからPOSTメソッドを送信します

 curl -X POST --data @test.json -H "Content-Type: application/json" http://funnel.soracom.io

送信後S3にデータが保存されていれば成功です

おまけ

S3にPUTが行われたときにLambdaをキックしてDynamoDBにデータを保存する
おおまかなフロー
1. Funnel経由でFirehoseにJSONを送信
2. FirehoseがS3にPutRecord
3. S3がLambdaをキック
4. LambdaがS3にPutされたJSONを取得しDynamoDBにPut

Lambda

LambdaのロールにはDynamoDBへの権限を持たせておきます

index.js
const AWS = require("aws-sdk");
const co = require("co");

const dynamodb = new AWS.DynamoDB.DocumentClient({
  region: "us-west-2"
});
const s3 = new AWS.S3();

const dynamoPutData = require("./lib/dynamo_put_data");
const s3GetObject = require("./lib/s3_get_object");


exports.handler = (event, context, callback) => {
  console.log(JSON.stringify(event));
  co(function *() {
    // S3のBucketNameとPUTされたJSONのKey情報を取得
    const bucketName = event["Records"][0]["s3"]["bucket"]["name"];
    const objectKey = event["Records"][0]["s3"]["object"]["key"];
    // S3からJSONファイルを取得
    const s3GetData = yield s3GetObject.getObject(s3, bucketName, objectKey);
    const item = {
      id: "test",
      payloads: s3GetData
    };
    return yield dynamoPutData.putDynamoDB(dynamodb, item);
  }).then(() => {
    console.log("success");
  }).catch((err) => {
    console.log(err);
  });

};
s3_get_object.js
class s3GetObject {

  /**
   * S3からObjectを取得する
   * @param s3
   * @param bucket
   * @param key
   * @returns {Promise}
   */
  static getObject(s3, bucket, key) {
    return new Promise((resolve, reject) => {
      const params = {
        Bucket: bucket,
        Key: key
      };
      s3.getObject(params, (err, data) => {
        if(err) {
          return reject(err);
        } else {
          const getData = JSON.parse(data.Body.toString());
          return resolve(getData["payloads"]);
        }
      });
    });
  }
}

module.exports = s3GetObject;
dynamo_put_data.js
class dynamoPutData {

   /**
   * DynamoDBにデータを保存する(Put)
   * @param {DocumentClient} dynamoDB
   * @param item
   * @returns {Promise}
   */
  static putDynamoDB(dynamoDB, item) {
    return new Promise((resolve, reject) => {
      const params = {
        TableName: "tableName",
        Item: item
      };
      return dynamoDB.put(params, (err, data) => {
        if (err) {
          console.log(err);
          return reject(err);
        } else {
          console.log(data);
          return resolve(data);
        }
      });
    });
  }
}

module.exports = dynamoPutData;

S3

  1. S3のコンソール画面からプロパティ⇒Eventsを選択
  2. 「Add notification」を選択
  3. 「Name」には任意の文字列、「Events」はPut、「Send to」は「Lambda Function」を選択し、先ほど作成したLambdaを選択します

これで作業完了です。
Funnel経由でJSONを送信し、S3やDynamoDBにデータが保存されているか確認してください

まとめ

とりあえずFirehoseがどうしても1分データを保持してしまうのでStreamに比べたら処理時間がかかってしまうようです
というかリアルタイム性がほしければStream使おうかってなりますよねー
あと早く東京に上陸してくれないだろうかって感じですね

ではまた!