参考
サンプルアプリケーション実装
前回につづいて、S3へファイルを書き込むアプリケーションの実装サンプル。
同じように、AwsCredentials.properties
又は、EC2ロールで、S3へのアクセス権限を持つものに設定をしておくこと。サンプルではとりあえず、PowerUserのロールで行っています。
SampleS3putter.java
private static final Logger LOG = LoggerFactory
.getLogger(SampleS3Putter.class);
public SampleS3Putter() {
}
private AmazonS3Client s3;
private static final String BUCKET_NAME = "<バケット名>";
private static final String DIRECTORY_PATH = "log/";
private static final String EXTENSION = ".txt";
private final CharsetDecoder decoder = Charset.forName("UTF-8")
.newDecoder();
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private long nextCheckpointTimeInMillis;
@Override
public void initialize(String shardId) {
try {
credentialsProvider = new InstanceProfileCredentialsProvider();
credentialsProvider.getCredentials();
} catch (AmazonClientException e) {
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
credentialsProvider.getCredentials();
}
this.s3 = new AmazonS3Client(credentialsProvider);
}
@Override
public void processRecords(List<Record> records,
IRecordProcessorCheckpointer checkpointer) {
process(records);
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
try {
checkpointer.checkpoint();
} catch (ShutdownException se) {
// 処理
} catch (ThrottlingException e) {
// 処理
} catch (InvalidStateException e) {
// 処理
}
nextCheckpointTimeInMillis = System.currentTimeMillis()
+ CHECKPOINT_INTERVAL_MILLIS;
}
}
private void process(List<Record> records) {
if (records.isEmpty()) {
LOG.info("record empty.");
return;
}
byte[] dst = new byte[0];
for (Record record : records) {
LOG.debug(
"---- Data : {} ,PartitionKey : {} , SequenceNumber : {} ----",
record.getData(), record.getPartitionKey(),
record.getSequenceNumber());
try {
String data = decoder.decode(record.getData()).toString()
+ "\n";
byte[] dstBytes = dst;
byte[] dataBytes = data.getBytes("UTF-8");
dst = new byte[dstBytes.length + dataBytes.length];
System.arraycopy(dstBytes, 0, dst, 0, dstBytes.length);
System.arraycopy(dataBytes, 0, dst, dstBytes.length,
dataBytes.length);
} catch (CharacterCodingException e) {
// 何か処理
} catch (UnsupportedEncodingException e) {
// 何か処理
}
}
// s3への書き込み
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(dst.length);
PutObjectResult putObjectResult = s3.putObject(BUCKET_NAME,
DIRECTORY_PATH + System.currentTimeMillis()+"_log"+ EXTENSION,
new ByteArrayInputStream(dst), metadata);
LOG.debug(putObjectResult.toString());
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer,
ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
try {
checkpointer.checkpoint();
} catch (ShutdownException se) {
// 処理
} catch (ThrottlingException e) {
// 処理
} catch (InvalidStateException e) {
// 処理
}
}
}
Factoryクラス
S3PutterRecordProcessorFactory.java
public S3PutterRecordProcessorFactory() {
}
/* (非 Javadoc)
* @see com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory#createProcessor()
*/
@Override
public IRecordProcessor createProcessor() {
return new SampleS3Putter();
}
前回と同じように起動するようにすれば、いけるはず。
実装のポイント
- AmazonS3Clientを使う。S3にアクセスできる権限をもったユーザでアクセスする。
- 一度に出力するデータ(レコード数)を考える。100レコードで一回出力するとか。データのサイズで出力するなど。
また何かあったら追記
次く。。。