3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Java経験ゼロからのKinesis Data Streams(3.1)

Posted at

チュートリアル: Kinesis Data Streams を使用した株式データのリアルタイム分析をやっていきます。

KCL(Kinesis Consumer Library)の利用に慣れるのが目的です。本格的にJavaを触ることになるので下準備パートも入念に・・・と思ってたら下準備だけで終わってしまうパートです

(下準備が大変なのがJavaの特徴かよ・・・と思ってしまう)

下準備

実行環境はmacを想定しています。

エディタのインストール

InteliJ IDEAのCommunity Editionをインストール(PyCharmに慣れていたので)。
ダウンロードしたものダブルクリック、開いたウィンドウでアプリケーションフォルダへドラッグ&ドロップ、

  • デフォルトのままいじらなければMavenプラグインも導入される
  • Featured Pluginはお好みで。筆者は全部入り。

Java, Mavenのインストール

予めHomebrewをインストールしておくこと

brew cask install java  # パスワード入力求められるので入力すること

# 確認
java -version
> openjdk version "11.0.2" 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

Java のバージョンを確認しておく(現在は11の模様)

PATHの設定

bash_profileに下記記載

~/.bash_profile
export JAVA_HOME=`/usr/libexec/java_home -v "11"`
PATH=${JAVA_HOME}/bin:${PATH}

記載後 source ~/.bash_profile を実行。
-vの値をずらせばJavaのバージョンスイッチができる模様)

mavenのインストール

brew install maven

# 確認
mvn -version
> Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00)
> Maven home: /usr/local/Cellar/maven/3.6.0/libexec
> Java version: 11.0.2, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/openjdk-11.0.2.jdk/Contents/Home
> Default locale: en_JP, platform encoding: UTF-8
> OS name: "mac os x", version: "10.14.2", arch: "x86_64", family: "mac"

チュートリアルのソースコード

Learning Amazon Kinesis Developmentにコードがあります・・・が、正直あまりアップデートされていません。

  • master: 全コード記載された正解
  • learning-module-1: チュートリアル見ながら埋める学習用
    と言ったブランチ構成のようですが、API更新に伴うアップデートなどはmasterブランチのみに反映されている模様。
    そのためmasterブランチを元に進めます。
git clone https://github.com/aws-samples/amazon-kinesis-learning.git

この後移動するので適当なディレクトリへ。

プロジェクトの作成

初期設定

IntelliJを起動

  • [Create new project] を選択
  • **[Maven]**タブを選択 > [Create from archetype] にチェック > **[org.apache.maven.archetypes:maven-archetype-quickstart]**を選択 > [Next]
  • [group-id]: 本来はcom.github.{id}のように世界で一意に取れそうなもの、なのだがコード修正簡略化のため com.amazonaws.services.kinesis.samplesとしておく。
    [artifact-id]: プロジェクト名 (e.g. kinesis-stocktrade-tutorial) > [Next]
  • [maven home directory]: brewで入れたもの > [Next] > [Finish] (/usr/local/Cellar/maven/3.6.0/libexec)

すると自動で初期化スクリプトが走行します。
完了するとpomファイル含めた下記のようなディレクトリが出来上がるはずです。

image.png

また右下に**[Maven projects need to be imported]**というポップアップが上がっているかと思うので、Enable Auto Importを選択しておきます。

pom.xmlの編集

mavenではpom.xmlにより必要なライブラリやビルドの手順を記述します。
ライブラリについては

<dependencies>
  <dependency>
    ライブラリ
  </dependency>
  <dependency>
    ライブラリ
  </dependency>
     ...
</dependencies>

を追加していく形です。(Auto ImportをOnにしておけばIntelliJがよしなにDLしてくれる模様)

このチュートリアルにライブラリに必要なライブラリは

  • Amazon Kinesis Client Library (KCL)
    • AWS SDK(正確にはこのうちKinesisk DynamoDB, CloudWatchが必要)
    • Guava (Java 用の Google コアライブラリ)
    • Protocol Buffers
    • Apache Commons Lang
    • Apache Commons Logging
  • Apache HttpCore
  • Apache HttpClient
  • Jackson Databind
    • Jackson Annotations
    • Jackson Core
  • Jackson Dataformat: CBOR
  • Joda Time

(ドキュメントに記載されていないものも追加、一段下げてあるのは依存関係にあるもの)
これらを https://mvnrepository.com から検索、出てきたxmlを貼り付けて行きます。
KCLは最新がv2なのですが、このチュートリアルがv1止まりのようなのでv1.9.3を選択。
Apache Maven での SDK の使用

ひたすらめんどい。(Javaにはpipやyarnみたいなものはないのかな・・・)

ソースコード配置と修正

cloneしたコードの中からsrc/main/.../stocktrades以下を、IntelliJによって作られたApp.javaと同じディレクトリへとコピーします。
groupIdをcom.amazonaws.services.kinesis.samplesとしていれば、同じ構造になっているはずなので、構造を保つように置いてやればOKです。

ここでメニューバーから [Build] > [Build Project] とするとStockTradeRecordProcesser:28にエラーが出るかと思います。
IFの変更が行われたことに起因しているので、

StockTradeRecordProcesser.java
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;

とエラーが出ている行を修正します。
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker has been deprecated のようなWarning が出続けるかとは思いますが、これはKCLがv2へとアップグレードする際に変更が必要なもの、という感じです

また、groupIdを他のものにしてしまった場合も、Buildを繰り返して発生するエラーを置換していけば大丈夫。IntelliJだと赤線引かれているところをクリック>赤い豆電球が出るのでクリック=>Set package name to ...を押せば自動で修正されるハズ、です。

実行確認

~/.aws以下にクレデンシャルを配置した上で

書き込み側

ダミーの株取引データを指定したKinesisへと書き込む関数を確認します。
stocktrades/writer/StockTradesWriter.javaをIntelliJ上で選択した上で、メニューバー [Run] > [Run ...] を選択、上がってきたポップアップでStockTradesWriterを選択します。
すると

Usage: StockTradesWriter <stream name> <region>

と出てきます。引数に何も渡していないのが原因なので、改めて
メニューバー [Run] > [Edit Configurations] > [StockTradesWriter] > [Program Arguments]{作成済みのKinesis名} {リージョン}と入力後 [OK]

image.png

再度StockTradesWriterを**[Run]**し、

INFO: Putting trade: ID 1: BUY 2383 shares of DIS for $114.35
Feb 11, 2019 9:45:47 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 2: SELL 1022 shares of JNJ for $93.94
Feb 11, 2019 9:45:47 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 3: BUY 7069 shares of WMT for $97.06
Feb 11, 2019 9:45:48 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 4: BUY 5939 shares of GOOG for $437.30

のようなログが流れていけば成功です。

読み取り側

Kinesisからレコードを読み取り、取引情報の集約をする関数の確認をします。

stocktrades/processor/StockTradesProcessor.javaについて、読み取り側と同様に **[Run]**すると

Usage: StockTradesProcessor <application name> <stream name> <region>

と出ます。
streamnameregionは書き込み側と同じ値を設定すればいいのですが、application nameで、これはKCLが状態の管理に使うDynamoDBのテーブル名 となるので一応注意です。無ければ勝手に作成してくれるようなので、streamnameと同じ名前で良いかと思います。
書き込み側と同様に再度設定して**[Run]** すると

INFO: Initializing record processor for shard: shardId-000000000057

のような表示が出てくればひとまず成功です。

ビルドの設定追加

WriterProcessorを独立して動かすために、別々にコンパイルしそれぞれ独立して動かせるようにします。下記のようにpom.xmlに追記します。

pom.xml
<project>
  ...
  <build>
    <pluginManagement><!-- バージョンはこちらで管理 -->
        ...
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.1.1</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <!-- 実行クラス類をまとめたパッケージ(jar)を作成 -->
        <!-- assembly-plugin は依存関係のライブラリ含めた単一での実行可能なjarを作成 -->
        <artifactId>maven-assembly-plugin</artifactId>
        ...
        <executions>
          <execution>
            <id>build-writer</id>
            <phase>package</phase><!-- mvn package を打った際同時に生成 -->
            <goals>
              <goal>single</goal><!-- assembly plugin の基本のgoal -->
            </goals>
            <configuration>
              <archive>
                <manifest>
                  <!-- このjarのmain()とするファイルを指定 -->
                  <mainClass>com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter</mainClass>
                </manifest>
              </archive>
              <descriptorRefs>
                <!-- プロジェクトと外部依存ライブラリを1つのjarにまとめるオプション -->
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <finalName>StockTradesWriter</finalName>
            </configuration>
          </execution>
          <execution>
            <id>build-processor</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
            <configuration>
              <archive>
                <manifest>
                  <mainClass>com.amazonaws.services.kinesis.samples.stocktrades.processor.StockTradesProcessor</mainClass>
                </manifest>
              </archive>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <finalName>StockTradesProcessor</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

追記したのち、IntelliJ右の**[Maven]**タブより [プロジェクト名] > [Lifecycle] > [package] を選択、▶️マークをクリックするとtarget直下に

  • StockTradesWriter-jar-with-dependencies.jar
  • StockTradesProcessor-jar-with-dependencies.jar

が生成されます。

後は先ほどのIntelliJでの動作確認と同様に

java -jar StockTradesWriter.jar-jar-with-dependencies.jar {streamname} {region}
# 別タブで
java -jar StockTradesProcessor.jar-jar-with-dependencies.jar {appname} {streamname} {region}

と、ターミナルで打つと動作確認ができるかと思います。


下準備はここまで、次は本来ならば穴埋めになるはずだった各クラスやメソッドについて学習しようと思います。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?