はじめに
今頃Kinesis Firehoseとか今更感がありますがちゃんと使ってみたかったってのと、FunnelからFirehoseにJSONを投げたいという要望が舞い込んできたので、あんまりニーズはないと思いますが記事にしてみた
処理フロー
※点線部分はおまけ
Create S3
FirehoseがPUTするためのS3を作成しておきます。(作成する方法は省略します)
Create Kinesis Firehose
コンソールからFirehoseの画面へ移動して「Create delivery stream」からFirehoseを作成します
まだ東京リージョンで利用することはできないので、オレゴンリージョンで作成しました
- 「Delivery stream name」に任意の文字列を入力し、そのまま「next」をクリック
- FirehoseからLambdaをキックすることはしないのでそのまま「next」をクリック
- 「Destination」でS3を選択し(おそらくデフォルトで選択されています)、PUTするバケットを選択
- 「S3 buffer conditions」の「Buffer interval」を最小の「60」に設定。これで60秒待ってからS3にPUTします
- その他のデータを圧縮するか、暗号化するかなどのを任意に設定して、IAMroleを作成します
- 新しいIAMroleを作成すると今までの設定で必要な権限がついているIAMroleが作成されます
- 「Status」が「ACTIVE」になれば作成完了です
これでFirehoseにPUTRecordされたデータはS3にPUTObjectされます
※AWS-CLIでテストを行う
aws firehose put-record --delivery-stream-name deliveryStreamName --record Data="test"
SORACOM Funnelの設定
新しくSIMグループを作成し、Funnelの設定を行っていきます
- 「転送先サービス」から「Amazon Kinesis Firehose」を選択
- 「転送先URL」に「https://firehose.[region].amazonaws.com/[deliveryStreamName] 」を入力
- 「認証情報」には、Kinesis Firehoseの実行権限があるKey情報を持ったものを作成し、適用します
- 「送信データ形式」には「JSON」を選択します
これで、「http://funnel.soracom.io」にリクエストを送るとFirehoseにデータが送信されます
さぁテストの時間だ
先ほど設定したSIMグループのSIMが刺さったデバイスからPOSTメソッドを送信します
curl -X POST --data @test.json -H "Content-Type: application/json" http://funnel.soracom.io
送信後S3にデータが保存されていれば成功です
おまけ
S3にPUTが行われたときにLambdaをキックしてDynamoDBにデータを保存する
おおまかなフロー
- Funnel経由でFirehoseにJSONを送信
- FirehoseがS3にPutRecord
- S3がLambdaをキック
- LambdaがS3にPutされたJSONを取得しDynamoDBにPut
Lambda
LambdaのロールにはDynamoDBへの権限を持たせておきます
const AWS = require("aws-sdk");
const co = require("co");
const dynamodb = new AWS.DynamoDB.DocumentClient({
region: "us-west-2"
});
const s3 = new AWS.S3();
const dynamoPutData = require("./lib/dynamo_put_data");
const s3GetObject = require("./lib/s3_get_object");
exports.handler = (event, context, callback) => {
console.log(JSON.stringify(event));
co(function *() {
// S3のBucketNameとPUTされたJSONのKey情報を取得
const bucketName = event["Records"][0]["s3"]["bucket"]["name"];
const objectKey = event["Records"][0]["s3"]["object"]["key"];
// S3からJSONファイルを取得
const s3GetData = yield s3GetObject.getObject(s3, bucketName, objectKey);
const item = {
id: "test",
payloads: s3GetData
};
return yield dynamoPutData.putDynamoDB(dynamodb, item);
}).then(() => {
console.log("success");
}).catch((err) => {
console.log(err);
});
};
class s3GetObject {
/**
* S3からObjectを取得する
* @param s3
* @param bucket
* @param key
* @returns {Promise}
*/
static getObject(s3, bucket, key) {
return new Promise((resolve, reject) => {
const params = {
Bucket: bucket,
Key: key
};
s3.getObject(params, (err, data) => {
if(err) {
return reject(err);
} else {
const getData = JSON.parse(data.Body.toString());
return resolve(getData["payloads"]);
}
});
});
}
}
module.exports = s3GetObject;
class dynamoPutData {
/**
* DynamoDBにデータを保存する(Put)
* @param {DocumentClient} dynamoDB
* @param item
* @returns {Promise}
*/
static putDynamoDB(dynamoDB, item) {
return new Promise((resolve, reject) => {
const params = {
TableName: "tableName",
Item: item
};
return dynamoDB.put(params, (err, data) => {
if (err) {
console.log(err);
return reject(err);
} else {
console.log(data);
return resolve(data);
}
});
});
}
}
module.exports = dynamoPutData;
S3
- S3のコンソール画面からプロパティ⇒Eventsを選択
- 「Add notification」を選択
- 「Name」には任意の文字列、「Events」はPut、「Send to」は「Lambda Function」を選択し、先ほど作成したLambdaを選択します
これで作業完了です。
Funnel経由でJSONを送信し、S3やDynamoDBにデータが保存されているか確認してください
まとめ
とりあえずFirehoseがどうしても1分データを保持してしまうのでStreamに比べたら処理時間がかかってしまうようです
というかリアルタイム性がほしければStream使おうかってなりますよねー
あと早く東京に上陸してくれないだろうかって感じですね
ではまた!