LoginSignup
4
4

More than 5 years have passed since last update.

Kinesis S3へデータを書き込むサンプルアプリケーション

Posted at

参考

Kinesisってなんじゃ?(Java実装編)

サンプルアプリケーション実装

前回につづいて、S3へファイルを書き込むアプリケーションの実装サンプル。
同じように、AwsCredentials.properties又は、EC2ロールで、S3へのアクセス権限を持つものに設定をしておくこと。サンプルではとりあえず、PowerUserのロールで行っています。

SampleS3putter.java

    private static final Logger LOG = LoggerFactory
            .getLogger(SampleS3Putter.class);

    public SampleS3Putter() {
    }

    private AmazonS3Client s3;
    private static final String BUCKET_NAME = "<バケット名>";
    private static final String DIRECTORY_PATH = "log/";
    private static final String EXTENSION = ".txt";
    private final CharsetDecoder decoder = Charset.forName("UTF-8")
            .newDecoder();
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    private long nextCheckpointTimeInMillis;

    @Override
    public void initialize(String shardId) {
        try {
            credentialsProvider = new InstanceProfileCredentialsProvider();
            credentialsProvider.getCredentials();
        } catch (AmazonClientException e) {
            credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
            credentialsProvider.getCredentials();
        }
        this.s3 = new AmazonS3Client(credentialsProvider);
    }

    @Override
    public void processRecords(List<Record> records,
            IRecordProcessorCheckpointer checkpointer) {
        process(records);
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
             try {
               checkpointer.checkpoint();
          } catch (ShutdownException se) {
              // 処理
          } catch (ThrottlingException e) {
              // 処理
          } catch (InvalidStateException e) {
              // 処理
          }
            nextCheckpointTimeInMillis = System.currentTimeMillis()
                    + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    private void process(List<Record> records) {

        if (records.isEmpty()) {
            LOG.info("record empty.");
            return;
        }

        byte[] dst = new byte[0];
        for (Record record : records) {
            LOG.debug(
                    "---- Data : {} ,PartitionKey : {} , SequenceNumber : {} ----",
                    record.getData(), record.getPartitionKey(),
                    record.getSequenceNumber());

            try {
                String data = decoder.decode(record.getData()).toString()
                        + "\n";
                byte[] dstBytes = dst;
                byte[] dataBytes = data.getBytes("UTF-8");
                dst = new byte[dstBytes.length + dataBytes.length];
                System.arraycopy(dstBytes, 0, dst, 0, dstBytes.length);
                System.arraycopy(dataBytes, 0, dst, dstBytes.length,
                        dataBytes.length);
            } catch (CharacterCodingException e) {
                // 何か処理
            } catch (UnsupportedEncodingException e) {
                // 何か処理
            }

        }

        // s3への書き込み
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(dst.length);
        PutObjectResult putObjectResult = s3.putObject(BUCKET_NAME,
                DIRECTORY_PATH + System.currentTimeMillis()+"_log"+ EXTENSION,
                new ByteArrayInputStream(dst), metadata);

        LOG.debug(putObjectResult.toString());
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer,
            ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
             try {
               checkpointer.checkpoint();
          } catch (ShutdownException se) {
              // 処理
          } catch (ThrottlingException e) {
              // 処理
          } catch (InvalidStateException e) {
              // 処理
          }
        }
    }



Factoryクラス

S3PutterRecordProcessorFactory.java

    public S3PutterRecordProcessorFactory() {
    }

    /* (非 Javadoc)
     * @see com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory#createProcessor()
     */
    @Override
    public IRecordProcessor createProcessor() {
        return new SampleS3Putter();
    }

前回と同じように起動するようにすれば、いけるはず。

実装のポイント
1. AmazonS3Clientを使う。S3にアクセスできる権限をもったユーザでアクセスする。
2. 一度に出力するデータ(レコード数)を考える。100レコードで一回出力するとか。データのサイズで出力するなど。

また何かあったら追記

次く。。。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4