0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kinesis Data Analytics for Flink Applicationsを試してみた

Posted at

背景・目的

以前、こちらでKinesis Data Analytics for SQL Applicationを試してみましたが、今回はJavaのチュートリアルを試してみます。

概要

こちらをもとに理解します。
なお、Kinesis Data Analitics(KDA)自体の仕様については、こちらの記事で整理したので省略します。

Architecture

こちらのWhat is Apache Flink? のドキュメントを元に整理します。

  • Apache Flink は、無制限および制限付きのデータ ストリームに対するステートフルな計算のためのフレームワークおよび分散処理エンジン。
  • Flink は、すべての一般的なクラスター環境で実行し、メモリ内速度であらゆる規模で計算を実行するように設計されている。

Flinkの特徴を次に記載します。

Process Unbounded and Bounded Data

あらゆる種類のデータは、イベントのストリームとして生成される。以下のようなデータはすべてストリームとして生成される。

  • クレジット カード トランザクション
  • センサー測定値
  • マシン ログ
  • Web サイトやモバイル アプリケーションでのユーザー インタラクションなど、

データは、制限のないストリームまたは制限のあるストリームとして処理できる。

image.png
※出典:https://flink.apache.org/flink-architecture.html

Flinkは、制限のないデータ セットと制限のあるデータ セットの処理に優れており、時間と状態の正確な制御により、Flink のランタイムは無制限のストリームであらゆる種類のアプリケーションを実行可能。 バインドされたストリームは、固定サイズのデータ セット用に特別に設計されたアルゴリズムとデータ構造によって内部的に処理されるため、優れたパフォーマンスが得られる。

無制限のストリームについて

  • 開始点があるが、終了点は定義されない。これらは生成されたときにデータを終了したり提供したりしない。
  • 制限のないストリームは、継続的に処理する必要がある。イベントは、取り込まれた後すぐに処理する必要がある。
  • 入力は無制限であり、どの時点でも完了しないため、すべての入力データが到着するのを待つことはできない。
  • 制限のないデータを処理するには、多くの場合、結果の完全性を判断できるようにするために、イベントが発生した順序などの特定の順序でイベントを取り込む必要がある。

制限付きストリームについて

  • 開始と終了が定義されている。バインドされたストリームは、計算を実行する前にすべてのデータを取り込むことで処理できる。
  • バインドされたデータ セットは常に並べ替えることができるため、バインドされたストリームを処理するために順序付けられた取り込みは必要はない。
  • バインドされたストリームの処理は、バッチ処理とも呼ばれる。

Deploy Applications Anywhere

  • Flink は分散システムであり、アプリケーションを実行するにはコンピューティング リソースが必要。
  • Flink は、Hadoop YARN や Kubernetes などのすべての一般的なクラスター リソース マネージャーと統合されるが、スタンドアロン クラスターとして実行するようにセットアップすることも可能。
  • Flink は、上記のリソース マネージャーのそれぞれが適切に機能するように設計されており、 これは、Flink が各リソース マネージャーと慣用的な方法で対話できるようにするリソース マネージャー固有の展開モードによって実現される。
  • アプリケーションをデプロイするとき、Flink はアプリケーションの構成された並列処理に基づいて必要なリソースを自動的に識別し、リソース マネージャーからそれらを要求し、障害が発生した場合、Flink は新しいリソースをリクエストして、障害が発生したコンテナーを置き換える。
  • アプリケーションを送信または制御するためのすべての通信は、REST 呼び出しを介して行われ、これにより、多くの環境での Flink の統合が容易になる。

Run Applications at any Scale

  • Flink は、あらゆる規模でステートフル ストリーミング アプリケーションを実行するように設計されている。 アプリケーションは、おそらく数千のタスクに並列化され、クラスター内で分散され、同時に実行される。 したがって、アプリケーションは事実上無制限の量の CPU、メイン メモリ、ディスク、およびネットワーク IO を活用でき、さらに、Flink は非常に大きなアプリケーションの状態を簡単に維持する。
  • その非同期およびインクリメンタル チェックポイント アルゴリズムは、1 回限りの状態の一貫性を保証しながら、処理の待機時間への影響を最小限に抑える。

ユーザーは、本番環境で実行されている Flink アプリケーションのスケーラビリティの例は以下の通り。

  • 1 日あたり数兆のイベントを処理するアプリケーション
  • 数テラバイトの状態を維持するアプリケーション、および数千のコアで実行されるアプリケーション。

Leverage In-Memory Performance

  • ステートフルな Flink アプリケーションは、ローカル状態アクセス用に最適化されている。 タスクの状態は、常にメモリ内に維持され、状態のサイズが使用可能なメモリを超える場合は、アクセス効率の高いディスク上のデータ構造内に維持される。 したがって、タスクはローカルの、多くの場合メモリ内の状態にアクセスすることですべての計算を実行し、処理のレイテンシを非常に低く抑える。
  • Flink は、ローカル状態を耐久性のあるストレージに定期的かつ非同期的にチェックポイントすることにより、障害が発生した場合に 1 回限りの状態の一貫性を保証する。

image.png
※出典 https://flink.apache.org/flink-architecture.html

Application

  • Flink は、無制限および制限付きのデータ ストリームに対するステートフルな計算のためのフレームワーク。
  • Flink は、さまざまなレベルの抽象化で複数の API を提供し、一般的なユース ケース用の専用ライブラリを提供する。

以降では、Flink の使いやすく表現力豊かな API とライブラリを記載する。

Building Blocks for Streaming Applications

ストリーム処理フレームワークで構築および実行できるアプリケーションのタイプは、フレームワークがストリーム、状態、および時間をどの程度制御するかによって定義されている。以下では、ストリーム処理アプリケーションのこれらのビルディング ブロックについて説明し、それらを処理するための Flink のアプローチについて説明する。

ストリーム

  • ストリームには、ストリームの処理方法および処理方法に影響を与えるさまざまな特性がある。 Flink は、あらゆる種類のストリームを処理できる汎用性の高い処理フレームワークである。
制限付きストリームと制限なしストリーム
  • ストリームは、制限なしまたは制限付き (つまり、固定サイズのデータ​​ セット) にすることが可能。
  • Flink には、制限のないストリームを処理する洗練された機能があるが、制限のあるストリームを効率的に処理する専用のオペレーターもある。
リアルタイムおよび記録されたストリーム

すべてのデータはストリームとして生成され、データを処理するには 2 つの方法がある。

  • 生成時にリアルタイムで処理
  • ファイル システムやオブジェクト ストアなどのストレージ システムにストリームを保持し、後で処理

State

すべての非自明なストリーミング アプリケーションはステートフルである。つまり、個々のイベントに変換を適用するアプリケーションのみが状態を必要としない。基本的なビジネス ロジックを実行するすべてのアプリケーションは、イベントまたは中間結果を記憶して、後でアクセスできるようにする必要がある。

Multiple State Primitives

Flink は、原子値、リスト、またはマップなど、さまざまなデータ構造の状態プリミティブを提供する。開発者は、関数のアクセス パターンに基づいて最も効率的な状態プリミティブを選択できる。

Pluggable State Backends

アプリケーションの状態は、プラグ可能な状態のバックエンドによって管理され、チェックポイントが作成される。 Flink は、状態をメモリまたは RocksDB (効率的な組み込みオンディスク データ ストア) に保存するさまざまな状態バックエンドを備えている。カスタム状態のバックエンドもプラグイン可能。

Exactly-once state consistency

Flink のチェックポイントと回復アルゴリズムは、障害が発生した場合にアプリケーションの状態の一貫性を保証する。したがって、障害は透過的に処理され、アプリケーションの正確性には影響しない。

Very Large State

Flink は、非同期およびインクリメンタル チェックポイント アルゴリズムにより、サイズが数テラバイトのアプリケーション状態を維持できる。

Scalable Applications

Flink は、状態を多かれ少なかれワーカーに再分配することで、ステートフル アプリケーションのスケーリングをサポートする。

Time

Timeは、ストリーミング アプリケーションのもう 1 つの重要な要素。各イベントは特定の時点で生成されるため、ほとんどのイベント ストリームには固有の時間セマンティクスがある。
さらに、ウィンドウ集約、セッション化、パターン検出、時間ベースの結合など、多くの一般的なストリーム計算はTimeに基づく。
ストリーム処理の重要な側面は、アプリケーションが時間を測定する方法、つまり、イベント時間と処理時間の差とのこと。

Flink は時間関連の機能の豊富なセットを提供する。

Event-time Mode
  • イベント時間セマンティクスでストリームを処理するアプリケーションは、イベントのタイムスタンプに基づいて結果を計算する。
  • イベントタイム処理により、記録されたイベントが処理されるかリアルタイムのイベントが処理されるかに関係なく、正確で一貫した結果が得られる。
Watermark Support
  • Watermarkを使用して、イベント時間アプリケーションの時間を推論する。
  • Watermarkは、遅延と結果の完全性をトレードオフする柔軟なメカニズムでもある。
Late Data Handling
  • Watermarkを使用してイベント時間モードでストリームを処理する場合、関連するすべてのイベントが到着する前に計算が完了することがある。このようなイベントはlate eventと呼ばれる
  • Flink は、副出力を介して再ルーティングしたり、以前に完了した結果を更新したりするなど、遅延イベントを処理するための複数のオプションを備えている。
Processing-time Mode
  • イベント時間モードに加えて、Flink は、処理マシンの実時間によってトリガーされる計算を実行する処理時間セマンティクスもサポートする。
  • 処理時間モードは、おおよその結果を許容できる厳密な低遅延要件を持つ特定のアプリケーションに適している。

Layered APIs

Flink は 3 つの階層化された APIを提供する。

image.png
※出典:https://flink.apache.org/flink-applications.html

  • The ProcessFunctions
  • The DataStream API
  • SQL & Table API

Operations

ストリーム プロセッサは、実行中にアプリケーションを監視および維持するためのツールだけでなく、優れた障害回復も提供する必要がある。
Flink は、ストリーム処理の運用面に重点を置いている。

Run Your Applications Non-Stop 24/7

Flink は、アプリケーションが実行し続け、一貫性を維持することを保証するいくつかの機能を提供する。

Consistent Checkpoints

  • 障害が発生した場合、アプリケーションが再起動され、その状態が最新のチェックポイントから読み込まれる。
  • リセット可能なストリーム ソースと組み合わせることで、この機能は 1 回限りの状態の一貫性を保証できる。

Efficient Checkpoints

  • アプリケーションが数TBの状態を維持している場合、アプリケーションの状態のチェックポイントは非常に高くつく可能性がある。
  • Flink は、アプリケーションのレイテンシ SLA に対するチェックポイントの影響を非常に小さく保つために、非同期およびインクリメンタル チェックポイントを実行できる。

End-to-End Exactly-Once

  • Flink は、障害が発生した場合でも、データが 1 回だけ書き出されることを保証する特定のストレージ システム用のトランザクション Sinkを備えています。

Integration with Cluster Managers

  • Flink は、すべての単一障害点を排除する高可用性モードを備えている。
  • HA モードは Apache ZooKeeper に基づいている。

Update, Migrate, Suspend, and Resume Your Applications

Flink のSave Pointは、ステートフル アプリケーションの更新の問題やその他の多くの関連する課題を解決する、ユニークで強力な機能。
Save Pointは、アプリケーションの状態の一貫したスナップショットであるため、チェックポイントに非常に似ている。
ただし、チェックポイントとは対照的に、Save Pointは手動でトリガーする必要があり、アプリケーションが停止しても自動的に削除されない。 Save Pontを使用して、状態互換アプリケーションを開始し、その状態を初期化できる。

Monitor and Control Your Applications

Flink は、多くの一般的なロギングおよびモニタリング サービスとうまく統合され、アプリケーションを制御して情報をクエリするための REST API を提供する。

実践

こちらを参考に試してみます。

2 つの Amazon Kinesis Data Streams(KDS)を作成

KDAのインプットと、アウトプット用の2つのKDSを作成します。

  1. 以下の2つのKDSを作成する。
  • ExampleInputStream
  • ExampleOutputStream
    image.png

S3バケットを作成

データ出力用のバケット

後述で作成するKDFのアウトプット用のS3を用意します。

  1. 新しくS3バケットを作成する。
    image.png

KDAのjar配置用のバケット

後述するKDAのコードをコンパイルした結果を配置するS3を用意します。

  1. バケットを作成します。
    image.png

Kinesis Data Firehose(KDF)を作成

上記で作成したKDSをインプットに、S3をアウトプットに設定します。

  1. ソースにKDSを指定する。
    image.png

  2. 配信ストリーム名を「kda-test-firehose」とします。
    image.png

  3. 送信先に作成したS3を指定します。
    image.png

  4. バッファサイズを「1MiB」、バッファ間隔を「60秒」に設定し、作成する。
    即確認できるようにサイズを小さくしておきます。
    image.png

入力ストリームへのサンプルレコードを書き込み

こちらにあるように、以下のコードをEC2に用意します。

$ pwd
/home/ssm-user
$ ls -l
total 4
-rwxr--r-- 1 ssm-user ssm-user 642 Oct 15 12:25 stock.py
$ 
サンプルコード
import datetime
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        'EVENT_TIME': datetime.datetime.now().isoformat(),
        'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'PRICE': round(random.random() * 100, 2)}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis'))

Apache Flink Streaming Java Code のダウンロードと検証

事前準備

gitコマンドのインストール

  1. EC2にgitコマンドをインストールします。
$ sudo yum install git
インストール時のログ
$ sudo yum install git
Loaded plugins: extras_suggestions, langpacks, priorities, update-motd
amzn2-core                                                                                                                                                                                                            | 3.7 kB  00:00:00
amzn2extra-docker                                                                                                                                                                                                     | 3.0 kB  00:00:00
amzn2extra-postgresql13                                                                                                                                                                                               | 3.0 kB  00:00:00
Resolving Dependencies
--> Running transaction check
---> Package git.x86_64 0:2.37.1-1.amzn2.0.1 will be installed
--> Processing Dependency: perl-Git = 2.37.1-1.amzn2.0.1 for package: git-2.37.1-1.amzn2.0.1.x86_64
--> Processing Dependency: git-core-doc = 2.37.1-1.amzn2.0.1 for package: git-2.37.1-1.amzn2.0.1.x86_64
--> Processing Dependency: git-core = 2.37.1-1.amzn2.0.1 for package: git-2.37.1-1.amzn2.0.1.x86_64
--> Processing Dependency: perl(Term::ReadKey) for package: git-2.37.1-1.amzn2.0.1.x86_64
--> Processing Dependency: perl(Git::I18N) for package: git-2.37.1-1.amzn2.0.1.x86_64
--> Processing Dependency: perl(Git) for package: git-2.37.1-1.amzn2.0.1.x86_64
--> Running transaction check
---> Package git-core.x86_64 0:2.37.1-1.amzn2.0.1 will be installed
---> Package git-core-doc.noarch 0:2.37.1-1.amzn2.0.1 will be installed
---> Package perl-Git.noarch 0:2.37.1-1.amzn2.0.1 will be installed
--> Processing Dependency: perl(Error) for package: perl-Git-2.37.1-1.amzn2.0.1.noarch
---> Package perl-TermReadKey.x86_64 0:2.30-20.amzn2.0.2 will be installed
--> Running transaction check
---> Package perl-Error.noarch 1:0.17020-2.amzn2 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

=============================================================================================================================================================================================================================================
 Package                                                      Arch                                               Version                                                        Repository                                              Size
=============================================================================================================================================================================================================================================
Installing:
 git                                                          x86_64                                             2.37.1-1.amzn2.0.1                                             amzn2-core                                              71 k
Installing for dependencies:
 git-core                                                     x86_64                                             2.37.1-1.amzn2.0.1                                             amzn2-core                                             6.4 M
 git-core-doc                                                 noarch                                             2.37.1-1.amzn2.0.1                                             amzn2-core                                             2.8 M
 perl-Error                                                   noarch                                             1:0.17020-2.amzn2                                              amzn2-core                                              32 k
 perl-Git                                                     noarch                                             2.37.1-1.amzn2.0.1                                             amzn2-core                                              46 k
 perl-TermReadKey                                             x86_64                                             2.30-20.amzn2.0.2                                              amzn2-core                                              31 k

Transaction Summary
=============================================================================================================================================================================================================================================
Install  1 Package (+5 Dependent packages)

Total download size: 9.3 M
Installed size: 40 M
Is this ok [y/d/N]: y
Downloading packages:
(1/6): git-2.37.1-1.amzn2.0.1.x86_64.rpm                                                                                                                                                                              |  71 kB  00:00:00
(2/6): git-core-doc-2.37.1-1.amzn2.0.1.noarch.rpm                                                                                                                                                                     | 2.8 MB  00:00:00
(3/6): perl-Error-0.17020-2.amzn2.noarch.rpm                                                                                                                                                                          |  32 kB  00:00:00
(4/6): git-core-2.37.1-1.amzn2.0.1.x86_64.rpm                                                                                                                                                                         | 6.4 MB  00:00:00
(5/6): perl-Git-2.37.1-1.amzn2.0.1.noarch.rpm                                                                                                                                                                         |  46 kB  00:00:00
(6/6): perl-TermReadKey-2.30-20.amzn2.0.2.x86_64.rpm                                                                                                                                                                  |  31 kB  00:00:00
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Total                                                                                                                                                                                                         23 MB/s | 9.3 MB  00:00:00
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : git-core-2.37.1-1.amzn2.0.1.x86_64                                                                                                                                                                                        1/6
  Installing : git-core-doc-2.37.1-1.amzn2.0.1.noarch                                                                                                                                                                                    2/6
  Installing : 1:perl-Error-0.17020-2.amzn2.noarch                                                                                                                                                                                       3/6
  Installing : perl-TermReadKey-2.30-20.amzn2.0.2.x86_64                                                                                                                                                                                 4/6
  Installing : perl-Git-2.37.1-1.amzn2.0.1.noarch                                                                                                                                                                                        5/6
  Installing : git-2.37.1-1.amzn2.0.1.x86_64                                                                                                                                                                                             6/6
  Verifying  : perl-TermReadKey-2.30-20.amzn2.0.2.x86_64                                                                                                                                                                                 1/6
  Verifying  : git-core-doc-2.37.1-1.amzn2.0.1.noarch                                                                                                                                                                                    2/6
  Verifying  : perl-Git-2.37.1-1.amzn2.0.1.noarch                                                                                                                                                                                        3/6
  Verifying  : git-2.37.1-1.amzn2.0.1.x86_64                                                                                                                                                                                             4/6
  Verifying  : git-core-2.37.1-1.amzn2.0.1.x86_64                                                                                                                                                                                        5/6
  Verifying  : 1:perl-Error-0.17020-2.amzn2.noarch                                                                                                                                                                                       6/6

Installed:
  git.x86_64 0:2.37.1-1.amzn2.0.1

Dependency Installed:
  git-core.x86_64 0:2.37.1-1.amzn2.0.1         git-core-doc.noarch 0:2.37.1-1.amzn2.0.1         perl-Error.noarch 1:0.17020-2.amzn2         perl-Git.noarch 0:2.37.1-1.amzn2.0.1         perl-TermReadKey.x86_64 0:2.30-20.amzn2.0.2

Complete!
$

Mavenのインストール

1.Mavenをインストールします。
Maven Projectから対象のファイルを確認します。apache-maven-3.8.6-bin.tar.gzが使用できそうです。

image.png

2.wgetで取得します。

sudo wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz

3.gzを展開します。

sudo tar -xzvf apache-maven-3.8.6-bin.tar.gz

4.シンボリックリンクを作成します。

sudo mv apache-maven-3.8.6 /opt/
cd /opt/
sudo ln -s /opt/apache-maven-3.8.6 apache-maven

5.PATHを追加する

$ cat .bash_profile 
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs
MVN_HOME=/opt/apache-maven
PATH=/usr/bin/python3.8:$MVN_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin

export PATH
source $HOME/.nvm/nvm.sh

[[ -s "$HOME/.rvm/scripts/rvm" ]] && source "$HOME/.rvm/scripts/rvm" # Load RVM into a shell session *as a function*
$ 

6.コマンドの確認。問題なさそうです。

$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /opt/apache-maven
Java version: 11.0.15, vendor: Amazon.com Inc., runtime: /usr/lib/jvm/java-11-amazon-corretto.x86_64
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.14.276-211.499.amzn2.x86_64", arch: "amd64", family: "unix"
$ 

コードをコンパイル

1.git cloneでコードをダウンロードします。

$ git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git

2.インプットとアウトプットのKDSを指定します。

$ ls -l
total 4
-rw-rw-r-- 1 ec2-user ec2-user 3765 Aug 22 15:21 BasicStreamingJob.java
$ 

$ grep Example *java
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";
$ 
BasicStreamingJob.java
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;


/**
 * A basic Kinesis Data Analytics for Java application with Kinesis data
 * streams as source and sink.
 */
public class BasicStreamingJob {
    private static final String region = "ap-northeast-1";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";


    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");


        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                applicationProperties.get("ConsumerConfigProperties")));
    }

    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        outputProperties.setProperty("AggregationEnabled", "false");

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                applicationProperties.get("ProducerConfigProperties"));

        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        System.out.println("main start.");

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * input.addSink(createSinkFromApplicationProperties())
         */
        input.addSink(createSinkFromStaticConfig());

        env.execute("Flink Streaming Java API Skeleton");
    }
}

3.コンパイルします。

cd /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted
mvn package -Dflink.version=1.13.2
コンパイル時のログ
$ pwd
/home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted
$ mvn package -Dflink.version=1.13.2
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------< com.amazonaws:aws-kinesis-analytics-java-apps >------------
[INFO] Building aws-kinesis-analytics-java-apps 1.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ aws-kinesis-analytics-java-apps ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.1:compile (default-compile) @ aws-kinesis-analytics-java-apps ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ aws-kinesis-analytics-java-apps ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.1:testCompile (default-testCompile) @ aws-kinesis-analytics-java-apps ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ aws-kinesis-analytics-java-apps ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ aws-kinesis-analytics-java-apps ---
[INFO] Building jar: /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/target/aws-kinesis-analytics-java-apps-1.1.jar
[INFO] 
[INFO] --- maven-shade-plugin:3.2.1:shade (default) @ aws-kinesis-analytics-java-apps ---
[INFO] Including com.amazonaws:aws-kinesisanalytics-runtime:jar:1.2.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kinesis_2.12:jar:1.13.2 in the shaded jar.
[INFO] Including joda-time:joda-time:jar:2.5 in the shaded jar.
[INFO] Including commons-io:commons-io:jar:2.8.0 in the shaded jar.
[INFO] Including commons-lang:commons-lang:jar:2.6 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Including commons-logging:commons-logging:jar:1.1.3 in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-databind:jar:2.12.1 in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-annotations:jar:2.12.1 in the shaded jar.
[INFO] Including com.google.guava:guava:jar:29.0-jre in the shaded jar.
[INFO] Including com.google.guava:failureaccess:jar:1.0.1 in the shaded jar.
[INFO] Including com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava in the shaded jar.
[INFO] Including org.checkerframework:checker-qual:jar:2.11.1 in the shaded jar.
[INFO] Including com.google.errorprone:error_prone_annotations:jar:2.3.4 in the shaded jar.
[INFO] Including com.google.j2objc:j2objc-annotations:jar:1.3 in the shaded jar.
[INFO] Including commons-codec:commons-codec:jar:1.13 in the shaded jar.
[INFO] Including com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.12.1 in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-core:jar:2.12.1 in the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.13.2 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Including com.amazonaws:aws-kinesisanalytics-flink:jar:2.0.0 in the shaded jar.
[INFO] Including com.amazonaws:aws-java-sdk-iam:jar:1.11.903 in the shaded jar.
[INFO] Including com.amazonaws:aws-java-sdk-kinesis:jar:1.11.903 in the shaded jar.
[INFO] Including com.amazonaws:aws-java-sdk-sts:jar:1.11.903 in the shaded jar.
[INFO] Including com.amazonaws:aws-java-sdk-logs:jar:1.11.903 in the shaded jar.
[INFO] Including com.amazonaws:aws-java-sdk-core:jar:1.11.903 in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpclient:jar:4.5.13 in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpcore:jar:4.4.13 in the shaded jar.
[INFO] Including software.amazon.ion:ion-java:jar:1.0.2 in the shaded jar.
[INFO] Including com.amazonaws:jmespath-java:jar:1.11.903 in the shaded jar.
[INFO] Including org.apache.logging.log4j:log4j-api:jar:2.6.1 in the shaded jar.
[INFO] Including org.apache.logging.log4j:log4j-core:jar:2.6.1 in the shaded jar.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[WARNING] Discovered module-info.class. Shading will break its strong encapsulation.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/target/aws-kinesis-analytics-java-apps-1.1.jar with /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/target/aws-kinesis-analytics-java-apps-1.1-shaded.jar
[INFO] Dependency-reduced POM written at: /home/ec2-user/amazon-kinesis-data-analytics-java-examples/GettingStarted/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  11.901 s
[INFO] Finished at: 2022-10-15T13:22:26Z
[INFO] ------------------------------------------------------------------------
$ 

4.jarができていることを確認します。

$ ls -l target/aws-kinesis-analytics-java-apps-1.1.jar 
-rw-rw-r-- 1 ec2-user ec2-user 55700481 Oct 15 13:22 target/aws-kinesis-analytics-java-apps-1.1.jar
$ 

5.Artifact用のバケットにコピー

$ aws s3 cp aws-kinesis-analytics-java-apps-1.1.jar s3://${Artifact用のバケット}/kda/
upload: ./aws-kinesis-analytics-java-apps-1.1.jar to s3://${Artifact用のバケット}/kda/aws-kinesis-analytics-java-apps-1.1.jar
$ 

KDAアプリケーションを作成して実行する

  1. KDAの画面で以下を入力して、「ストリーミングアプリケーションの作成」します。
  • アプリケーション名に「KinesisDataApplicationTest」を指定
    image.png
    image.png

2.IAMポリシーの編集をします。
image.png

3.インプットとアウトプットのKDSを追加します。
image.png

4.KDAのアプリケーションで「設定」をクリックします。
image.png

5.Artifactでは以下を指定します。

  • S3バケットに、Artifact用のバケットを指定
  • S3オブジェクトへのパスに、「kda/aws-kinesis-analytics-java-apps-1.1.jar」
    image.png

6.ログ記録とモニタリング情報に、以下を指定します。

  • ログレベルのモニタリング (Flink-Log4J)に「デバッグ」
  • CloudWatch でメトリクスレベルをモニタリング中に「アプリケーション」を指定
    image.png

7.ランタイムプロパティ情報に以下を入力し、最後に「変更を保存」をクリックします。

  • flink.inputstream.initposに「LATEST」
  • aws:regionに「ap-northeast-1」
  • AggregationEnabledに「false」
    image.png

8.アプリケーションを実行します。
image.png

9.実行中になりました。
image.png

テストデータの投入と確認

1.上記で作成したスクリプトを実行します。

$ python3 stock.py  > log &
[1] 15228
$ 

$ tail -f log
{'EVENT_TIME': '2022-10-15T14:49:43.384228', 'TICKER': 'AAPL', 'PRICE': 18.95}
{'EVENT_TIME': '2022-10-15T14:49:43.391109', 'TICKER': 'MSFT', 'PRICE': 81.42}
{'EVENT_TIME': '2022-10-15T14:49:43.399470', 'TICKER': 'AAPL', 'PRICE': 51.93}
{'EVENT_TIME': '2022-10-15T14:49:43.406530', 'TICKER': 'AMZN', 'PRICE': 91.56}
{'EVENT_TIME': '2022-10-15T14:49:43.415289', 'TICKER': 'AAPL', 'PRICE': 13.03}
{'EVENT_TIME': '2022-10-15T14:49:43.422044', 'TICKER': 'AAPL', 'PRICE': 83.61}


2.S3にファイルが届いていることを確認します。
image.png

3.S3 Selectで中身を確認します。想定した結果が確認できました。
image.png

考察

サンプルコードをKDSのインプット、アウトプットを変えた程度なので、Flinkの機能は確認できていません。
今後は、Flinkの機能を試してみたいと思います。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?