Help us understand the problem. What is going on with this article?

Java経験ゼロからのKinesis Data Streams(3.2)

More than 1 year has passed since last update.

チュートリアル: Kinesis Data Streams を使用した株式データのリアルタイム分析について引き続き。

学習用のlearning-module-1ブランチには、

// TODO: Implement method

と歯抜けになっている部分があります。(現在はビルドできないようですが)このブランチを元に、,masterブランチと比較しつつ学習を進めていきます。

プロデューサーの実装

ステップ 4: プロデューサーを実装するから始めます。AWS SDK for Java を使ってレコードの書き込みを実施するパート(KPLの利用はない模様)。
チュートリアルに書かれている通り、手順としては

  1. ストリーム名、リージョン名を入力
  2. ClientBuilder を作成
  3. リージョン、認証情報、およびクライアント構成を設定
  4. ClientBuilderよりKinesisクライアントを構成
  5. ストリームの状態を確認
  6. 100 ミリ秒ごとにランダムな取引をストリームに送信

が行われます。歯抜けになっているのは

のみです。

writer.StockTradesWriter.SendStockTrade()

/**
 * Kinesisクライアントを用いて、与えられたストリームに株式取引情報を送信
 *
 * @param trade 株式取引を表すインスタンス
 * @param kinesisClient Kinesisクライアント
 * @param streamName ストリーム名
 */
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
            String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // Jackson(JSONライブラリ)がうまく動かなかった際bytesがnullになる可能性に対処
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }

    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // ティッカーシンボルをパーティションキーに使用
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
  • PutRecord APIではバイト配列で書き込みを行うので、株式取引情報を表すオブジェクトをtoJsonAsBytes()を用いてバイト配列へ変換(内部でjackson.databind.ObjectMapperwriteValueAsBytes()を利用)
  • パーティションキーとしてティッカーシンボルを利用。チュートリアル曰く、シャード1つに対しに数百個または数千個のパーティションキーが目安の模様(どのように格納先シャードが決定されるかは以前の記事参照)

  • 1シャードあたりの書き込み制限やAPIコール数制限、またNW接続エラーなど書き込み失敗に対処(今回はtry...catchで挟むだけ)

    • 今回はPutRecord APIを利用したが、大量のレコードが生成される場合はPutRecords APIを利用して複数のレコードをまとめて送信することを検討(詳細はストリームへのデータの追加を参照)

コンシューマーの実装

次にステップ 5: コンシューマーを実装するへと進みます。歯抜けになっているのは

の3つですが、理解のためこれらを呼び出すprocessor.StockTradeProcessorクラスから追っていきます。

processor.StockTradeProcessor

main()メソッドの重要な箇所を抜粋してみます

public static void main(String[] args) throws Exception {

    ...

    // KCLの設定
    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
        .withRegionName(region.getName())
        .withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());

    // IRecordProcessorインターフェイスのクラスファクトリ
    IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();

    // workerの作成
    Worker worker = new Worker(recordProcessorFactory, kclConfig);

    int exitCode = 0;
    try {
        // worker走行
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);

}

KCLの実行には大きく分けて分けて

  1. KinesisClientLibConfigurationを通し設定
  2. IRecordProcessorインターフェイスを実装したクラスを返す関数(ファクトリ)の作成
  3. 1,2 を引数に与えたworkerの作成・走行

の3ステップがあります。
このうち2番、RecordProcessorインターフェイスをもつクラスと、そのファクトリ(今回はStockTradeRecordProcessorStockTradeRecordProcessorFactory)が、ユーザーが実装する対象になります。

processor.StockTradeRecordProcessor

空欄になっていたprocessor.StockTradeRecordProcessor.reportStats()processor.StockTradeRecordProcessor.resetStats()processor.StockTradeRecordProcessor.processRecord() があるのがこのクラスです。

StockTradeRecordProcessor.java
public class StockTradeRecordProcessor implements IRecordProcessor {

    ...

    /**
     * インスタンスの処理対象のシャードIDを受け取る、初期化のメソッド
     */
    @Override
    public void initialize(String shardId) {
        LOG.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
        nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }

    /**
     * 取得したレコードを処理するメソッド
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            // レコードを処理
            processRecord(record);
        }

        // インターバルに設定した期間が経過したらレポート
        if (System.currentTimeMillis() > nextReportingTimeInMillis) {
            reportStats();
            resetStats();
            nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        }

        // checkpointインターバルのたびcheckpoint
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
    private void reportStats() {
        // TODO: Implement method
        System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                stockStats + "\n" +
                "****************************************************************\n");
    }
    private void resetStats() {
        // TODO: Implement method
        stockStats = new StockStats();
    }
    private void processRecord(Record record) {
        // TODO: Implement method
        // レコードをバイト配列からオブジェクトへ
        StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
        if (trade == null) {
            LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
            return;
        }
        stockStats.addStockTrade(trade);
    }

    /**
     * 処理が終了、もしくは反応がないときに呼び出されるメソッド
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);
        // シャードの終端に到達したらcheckpointすることで、子シャードからのデータの処理が開始できる
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            // processorインスタンスがshutdownされた時はcheckpointは無視(フェールオーバー)
            LOG.info("Caught shutdown exception, skipping checkpoint.", se);
        } catch (ThrottlingException e) {
            // スロットリングにあった場合はcheckpointを行わない. バックオフとリトライを実運用の際は考慮すること.
            LOG.error("Caught throttling exception, skipping checkpoint.", e);
        } catch (InvalidStateException e) {
            // DynamoDBに問題があった場合のエラー
            LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
        }
    }

}

このクラスはIRecordProcessorのインターフェースを満たすような実装となっています。
IRecordProcessorには、

public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) 

の3つが実装すべきメソッドです(※このチュートリアルで使用されているインターフェースはバージョン1で、現在はより高機能なバージョン2があります)。

このうちprocessRecords()の内部で利用するメソッドがチュートリアルの実装対象だった、というわけでした。
このような単純なアプリケーションでも、KCLを使うことでprocessRecords()の中をいじるだけで簡単に実装ができるとのことです。また

新しいレコードが使用可能になると、KCL がレコードを取得してレコードプロセッサを呼び出すため、Kinesis Data Streams からレコードを取得する方法を心配しなくて済みます。また、シャード数やコンシューマーインスタンス数についても心配しなくて済みます。ストリームがスケールアップされても、複数のシャードやコンシューマーインスタンスを処理するためにアプリケーションを書き直す必要はありません。

と言ったメリットがあるとのこと。

補足

受け取ったList<Record>の取り扱い

このチュートリアルでは以下のように扱っています

for (Record record : records) {
    processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());

Recordクラス自体についてはAWS SDKのドキュメントに記載あり。
getData()を呼ぶとByteBufferクラスが返って来るので、array()でバイト配列へと出力し、それをfromJsonAsBytesでJacksonのオブジェクトへと変換し取り扱いができるようにしている、と言った感じです。(内部でJacksonのreadValue()を読んでいます。)

checkpointについて

ストリームにおける特定のポイントを示しているものとなります。(DynamoDBへの書き込まれるデータの詳細がわかりやすかったです)
processRecords()が呼ばれるたびにworkerから渡されるcheckpointerが、現在そのworkerが処理している位置を示しています(コード多分この辺)。このチュートリアルでは規定秒数ごとに処理位置を報告、という形を取っています。本格利用の際は異なった戦略を取ることもありそうです。


これにて3つあったチュートリアルは終了、AWS SDK for JavaでのPutRecord APIと取り扱いや、KCLの簡単な使い方を学べた形となります。
概要は理解できた一方で、詳細な使い方はチュートリアルだけでは学べず、ドキュメントやソースコード自体を当たる必要がある印象を受けました。

このチュートリアル自体も書かれてから大分時間が経っているようで、利用しているインターフェースやバージョンも古いことが見て取れます。なので次はこのチュートリアルにそって各所アップデートかけてみようかなと思います。

leo-mon
Cloud Architect / Developer
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away