LoginSignup
9

More than 5 years have passed since last update.

Amazon KinesisのデータをTailするユーティリティ

Last updated at Posted at 2014-12-08

これは個人の意見/メモであり、所属する組織を代表するものではありません。

これは何?

最近Amazon Kinesisを使っていろいろ開発することがある。その際、レコードを流し込んだはじから、確認のためにTailしたいというケースがよくあるので自分メモとしてスニペットにした。指定したストリームのすべてのシャードのレコードをTailしてくれるツール。みんなやってると思うんだけど、自分でも3回くらい書いてしまってアレなのでQiitaに投稿しとく。

JSである必要は全然ないんだけど、ちょうどnode.jsのコードを書いている時だったのでJSで書いた。

Usage

$ node kinesis_getter.js [REGION] [STREAM_NAME] [STRATEGY]

REGION: us-east-1とかap-northeast-1とか
STREAM_NAME: stream名
STRATEGY: TRIM_HORIZON or LATEST (DefaultでLATEST)

Output

ShardID, PartitionKey, SequenceNumber, Data
shardId-000000000002, zhfddlqahmqbtccwbrkaibilwtjadidddkuo, 49545830252860334642300441320662317302669106337672069138, TestString-71
shardId-000000000002, oleerhraymybmeuhrrlmwezrxzeglixumbye, 49545830252860334642300441320665944080127950225196187666, TestString-85
shardId-000000000002, gadbbwpuddmjdioqcxrequvjodymlsqsepth, 49545830252860334642300441320669570857586794112720306194, TestString-94
shardId-000000000002, ezbwgwpfvtapwsezoixtkwsctylxleqcbncb, 49545830252860334642300441320675615486684867327313313810, TestString-90
shardId-000000000002, dbaxygsvyludzdpaoodorczsdzrswrelxvag, 49545830252860334642300441320692540448159472135759200274, TestString-69

コード

kinesis_getter.js
var aws = require('aws-sdk');

var region = process.argv[2]
var stream = process.argv[3];
var strategy = process.argv[4] || 'LATEST';
var kinesis = new aws.Kinesis({region:region});

kinesis.describeStream({StreamName:stream},function(err,result){
   var shards = result.StreamDescription.Shards;
   console.log(shards);
   for(var i = 0; i < shards.length; i++){
       var shardId = shards[i].ShardId;
       var params = {
         ShardId: shardId,
         ShardIteratorType: strategy,
         StreamName: stream
       };
       kinesis.getShardIterator(params,function(err,result){
          if(err) console.log(err);
          else getRecords(kinesis,shardId,result.ShardIterator);
       });
   }
});

function getRecords(kinesis,shardId,shardIterator){
    kinesis.getRecords({ShardIterator: shardIterator, Limit: 10000},function(err,result){
        if(err) console.log(err);
        else {
            if(result.Records.length){
                for(var i = 0; i < result.Records.length; i++){
                    r = result.Records[i];
                    console.log(shardId + ', ' + r.PartitionKey + ', ' + r.SequenceNumber + ', ' + r.Data);
                }
            }
            setTimeout(function() {
                getRecords(kinesis, shardId, result.NextShardIterator);
            },0);
        }
    })
}


Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9