18
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Amazon Kinesis アプリケーション サンプル

Last updated at Posted at 2014-07-22

サンプルアプリケーション

参考
ストリームデータ処理サービスAmazon Kinesisについて調べた結果
公式
Getting Started
Developing Record Consumer Applications with the Amazon Kinesis Client Library

準備

Amazon Kinesis Client Library

Kinesisのstreamへアクセスするアプリケーションを記述するためにAmazon Kinesis Client Library
githubよりクローン
git clone https://github.com/awslabs/amazon-kinesis-client.git

mavenコマンドでライブラリをインストールします。
mvn clean install -Dgpg.skip=true

Amazon Kinesis streamの作成

Services > Analytics > Kinesisを選択し、Amazon Kinesisの画面へ。
Create Streamを押下。
kinesis1.JPG

Stream NameとNumber of Shardsを設定し、Createを押下。ここでは、Shard数は1に設定
kinesis2.JPG

しばらくするとACTIVEになれば、準備完了。
kinesis3.JPG

依存関係の設定

pom.xml
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>${aws-java-sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>[1.0,)</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>[1.7,)</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>[1.7,)</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>[1.7,)</version>
        </dependency>
    </dependencies>

クライアント実装

Client.java

    /* slf4j */
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);

    /**
     * kinesis クライアントアプリケーション
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        // アプリケーション名
        final String streamName = "samplestream";

        // kinesis クライアントを初期化
        // ClasspathPropertiesFileCredentialsProvider()
        AWSCredentialsProvider credentialsProvider = null;
        try {
            credentialsProvider = new InstanceProfileCredentialsProvider();
            credentialsProvider.getCredentials();
        } catch (AmazonClientException e) {
            credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
            credentialsProvider.getCredentials();
        }
        final AmazonKinesis kinesis = new AmazonKinesisClient(
                credentialsProvider);

        // ランダムな文字列を生成してPutRecordRequestオブジェクトにput
        while (true) {
            final String key = RandomStringUtils.randomAlphanumeric(10);
            final String data = "KEY_"
                    + Calendar.getInstance().getTime().getTime() + ":" + key;

            final PutRecordRequest request = new PutRecordRequest();
            request.setStreamName(streamName);
            request.setData(ByteBuffer.wrap(data.getBytes("UTF-8")));
            request.setPartitionKey(key);
            final PutRecordResult putRecord = kinesis.putRecord(request);

            LOG.info("key:{} ,record:{}", key, data, putRecord);
            LOG.info("--------");

        }
    }


アプリケーション実装

SampleRecordProcessorFactory.java
    private static final Logger LOG = LoggerFactory.getLogger(SampleRecordProcessorFactory.class);

    public SampleRecordProcessorFactory() {
        super();
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new SampleRecordProcessor();
    }
SampleRecordProcessor.java

    private static final Logger LOG = LoggerFactory.getLogger(SampleRecordProcessor.class);

    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    private long nextCheckpointTimeInMillis;
    private String sId = null;

    /**
     *
     */
    public SampleRecordProcessor() {
        // コンストラクタ
    }

    /*
     * (非 Javadoc)
     *
     * @see
     * com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
     * #initialize(java.lang.String)
     */
    @Override
    public void initialize(String shardId) {
        // 初期化
        sId = shardId;
    }

    /*
     * (非 Javadoc)
     *
     * @see
     * com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
     * #processRecords(java.util.List,
     * com.amazonaws.services.kinesis.clientlibrary
     * .interfaces.IRecordProcessorCheckpointer)
     */
    @Override
    public void processRecords(List<Record> records,
            IRecordProcessorCheckpointer checkpointer) {

        // Streamを作り直すと前回のAppNameが使用できなくなる

        if (records.isEmpty()) {
            return;
        }
        try {
            for (Record record : records) {

                // records処理
                String data = null;
                try {
                    data = decoder.decode(record.getData()).toString() + "\n";
                } catch (CharacterCodingException e) {
                    e.printStackTrace();
                }
                LOG.info("ShardId : {} ,Data : {}", sId, data);

            }
            checkpointer.checkpoint();
        } catch (KinesisClientLibDependencyException | InvalidStateException
                | ThrottlingException | ShutdownException e) {
        }
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException
                    | InvalidStateException | ThrottlingException
                    | ShutdownException e) {
                   // 何か処理
            }
            nextCheckpointTimeInMillis = System.currentTimeMillis()
                    + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    /*
     * (非 Javadoc)
     *
     * @see
     * com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
     * #shutdown(com.amazonaws.services.kinesis.clientlibrary.interfaces.
     * IRecordProcessorCheckpointer,
     * com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason)
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer,
            ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException
                    | InvalidStateException | ThrottlingException
                    | ShutdownException e) {
                // 何か処理
            }
        }
    }
Apps.java

    private static final Logger LOG = LoggerFactory.getLogger(Apps.class);

    private static KinesisClientLibConfiguration kinesisClientLibConfiguration;

    private static String streamName = "pipeline";
    private static String applicationName = "kinesis-apps";
    private static int maxShardCount = 1;
    private static int taskIndex = 0;

    /**
     * @param args
     */
    public static void main(String[] args) {
        String workerId = null;
        try {
            workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":"
                    + UUID.randomUUID();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }

        AWSCredentialsProvider credentialsProvider = null;
        try {
            credentialsProvider = new InstanceProfileCredentialsProvider();
            credentialsProvider.getCredentials();
        } catch (AmazonClientException e) {
            credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
            credentialsProvider.getCredentials();
        }

        LOG.info("Using credentials with access key id: "
                + credentialsProvider.getCredentials().getAWSAccessKeyId());
        kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
                applicationName, streamName, credentialsProvider, workerId);
        SampleRecordProcessorFactory recordProcessorFactory = new SampleRecordProcessorFactory();
        Worker worker = new Worker(recordProcessorFactory,
                kinesisClientLibConfiguration);
        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            exitCode = 1;
        }
        System.exit(exitCode);

    }

設定ファイル

クラスパス上にAWSCredentials.propertiesがある場合、new ClasspathPropertiesFileCredentialsProvider()#getCredentialsで取得できる。
ローカル環境から実行する場合 又は、EC2インスタンス上だが、IAM ロールを設定していない場合 には必要。
EC2インスタンスにロールを設定すべきかと思う。
AwsCredentials.properties
secretKey={発行したシークレットキー}
accessKey={発行したアクセスキー}

実行結果

client側ログ

2014-7-22 18:43:17.043 INFO   jp.co.opst.kinesis.sample.Client - key:Wjls8vv6JE ,record:KEY_1406022196819:Wjls8vv6JE
2014-7-22 18:43:17.044 INFO   jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.267 INFO   jp.co.opst.kinesis.sample.Client - key:gXThptVxac ,record:KEY_1406022197044:gXThptVxac
2014-7-22 18:43:17.268 INFO   jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.496 INFO   jp.co.opst.kinesis.sample.Client - key:2u3XvnmZxJ ,record:KEY_1406022197268:2u3XvnmZxJ
2014-7-22 18:43:17.497 INFO   jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.723 INFO   jp.co.opst.kinesis.sample.Client - key:IQgEmGiFfi ,record:KEY_1406022197497:IQgEmGiFfi
2014-7-22 18:43:17.723 INFO   jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:17.944 INFO   jp.co.opst.kinesis.sample.Client - key:2Krh9Wtyi6 ,record:KEY_1406022197724:2Krh9Wtyi6
2014-7-22 18:43:17.945 INFO   jp.co.opst.kinesis.sample.Client - --------
2014-7-22 18:43:18.168 INFO   jp.co.opst.kinesis.sample.Client - key:l3GQX1KOV4 ,record:KEY_1406022197945:l3GQX1KOV4

アプリケーション側ログ

2014-7-22 18:43:19.746 INFO   jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197044:gXThptVxac
2014-7-22 18:43:20.679 INFO   jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022196374:wuTdy5QSnG
2014-7-22 18:43:21.744 INFO   jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197268:2u3XvnmZxJ
2014-7-22 18:43:21.745 INFO   jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022198169:HypveQeovT
2014-7-22 18:43:21.745 INFO   jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197945:l3GQX1KOV4
2014-7-22 18:43:21.745 INFO   jp.co.opst.kinesis.sample.SampleRecordProcessor - ShardId : shardId-000000000000 ,Data : KEY_1406022197497:IQgEmGiFfi

続く。。。

18
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?