
Kinesis: A sample application that writes data to S3


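Reference

What is Kinesis? (Java implementation)

Sample application implementation

Following on from the previous article, this is a sample implementation of an application that writes files to S3.
As before, configure either AwsCredentials.properties or an EC2 role so that it has permission to access S3. For the time being, this sample runs with the PowerUser role.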

SampleS3Putter.java
    private static final Logger LOG = LoggerFactory
            .getLogger(SampleS3Putter.class);

    public SampleS3Putter() {
    }

    private AmazonS3Client s3;
    private static final String BUCKET_NAME = "<バケット名>";
    private static final String DIRECTORY_PATH = "log/";
    private static final String EXTENSION = ".txt";
    private final CharsetDecoder decoder = Charset.forName("UTF-8")
            .newDecoder();
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    private long nextCheckpointTimeInMillis;

    @Override
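    public void initialize(String shardId) {
        // Prefer the EC2 instance role; fall back to AwsCredentials.properties
        // on the classpath if no instance credentials are available.
        AWSCredentialsProvider credentialsProvider;
        try {
            credentialsProvider = new InstanceProfileCredentialsProvider();
            credentialsProvider.getCredentials();
        } catch (AmazonClientException e) {
            credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
            credentialsProvider.getCredentials();
        }
        this.s3 = new AmazonS3Client(credentialsProvider);
    }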

    @Override
    public void processRecords(List<Record> records,
            IRecordProcessorCheckpointer checkpointer) {
        process(records);
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            try {
                checkpointer.checkpoint();
            } catch (ShutdownException se) {
                // Handle the error
            } catch (ThrottlingException e) {
                // Handle the error
            } catch (InvalidStateException e) {
                // Handle the error
            }
            nextCheckpointTimeInMillis = System.currentTimeMillis()
                    + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    private void process(List<Record> records) {

        if (records.isEmpty()) {
            LOG.info("record empty.");
            return;
        }

        // Collect all records in one buffer, one record per line.
        // Requires java.io.ByteArrayOutputStream and java.nio.charset.StandardCharsets.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (Record record : records) {
            LOG.debug(
                    "---- Data : {} ,PartitionKey : {} , SequenceNumber : {} ----",
                    record.getData(), record.getPartitionKey(),
                    record.getSequenceNumber());

            try {
                String data = decoder.decode(record.getData()).toString()
                        + "\n";
                byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
                buffer.write(dataBytes, 0, dataBytes.length);
            } catch (CharacterCodingException e) {
                // Handle the error (the record is not valid UTF-8)
            }

        }
        byte[] dst = buffer.toByteArray();


        // Write to S3
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(dst.length);
        PutObjectResult putObjectResult = s3.putObject(BUCKET_NAME,
                DIRECTORY_PATH + System.currentTimeMillis()+"_log"+ EXTENSION,
                new ByteArrayInputStream(dst), metadata);

        LOG.debug(putObjectResult.toString());
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer,
            ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            try {
                checkpointer.checkpoint();
            } catch (ShutdownException se) {
                // Handle the error
            } catch (ThrottlingException e) {
                // Handle the error
            } catch (InvalidStateException e) {
                // Handle the error
            }
        }
    }
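
The time-based checkpoint decision in processRecords can be tried out on its own. The following is only a minimal sketch: CheckpointTimer is a hypothetical helper class (it is not part of the Kinesis Client Library), and the injected clock is an assumption added so that the logic can be tested without waiting. It mirrors the sample's behavior, where the first call is always due because the next checkpoint time starts at 0.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper: decides whether the next checkpoint is due. */
class CheckpointTimer {
    private final long intervalMillis;
    private final LongSupplier clock;
    // Starts at 0, so the very first call is always due (as in the sample).
    private long nextCheckpointTimeInMillis;

    CheckpointTimer(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    /** Returns true at most once per interval and schedules the next checkpoint. */
    boolean isDue() {
        long now = clock.getAsLong();
        if (now > nextCheckpointTimeInMillis) {
            nextCheckpointTimeInMillis = now + intervalMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointTimer timer = new CheckpointTimer(60000L, System::currentTimeMillis);
        System.out.println(timer.isDue()); // first call
        System.out.println(timer.isDue()); // called again right away
    }
}
```

In a record processor, the call to checkpointer.checkpoint() would go inside an `if (timer.isDue())` block. Note that, like the sample, this sketch schedules the next checkpoint even if the checkpoint call itself fails.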



Factory class

S3PutterRecordProcessorFactory.java
    public S3PutterRecordProcessorFactory() {
    }

    /* (non-Javadoc)
     * @see com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory#createProcessor()
     */
    @Override
    public IRecordProcessor createProcessor() {
        return new SampleS3Putter();
    }

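If you launch it the same way as in the previous article, it should work.

Implementation points
1. Use AmazonS3Client. Access S3 as a user that has permission to do so.
2. Think about how much data (how many records) to write at once. For example, write once every 100 records, or write once the data reaches a certain size.

I will add to this if anything else comes up.

To be continued...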
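
Point 2 can be sketched as a small buffer that flushes once either a record-count limit or a byte-size limit is reached. This is only an illustration under stated assumptions: RecordBatcher, its limits, and the sink callback are hypothetical names that do not appear in the sample, and the sink stands in for whatever actually writes to S3 (for example, a method that calls s3.putObject).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical buffer that flushes when a record-count or byte-size limit is hit. */
class RecordBatcher {
    private final int maxRecords;
    private final int maxBytes;
    private final Consumer<byte[]> sink; // assumed to perform the actual write
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private int recordCount;

    RecordBatcher(int maxRecords, int maxBytes, Consumer<byte[]> sink) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    /** Appends one record as a line and flushes if either limit is reached. */
    void add(String record) {
        byte[] line = (record + "\n").getBytes(StandardCharsets.UTF_8);
        buffer.write(line, 0, line.length);
        recordCount++;
        if (recordCount >= maxRecords || buffer.size() >= maxBytes) {
            flush();
        }
    }

    /** Hands the buffered bytes to the sink and resets the buffer; no-op when empty. */
    void flush() {
        if (recordCount == 0) {
            return;
        }
        sink.accept(buffer.toByteArray());
        buffer.reset();
        recordCount = 0;
    }

    public static void main(String[] args) {
        // Flush every 100 records or once about 1 MiB has been buffered.
        RecordBatcher batcher = new RecordBatcher(100, 1024 * 1024,
                bytes -> System.out.println("flushing " + bytes.length + " bytes"));
        for (int i = 0; i < 250; i++) {
            batcher.add("record-" + i);
        }
        batcher.flush(); // write whatever is left over
    }
}
```

If records are buffered across calls like this, it is probably safest to call flush() before checkpointing, so that a checkpoint never covers records that have not yet been written.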
