0
1

More than 1 year has passed since last update.

IoT リアルタイムデータを Kinesis Data Analytics for Apache Flink で集計してみた

Last updated at Posted at 2022-07-02

はじめに

Amazon Kinesis Data Analytics は、一言で表現するとストリーミングデータに対する分析・集計・データ加工を行うためのサービスです。構成図で表現すると、こんな感じで他サービスと組み合わせることができます。

image-20220627003313727.png

この構成図のポイントを整理します。

  • IoT デバイスから生まれる多くのストリームデータを、IoT Core が受け取る
  • IoT Core のルールを指定して、Kinesis Data Streams にストリームデータを流す
  • Kinesis Data Streams から、Kinesis Data Analytics にストリームデータを連携する
  • Kinesis Data Analytics は流れ込んでくるストリームデータに対する、データ分析・集計・加工が行える
    • 例えば 1 分おきの平均データを集計して、後続の Kinesis Data Streams に流すことが出来る
  • それ以降は、Lambda など自由に構成できるので、好きなようにデータをハンドリングする

一般的に、リアルタイムにデータが生成されるストリームデータを使って、一定期間ごとの集計を行うのは大変な作業です。データが少ないうちは問題なくても、データが増えたときのパフォーマンスを考慮すると、困難さが予想できます。そこで、Kinesis Data Analytics を使うと、AWS 側にインフラの管理負担を任せながら Java, Scala, Python, SQL を使って、データの集計が出来ます。

例えば次のような SQL クエリーを Kinesis Data Analytics に渡すことで、ストリームデータを 1 分ごとに集計してくれます。TUMBLE_STARTTUMBLE_END, TUMBLE の部分で、集計の期間を指定できます。下の SQL クエリーの例では、60 秒おきの平均データを集計しています。

%flink.ssql(type=update)

INSERT INTO people_data_destination
SELECT
	ROOM,
	TUMBLE_START(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_START,
	TUMBLE_END(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_END,
	AVG(PEOPLE) as AVG_PEOPLE_COUNT,
	COUNT(*) as RECORD_COUNT
FROM
	people_data
GROUP BY 
	ROOM,
	TUMBLE(EVENTTIME, INTERVAL '60' SECOND);

概要を説明したので、具体的な構成方法を見ていきましょう。

Kinesis Data Analytics は 2 種類ある

はじめに、Kinesis Data Analytics を触っていくうえで、混乱しがちな点を整理します。Kinesis Data Analytics は、大きくわけて 2 種類のアプリケーション構成方法があります。

基本的には、上側の「Amazon Kinesis Data Analytics for Apache Flink」の方を利用することがおすすめです。FAQ に次の一文が書かれています。

新しいプロジェクトでは、SQL アプリケーション用の Kinesis Data Analytics ではなく、新しい Kinesis Data Analytics Studio を使用することをお勧めします。Kinesis Data Analytics Studio は使いやすさと高度な分析機能を組み合わせており、洗練されたストリーム処理アプリケーションでも数分で構築できます。

Kinesis Data Analytics Studio では、SQL クエリーやコードを書きながら、リアルタイムに集計結果を確認できるので、効率的に作業できるメリットがあると感じています。Amazon Kinesis Data Analytics for Apache Flink を先に利用できるか検討してみることをおすすめします。

また、Flink 版と SQL 版は、AWS の Document や、利用する機能が異なるので、混乱しないように進めていきましょう。

前提条件

この記事で言及する範囲は、次の点線部分です。

image-20220627003313727.png

IoT Core 周りは、こちらの記事 で構築しています。Lambda のタンブリングウィンドウに関する記事ですが、作業内容としては同じなので参考にしてみてください。

Kinesis Data Streams の設定

Kinesis Data Analytics の前後にある、Kinesis Data Streams を 2 個作成します。

  • Source : Kinesis Data Analytics の入力元
  • Destination : Kinesis Data Analytics の出力先

Kinesis Data Streams のページに移動して、Create data stream を押します。

image-20220409215422895.png

今回は、テスト用途なので、Provisioned 1 で構成していきます。

image-20220409220059250.png

Create data stream を押します

image-20220409220116852.png

作成されました。

image-20220409220203940.png

同様に、Destination 用の Kinesis Data Stream を作成しました

image-20220409220358001.png

IoT Core Rule を設定

IoT Core の MQTT Topic に送付された JSON データを、Kinesis Data Streams に出力するために、IoT Core Rule を設定します。

image-20220626215247491.png

Rule の名前を適当に指定します

KinesisDataAnalyticsTest

image-20220626215732662.png

Rule の名前や条件を指定します。

次のように FROM に MQTT Topic 名を指定することで、Topic data/kinesis-test-device のデータを全て Kinesis Data Streams に流すことが出来ます。IoT 疑似デバイス側の Amazon Linux 2 から、1秒に1回 data/kinesis-test-device にデータが送付されます。

SELECT * FROM 'data/kinesis-test-device'

image-20220626215816929.png

次のパラメータを入力します。

  • IoT Core を経由して、次に送付する Kinesis Data Streams を指定
  • Partition Key は、なんでもよいので newuuid を指定
  • IAM Role を指定
${newuuid()}

image-20220626220132532.png

Create を押します

image-20220626220224200.png

Kinesis Data Analytics Studio を作成

Kinesis Data Analytics の アプリケーションを作成するために、一旦作業環境として Kinesis Data Analytics Studio Notebook を作成します。Create Studio notebook を押します。

image-20220626222326999.png

Custom with custom settings を選択して、Notebook の名前を指定します

image-20220626222418136.png

Glue の Database に、Kinesis Data Analytics 用のものを用意して、Create を押します

image-20220626222752057.png

Kinesis Data Analytics の入力元・出力元の Kinesis Data Stream を選択して、Next を押します。

image-20220626222954722.png

Studio としてのリソースや、ログに関する設定があります。これらはデフォルトでいきます。

image-20220626223150623.png

Studio で作成したアプリケーションコードをデプロイする先として、S3 バケットを選択します。Kinesis Data Analytics 用に新たに作成した方がわかりやすいと思います。

image-20220626223251640.png

Next を押します

image-20220626223335879.png

Create Studio notebook を押します。

image-20220626223352681.png

Studio が作成されました。作成直後は Ready 状態で動いていないので、Run を押します。

image-20220626223647380.png

Studio notebook を動かしたときには、1 時間あたりの費用が発生するメッセージが表示されています。起動しっぱなしにはしないように気を付けつつ、Run を押します。

image-20220409223454868.png

Running を待機する間、IAM Role の権限設定を行います。Glue Studio が Kinesis Data Streams に接続するために、IAM Role に権限の追加が必要です。IAM Role のリンクをクリックします。

image-20220626230857218.png

Attach policies を押します。

image-20220626230951228.png

今回は作業をシンプルにするために、AdministratorAccess を利用しますが、実際の本番環境などでは最低限の権限設定が良いです。

image-20220626231029897.png

2-3 分待機すると、Studio Notebook の Status が Running となるため、Open in Apache Zeppelin を選択します

image-20220626224117740.png

動作確認用の Note 作成

Zeppein の画面が開かれました。ここで、新たに Note を構成して、アプリケーションをデプロイする前のテーブル作成や動作確認などをしていきます。Create new note を選択します。

image-20220409223941848.png

好きな名前を付けて Create をします。

PreTableCreateNote

image-20220626224257789.png

新しい Note の画面が表示されました

image-20220626224329305.png

IoT Core から Kinesis Data Streams に送られてくる JSON データは次の形式です

{
  "ROOM": 7,
  "EVENTTIME": "2022-04-10T12:56:09",
  "PEOPLE": 32
}

上の JSON データに合うように、Studio 上で CREATE TABLE をしていきます。

%flink.ssql

CREATE TABLE people_data (
    ROOM INTEGER,
    EVENTTIME TIMESTAMP(3),
    PEOPLE DOUBLE,
    WATERMARK FOR EVENTTIME AS EVENTTIME - INTERVAL '5' SECOND
)
PARTITIONED BY (ROOM)
WITH (
    'connector' = 'kinesis',
    'stream' = 'kinesis-data-analytics-source',
    'aws.region' = 'ap-northeast-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

Note で上記の CREATE TABLE 文を実行すると、記事の環境では、約 40 秒後に、Table has been created と表示され、テーブル定義を作成できます。

image-20220626224642837.png

余談ですが、CREATE TABLE を行った結果、Glue Data Catalog にテーブル定義が作成されました。Athena や Redshift Spectrum と同じような仕組みですね。

image-20220626224743002.png

Studio 上で、作成した TABLE に SELECT できます。次のクエリーを入力して再生ボタンを押します。

  • (type=update) と指定することで、Kinesis Data Streams から流れてくるデータをリアルタイムに更新しつづけてくれます
%flink.ssql(type=update)

SELECT * FROM people_data;

image-20220410130856012.png

デフォルトでは、テーブル形式でデータが表示されます。1秒に1回あらたなデータが送られてきて、リアルタイムに更新され続けています

image-20220410131014593.png

Note 上で可視化の見栄えもカスタマイズができます。例えば、平均人数をパイチャートで表示できます。

image-20220410131152388.png

では次に、実際に 1 分おきに集計を行うクエリーを実行してみましょう。次の Document を参考にしてクエリーを組み立てました

%flink.ssql(type=update)

SELECT
	ROOM,
	TUMBLE_START(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_START,
	TUMBLE_END(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_END,
	AVG(PEOPLE) as AVG_PEOPLE_COUNT,
	COUNT(*) as RECORD_COUNT
FROM
	people_data
GROUP BY 
	ROOM,
	TUMBLE(EVENTTIME, INTERVAL '60' SECOND)

以下の実行結果のように、部屋ごとに、1分周期で平均人数が集計されていることがわかります。

image-20220410134718481.png

こんな感じに見た目の変更も可能です。

image-20220410134844353.png

この集計したデータを、出力先の Kinesis Data Streams に接続するために、Create Table を行います。

%flink.ssql

CREATE TABLE people_data_destination (
    ROOM INTEGER,
    WINDOW_START TIMESTAMP(3),
    WINDOW_END TIMESTAMP(3),
    AVG_PEOPLE_COUNT DOUBLE,
    RECORD_COUNT BIGINT
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'kinesis-data-analytics-destination',
    'aws.region' = 'ap-northeast-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

実行します

image-20220410140730371.png

さきほどと同様に、自動的に Glue の Table に登録されています

image-20220626230441408.png

この次の SQL クエリーが、実際に集計した結果を次の Kinesis Data Streams に流すものです。INSERT INTO に SELECT 句を付けています。SELECT 句には、TUMBLE に関する指定で 60 秒ごとに集計するように指定しています。

%flink.ssql(type=update)

INSERT INTO people_data_destination
SELECT
	ROOM,
	TUMBLE_START(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_START,
	TUMBLE_END(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_END,
	AVG(PEOPLE) as AVG_PEOPLE_COUNT,
	COUNT(*) as RECORD_COUNT
FROM
	people_data
GROUP BY 
	ROOM,
	TUMBLE(EVENTTIME, INTERVAL '60' SECOND);

image-20220410140931894.png

集計されているか、出力先の Kinesis Data Streams の中身を確認します。

%flink.ssql(type=update)

SELECT * FROM people_data_destination;

無事に 出力先 Kinesis Data Streams に連携されていることがわかります。素敵ですね。

image-20220410141605000.png

デプロイ用の Note 作成

ここまでの手順で、正常に集計用のクエリーが動作することがわかりました。Kinesis Data Analytics のアプリケーションをデプロイ用の、新しい Note を作成します。Create new note を選びます。

image-20220626231808619.png

適当な名前を入れて、Create を押します。

DataAggregation

image-20220626231838255.png

次のクエリーを入力します。1 分の集計データを、Kinesis Data Streams に格納するクエリーです。

%flink.ssql(type=update)

INSERT INTO people_data_destination
SELECT
	ROOM,
	TUMBLE_START(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_START,
	TUMBLE_END(EVENTTIME, INTERVAL '60' SECOND) as WINDOW_END,
	AVG(PEOPLE) as AVG_PEOPLE_COUNT,
	COUNT(*) as RECORD_COUNT
FROM
	people_data
GROUP BY 
	ROOM,
	TUMBLE(EVENTTIME, INTERVAL '60' SECOND);

image-20220626232444020.png

Studio で作成したアプリケーションのデプロイ

デプロイするために、まずビルドが必要です。Studio Note 上で、Build DataAggregateNote and export to Amazon S3 を選択します。

image-20220410142042154.png

Build and export を選択します

image-20220410142150099.png

Started となり、開始されました

image-20220410142208828.png

ぐるぐるまわって、Build 中なことがわかります

image-20220410142350770.png

3分ほどまつと、Build されました

image-20220410142447244.png

Deploy を押します

image-20220410142521726.png

Deploy using AWS console を選択します。

image-20220410142545365.png

AWS マネージメントコンソールに切り替わります。Studio notebook で利用した IAM Role を指定します。

image-20220626232731648.png

Create streaming application を押します。

image-20220410142809351.png

Streaming Application が作成されました。

image-20220626232935424.png

デプロイしたアプリケーションは、Run を押すことで動作開始します。

image-20220410143605287.png

このまま Run を押します。

image-20220410143620335.png

Running になりました。作成したアプリケーションが稼働し、自動的に IoT から流れてくるデータを 1 分おきに集計して、Kinesis Data Streams に格納してくれているはずです。

image-20220410143851951.png

動作確認

Studio Notebook 上で、出力先の Kinesis Data Stream の内容を SELECT で確認してみます

%flink.ssql(type=update)

SELECT * FROM people_data_destination;

すると、正しく 1 分ごとのデータが出力されています!デプロイしたアプリケーションが正しく動作していることがわかりました。

image-20220626233501001.png

ただの興味ですが、Kinesis Data Analytics で動かしているアプリケーションで、Flink Dashboard を確認してみます。

image-20220626233650021.png

すると、なにやら動作していることがわかります。

image-20220626233802989.png

こんな画面が見えます

image-20220626233917062.png

こんな画面が見えます

image-20220626234017479.png

検証を通じてわかったこと

  • Kinesis Data Analytics には、データ加工のための SQL クエリやコードを書きながら、リアルタイムに加工結果を確認できるエディターの Studio notebook が存在している
  • Studio notebook で作成したコードは、Studio notebook の画面上からデプロイを進められる
  • Java, Scala, Python, SQL を利用してデータ変換のコードを書ける
  • 1分ごとに集計をしたときに、その期間内にデータが来なかった場合、何も処理されない
    • 表現を変えると、欠損値として Null や 0 を出すようなことは難しい。その期間の 行データは生成されない
  • Studio 上で、CREATE TABLE を行うと、Glue Data Catalog にテーブル定義が作成される
  • Legacy SQL アプリケーションよりも、Flink 版を利用した方が良い
  • 新しい Flink の SQL でも、タンプリングウィンドウやスライディングウィンドウがある
  • Kinesis Data Analytics には、オートスケール機能がある。スケール条件は Document に詳細が書かれている
  • オートスケーリングで、parallelism が変更されると、一時的にダウンタイムが発生する
  • Studio Note で、同時にジョブを実行できる上限が parallelism で制限されている
    • parallelism が 4 の場合は、Note 上で実行できるジョブの同時実行数が 4 になる
  • Studio Note で、parallelism の上限を変更した場合、一時的に Studio Note が開けなくなるダウンタイムが発生する
    • 数分のダウンタイムあり
    • ダウンタイム前に実行していたジョブは、停止される

参考URL

Kinesis Data Analytics に関する AWS Blog
https://aws.amazon.com/jp/blogs/news/introducing-amazon-kinesis-data-analytics-studio-quickly-interact-with-streaming-data-using-sql-python-or-scala/

Kinesis Data Analytics の Document
Tumbling window や Sliding window のサンプルが掲載されている
https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-zeppelin-sql-examples.html

Apache Flink SQL Document
SQL クエリーに関する Document
https://nightlies.apache.org/flink/flink-docs-release-1.7/dev/table/sql.html

ウォーターマークについて

一般に、ウォーターマークとは、ストリームのその時点までに、あるタイムスタンプまでのすべてのイベントが到着しているはずだという宣言である。

https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#event-time-and-watermarks
https://acro-engineer.hatenablog.com/entry/2017/05/11/120000
http://vishnuviswanath.com/flink_eventtime.html
https://www.oreilly.com/library/view/introduction-to-apache/9781491977132/ch04.html

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1