#サンプルアプリケーション
参考
ストリームデータ処理サービスAmazon Kinesisについて調べた結果
公式
Getting Started
Developing Record Consumer Applications with the Amazon Kinesis Client Library
準備
Amazon Kinesis Client Library
Kinesisのstreamへアクセスするアプリケーションを記述するためにAmazon Kinesis Client Library
githubよりクローン
git clone https://github.com/awslabs/amazon-kinesis-client.git
mavenコマンドでライブラリをインストールします。
mvn clean install -Dgpg.skip=true
Amazon Kinesis streamの作成
Services > Analytics > Kinesisを選択し、Amazon Kinesisの画面へ。
Create Streamを押下。
Stream NameとNumber of Shardsを設定し、Createを押下。ここでは、Shard数は1に設定
依存関係の設定
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>[1.0,)</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>[1.7,)</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>[1.7,)</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>[1.7,)</version>
</dependency>
</dependencies>
クライアント実装
/* slf4j */
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
/**
* kinesis クライアントアプリケーション
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// アプリケーション名
final String streamName = "samplestream";
// kinesis クライアントを初期化
// ClasspathPropertiesFileCredentialsProvider()
AWSCredentialsProvider credentialsProvider = null;
try {
credentialsProvider = new InstanceProfileCredentialsProvider();
credentialsProvider.getCredentials();
} catch (AmazonClientException e) {
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
credentialsProvider.getCredentials();
}
final AmazonKinesis kinesis = new AmazonKinesisClient(
credentialsProvider);
// ランダムな文字列を生成してPutRecordRequestオブジェクトにput
while (true) {
final String key = RandomStringUtils.randomAlphanumeric(10);
final String data = "KEY_"
+ Calendar.getInstance().getTime().getTime() + ":" + key;
final PutRecordRequest request = new PutRecordRequest();
request.setStreamName(streamName);
request.setData(ByteBuffer.wrap(data.getBytes("UTF-8")));
request.setPartitionKey(key);
final PutRecordResult putRecord = kinesis.putRecord(request);
LOG.info("key:{} ,record:{}", key, data, putRecord);
LOG.info("--------");
}
}
アプリケーション実装
private static final Logger LOG = LoggerFactory.getLogger(SampleRecordProcessorFactory.class);
public SampleRecordProcessorFactory() {
super();
}
@Override
public IRecordProcessor createProcessor() {
return new SampleRecordProcessor();
}
private static final Logger LOG = LoggerFactory.getLogger(SampleRecordProcessor.class);
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private long nextCheckpointTimeInMillis;
private String sId = null;
/**
*
*/
public SampleRecordProcessor() {
// コンストラクタ
}
/*
* (非 Javadoc)
*
* @see
* com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* #initialize(java.lang.String)
*/
@Override
public void initialize(String shardId) {
// 初期化
sId = shardId;
}
/*
* (非 Javadoc)
*
* @see
* com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* #processRecords(java.util.List,
* com.amazonaws.services.kinesis.clientlibrary
* .interfaces.IRecordProcessorCheckpointer)
*/
@Override
public void processRecords(List<Record> records,
IRecordProcessorCheckpointer checkpointer) {
// Streamを作り直すと前回のAppNameが使用できなくなる
if (records.isEmpty()) {
return;
}
try {
for (Record record : records) {
// records処理
String data = null;
try {
data = decoder.decode(record.getData()).toString() + "\n";
} catch (CharacterCodingException e) {
e.printStackTrace();
}
LOG.info("ShardId : {} ,Data : {}", sId, data);
}
checkpointer.checkpoint();
} catch (KinesisClientLibDependencyException | InvalidStateException
| ThrottlingException | ShutdownException e) {
}
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibDependencyException
| InvalidStateException | ThrottlingException
| ShutdownException e) {
// 何か処理
}
nextCheckpointTimeInMillis = System.currentTimeMillis()
+ CHECKPOINT_INTERVAL_MILLIS;
}
}
/*
* (非 Javadoc)
*
* @see
* com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* #shutdown(com.amazonaws.services.kinesis.clientlibrary.interfaces.
* IRecordProcessorCheckpointer,
* com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason)
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer,
ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
try {
checkpointer.checkpoint();
} catch (KinesisClientLibDependencyException
| InvalidStateException | ThrottlingException
| ShutdownException e) {
// 何か処理
}
}
}
private static final Logger LOG = LoggerFactory.getLogger(Apps.class);
private static KinesisClientLibConfiguration kinesisClientLibConfiguration;
private static String streamName = "pipeline";
private static String applicationName = "kinesis-apps";
private static int maxShardCount = 1;
private static int taskIndex = 0;
/**
* @param args
*/
public static void main(String[] args) {
String workerId = null;
try {
workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":"
+ UUID.randomUUID();
} catch (UnknownHostException e) {
e.printStackTrace();
}
AWSCredentialsProvider credentialsProvider = null;
try {
credentialsProvider = new InstanceProfileCredentialsProvider();
credentialsProvider.getCredentials();
} catch (AmazonClientException e) {
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
credentialsProvider.getCredentials();
}
LOG.info("Using credentials with access key id: "
+ credentialsProvider.getCredentials().getAWSAccessKeyId());
kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
applicationName, streamName, credentialsProvider, workerId);
SampleRecordProcessorFactory recordProcessorFactory = new SampleRecordProcessorFactory();
Worker worker = new Worker(recordProcessorFactory,
kinesisClientLibConfiguration);
int exitCode = 0;
try {
worker.run();
} catch (Throwable t) {
exitCode = 1;
}
System.exit(exitCode);
}
設定ファイル
クラスパス上にAWSCredentials.propertiesがある場合、new ClasspathPropertiesFileCredentialsProvider()#getCredentials
で取得できる。
ローカル環境から実行する場合 又は、EC2インスタンス上だが、IAM ロールを設定していない場合 には必要。
EC2インスタンスにロールを設定すべきかと思う。
secretKey={発行したシークレットキー}
accessKey={発行したアクセスキー}
実行結果
client側ログ
2014-7-22 18:43:17.043 INFO jp.co.opst.kinesis.sample.Client - key:Wjls8vv6JE ,record:KEY_1406022196819:Wjls8vv6JE
2014-7-22 18:43:17.044 INFO jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.267 INFO jp.co.opst.kinesis.sample.Client - key:gXThptVxac ,record:KEY_1406022197044:gXThptVxac
2014-7-22 18:43:17.268 INFO jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.496 INFO jp.co.opst.kinesis.sample.Client - key:2u3XvnmZxJ ,record:KEY_1406022197268:2u3XvnmZxJ
2014-7-22 18:43:17.497 INFO jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.723 INFO jp.co.opst.kinesis.sample.Client - key:IQgEmGiFfi ,record:KEY_1406022197497:IQgEmGiFfi
2014-7-22 18:43:17.723 INFO jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.944 INFO jp.co.opst.kinesis.sample.Client - key:2Krh9Wtyi6 ,record:KEY_1406022197724:2Krh9Wtyi6
2014-7-22 18:43:17.945 INFO jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:18.168 INFO jp.co.opst.kinesis.sample.Client - key:l3GQX1KOV4 ,record:KEY_1406022197945:l3GQX1KOV4
アプリケーション側ログ
2014-7-22 18:43:19.746 INFO jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197044:gXThptVxac
2014-7-22 18:43:20.679 INFO jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022196374:wuTdy5QSnG
2014-7-22 18:43:21.744 INFO jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197268:2u3XvnmZxJ
2014-7-22 18:43:21.745 INFO jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022198169:HypveQeovT
2014-7-22 18:43:21.745 INFO jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197945:l3GQX1KOV4
2014-7-22 18:43:21.745 INFO jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197497:IQgEmGiFfi
続く。。。