LoginSignup
5
0

More than 5 years have passed since last update.

SORACOM Funnelを利用してKinesis FirehoseにJSONを送信してS3にPUTする(ついでにDynamoDBに書き込む)

Posted at

はじめに

今頃Kinesis Firehoseとか今更感がありますがちゃんと使ってみたかったってのと、FunnelからFirehoseにJSONを投げたいという要望が舞い込んできたので、あんまりニーズはないと思いますが記事にしてみた

処理フロー

※点線部分はおまけ

funnel_firehose.PNG

Create S3

FirehoseがPUTするためのS3を作成しておきます。(作成する方法は省略します)

Create Kinesis Firehose

コンソールからFirehoseの画面へ移動して「Create delivery stream」からFirehoseを作成します
まだ東京リージョンで利用することはできないので、オレゴンリージョンで作成しました

  1. 「Delivery stream name」に任意の文字列を入力し、そのまま「next」をクリック
  2. FirehoseからLambdaをキックすることはしないのでそのまま「next」をクリック
  3. 「Destination」でS3を選択し(おそらくデフォルトで選択されています)、PUTするバケットを選択
  4. 「S3 buffer conditions」の「Buffer interval」を最小の「60」に設定。これで60秒待ってからS3にPUTします
  5. その他のデータを圧縮するか、暗号化するかなどのを任意に設定して、IAMroleを作成します
  6. 新しいIAMroleを作成すると今までの設定で必要な権限がついているIAMroleが作成されます
  7. 「Status」が「ACTIVE」になれば作成完了です

これでFirehoseにPUTRecordされたデータはS3にPUTObjectされます

※AWS-CLIでテストを行う

aws firehose put-record --delivery-stream-name deliveryStreamName --record Data="test"

SORACOM Funnelの設定

新しくSIMグループを作成し、Funnelの設定を行っていきます

  1. 「転送先サービス」から「Amazon Kinesis Firehose」を選択
  2. 「転送先URL」に「https://firehose.[region].amazonaws.com/[deliveryStreamName] 」を入力
  3. 「認証情報」には、Kinesis Firehoseの実行権限があるKey情報を持ったものを作成し、適用します
  4. 「送信データ形式」には「JSON」を選択します

これで、「http://funnel.soracom.io」にリクエストを送るとFirehoseにデータが送信されます

さぁテストの時間だ

先ほど設定したSIMグループのSIMが刺さったデバイスからPOSTメソッドを送信します

 curl -X POST --data @test.json -H "Content-Type: application/json" http://funnel.soracom.io

送信後S3にデータが保存されていれば成功です

おまけ

S3にPUTが行われたときにLambdaをキックしてDynamoDBにデータを保存する
おおまかなフロー
1. Funnel経由でFirehoseにJSONを送信
2. FirehoseがS3にPutRecord
3. S3がLambdaをキック
4. LambdaがS3にPutされたJSONを取得しDynamoDBにPut

Lambda

LambdaのロールにはDynamoDBへの権限を持たせておきます

index.js
const AWS = require("aws-sdk");
const co = require("co");

const dynamodb = new AWS.DynamoDB.DocumentClient({
  region: "us-west-2"
});
const s3 = new AWS.S3();

const dynamoPutData = require("./lib/dynamo_put_data");
const s3GetObject = require("./lib/s3_get_object");


exports.handler = (event, context, callback) => {
  console.log(JSON.stringify(event));
  co(function *() {
    // S3のBucketNameとPUTされたJSONのKey情報を取得
    const bucketName = event["Records"][0]["s3"]["bucket"]["name"];
    const objectKey = event["Records"][0]["s3"]["object"]["key"];
    // S3からJSONファイルを取得
    const s3GetData = yield s3GetObject.getObject(s3, bucketName, objectKey);
    const item = {
      id: "test",
      payloads: s3GetData
    };
    return yield dynamoPutData.putDynamoDB(dynamodb, item);
  }).then(() => {
    console.log("success");
  }).catch((err) => {
    console.log(err);
  });

};
s3_get_object.js
class s3GetObject {

  /**
   * S3からObjectを取得する
   * @param s3
   * @param bucket
   * @param key
   * @returns {Promise}
   */
  static getObject(s3, bucket, key) {
    return new Promise((resolve, reject) => {
      const params = {
        Bucket: bucket,
        Key: key
      };
      s3.getObject(params, (err, data) => {
        if(err) {
          return reject(err);
        } else {
          const getData = JSON.parse(data.Body.toString());
          return resolve(getData["payloads"]);
        }
      });
    });
  }
}

module.exports = s3GetObject;
dynamo_put_data.js
class dynamoPutData {

   /**
   * DynamoDBにデータを保存する(Put)
   * @param {DocumentClient} dynamoDB
   * @param item
   * @returns {Promise}
   */
  static putDynamoDB(dynamoDB, item) {
    return new Promise((resolve, reject) => {
      const params = {
        TableName: "tableName",
        Item: item
      };
      return dynamoDB.put(params, (err, data) => {
        if (err) {
          console.log(err);
          return reject(err);
        } else {
          console.log(data);
          return resolve(data);
        }
      });
    });
  }
}

module.exports = dynamoPutData;

S3

  1. S3のコンソール画面からプロパティ⇒Eventsを選択
  2. 「Add notification」を選択
  3. 「Name」には任意の文字列、「Events」はPut、「Send to」は「Lambda Function」を選択し、先ほど作成したLambdaを選択します

これで作業完了です。
Funnel経由でJSONを送信し、S3やDynamoDBにデータが保存されているか確認してください

まとめ

とりあえずFirehoseがどうしても1分データを保持してしまうのでStreamに比べたら処理時間がかかってしまうようです
というかリアルタイム性がほしければStream使おうかってなりますよねー
あと早く東京に上陸してくれないだろうかって感じですね

ではまた!

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0