これは個人の意見/メモであり、所属する組織を代表するものではありません。
これは何?
最近Amazon Kinesisを使っていろいろ開発することがある。その際、レコードを流し込んだはじから、確認のためにTailしたいというケースがよくあるので自分メモとしてスニペットにした。指定したストリームのすべてのシャードのレコードをTailしてくれるツール。みんなやってると思うんだけど、自分でも3回くらい書いてしまってアレなのでQiitaに投稿しとく。
JSである必要は全然ないんだけど、ちょうどnode.jsのコードを書いている時だったのでJSで書いた。
Usage
$ node kinesis_getter.js [REGION] [STREAM_NAME] [STRATEGY]
REGION: us-east-1とかap-northeast-1とか
STREAM_NAME: stream名
STRATEGY: TRIM_HORIZON or LATEST (DefaultでLATEST)
Output
ShardID, PartitionKey, SequenceNumber, Data
shardId-000000000002, zhfddlqahmqbtccwbrkaibilwtjadidddkuo, 49545830252860334642300441320662317302669106337672069138, TestString-71
shardId-000000000002, oleerhraymybmeuhrrlmwezrxzeglixumbye, 49545830252860334642300441320665944080127950225196187666, TestString-85
shardId-000000000002, gadbbwpuddmjdioqcxrequvjodymlsqsepth, 49545830252860334642300441320669570857586794112720306194, TestString-94
shardId-000000000002, ezbwgwpfvtapwsezoixtkwsctylxleqcbncb, 49545830252860334642300441320675615486684867327313313810, TestString-90
shardId-000000000002, dbaxygsvyludzdpaoodorczsdzrswrelxvag, 49545830252860334642300441320692540448159472135759200274, TestString-69
コード
kinesis_getter.js
var aws = require('aws-sdk');
var region = process.argv[2]
var stream = process.argv[3];
var strategy = process.argv[4] || 'LATEST';
var kinesis = new aws.Kinesis({region:region});
kinesis.describeStream({StreamName:stream},function(err,result){
var shards = result.StreamDescription.Shards;
console.log(shards);
for(var i = 0; i < shards.length; i++){
var shardId = shards[i].ShardId;
var params = {
ShardId: shardId,
ShardIteratorType: strategy,
StreamName: stream
};
kinesis.getShardIterator(params,function(err,result){
if(err) console.log(err);
else getRecords(kinesis,shardId,result.ShardIterator);
});
}
});
function getRecords(kinesis,shardId,shardIterator){
kinesis.getRecords({ShardIterator: shardIterator, Limit: 10000},function(err,result){
if(err) console.log(err);
else {
if(result.Records.length){
for(var i = 0; i < result.Records.length; i++){
r = result.Records[i];
console.log(shardId + ', ' + r.PartitionKey + ', ' + r.SequenceNumber + ', ' + r.Data);
}
}
setTimeout(function() {
getRecords(kinesis, shardId, result.NextShardIterator);
},0);
}
})
}