1. 目的
- Kafkaの学習をしている。前の記事「【初心者】Amazon Managed Streaming for Apache Kafka (MSK) / MSK Serverless を使ってみる」にて、AWS上でのKafkaクラスターの作成、Producerからのメッセージ送信/Consumerからのメッセージ取得を行った。
- もう少し実際のユースケースに近いハンズオンを実施し、理解を深める。(ゴール: 処理の流れの雰囲気がなんとなく分かること)
2. やったこと
- AWSが提供する公式ハンズオン集(AWS workshop Studio - Amazon MSK Labs)から、「Clickstream Lab」を実施する。
- (理解できているかは別として)ハンズオンは一応完了したので、自分(初心者)の気づきなどをハンズオン手順の補足として記載する。
- 大まかな内容は以下の通り。
- Kafkaクラスターの作成
- Producerにて、クリックストリームデータ(WEBサイトのアクセスログを模擬)を送信
- Kinesis Data Analytics で、Kafkaクラスターに送られたメッセージを処理し、Kafkaクラスターの別のTopicに再送信するとともに、Open Search Service にも保存
- Consumerにて、Kafkaクラスターからメッセージを取得
- Kibanaにて、Open Search Service に保存されたデータを可視化
3. 構成図
-
ハンズオンの説明にも図があるが、自分の理解を記載。
4. 手順
ハンズオンの手順通りに実施する。詰まった点や補足的に実施したコマンドなどを記載する。
Setup
- 指定のCloudFormationテンプレートを用いて、VPC(必要なSubnetなど含む)、MSKクラスター、Kinesis Data Analytics のアプリケーション、OpenSearch Serviceのドメインを一気に作成する。20分程度で作成完了する。
- 東京リージョンではエラーで作成不可のため、今回はus-west-2(オレゴン)で実施。(テンプレート内で、使用するAMIが以下の3リージョンしか設定されていないため)
MSKFinkPrivate.yml
RegionAMI:
us-east-1:
HVM64: ami-00dc79254d0461090
us-west-2:
HVM64: ami-0a85857bfc5345c38
eu-west-1:
HVM64: ami-040ba9174949f6de4
Run Producer
- EC2インスタンスでSchema Registry Service(AMIにインストール済)を起動し、ProducerがSchema Registry Service を使うように設定する。
- 4つのTopicを手動で作成する。
- ExampleTopic: Producerがメッセージを送信し、Kinesis Data Analyticsのアプリケーションがメッセージを取得する用
- Departments_Agg, ClickEvents_UserId_Agg_Result, User_Sessions_Aggregates_With_Order_Checkout: Kinesis Data Analyticsのアプリがメッセージを送信し、Consumerがメッセージを取得する用
- KafkaClickstreamClient-1.0-SNAPSHOT.jar を実行して、ExampleTopic に大量のメッセージを送信する。以下のような内容のデータが送信される。
/tmp/Clickstream.txt
{"ip": "66.249.1.133", "eventtimestamp": 1655183298630, "devicetype": "tablet", "event_type": "home_page", "product_type": "N/A", "userid": 7496, "globalseq": 2, "prevglobalseq": 0}
{"ip": "66.249.1.254", "eventtimestamp": 1655183298633, "devicetype": "mobile", "event_type": "home_page", "product_type": "N/A", "userid": 6625, "globalseq": 3, "prevglobalseq": 0}
{"ip": "66.249.1.49", "eventtimestamp": 1655183298634, "devicetype": "mobile", "event_type": "home_page", "product_type": "N/A", "userid": 4000, "globalseq": 4, "prevglobalseq": 0}
- このProducerのJavaアプリはGitHubで公開されているため、ソースをちゃんと見れば詳しい挙動が分かるかもしれない。
- Schema Registry Serviceの動作を理解するのが初心者には難しいが、スキーマを定義したり、データのシリアライズをしたりしているとざっくり理解する。(以下参照)
Configure Amazon KDA for Java Application
- CloudFormationにより、Kinesis Data Analyticsのアプリケーションは作成済。Kafkaクラスターへの接続設定など一部のパラメータを修正する。Producer側同様、このアプリからSchema Registry Serviceへアクセスが発生する。
- アプリケーションを実行すると、KafkaクラスターのExampleTopicからメッセージを取得し、メッセージを処理した上で、Kafkaクラスターの3つのTopicへのメッセージ再送信、およびOpenSearch Serviceへの書き込みが行われる。
- このKinesis Data Analytics上で動作するアプリケーションはS3に保存されているものを参照して使用しているが、ソースは公開されていない様子。
Consume From Amazon MSK
- Producerを実行したEC2インスタンスで、今度はConsumerを実行し、3つのTopicに対してデータの取得を行う。3つのTopicで別々のメッセージが取得できることを確認する。
[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic Departments_Agg --from-beginning
{"departmentName":"video games","departmentCount":127,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"departmentName":"AirPods","departmentCount":125,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"departmentName":"AirPods","departmentCount":147,"windowBeginTime":1655183310000,"windowEndTime":1655183320000}
[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic ClickEvents_UserId_Agg_Result --from-beginning
{"userSessionCount":256,"userSessionCountWithOrderCheckout":100,"percentSessionswithBuy":39.0,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"userSessionCount":269,"userSessionCountWithOrderCheckout":99,"percentSessionswithBuy":36.0,"windowBeginTime":1655183330000,"windowEndTime":1655183340000}
{"userSessionCount":267,"userSessionCountWithOrderCheckout":121,"percentSessionswithBuy":45.0,"windowBeginTime":1655183360000,"windowEndTime":1655183370000}
[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic User_Sessions_Aggregates_With_Order_Checkout --from-beginning
{"userId":2552,"eventCount":39,"orderCheckoutEventCount":39,"deptList":["cameras","soundbars","AirPods","ear phones","cd players","cell phones","video games"],"eventKey":1,"windowBeginTime":1655183299648,"windowEndTime":1655183300767}
{"userId":19806,"eventCount":14,"orderCheckoutEventCount":14,"deptList":["soundbars","AirPods","cd players","video games","cell phones"],"eventKey":1,"windowBeginTime":1655183300177,"windowEndTime":1655183301181}
{"userId":30690,"eventCount":28,"orderCheckoutEventCount":28,"deptList":["cameras","AirPods","laptops","ear phones","cd players","cell phones","video games"],"eventKey":1,"windowBeginTime":1655183300543,"windowEndTime":1655183301549}
Create Kibana Dashboard
- Kinesis Data Analytics のアプリケーションがOpenSearch Serviceに保存したデータを参照し可視化する。
- EC2インスタンスでSSHポート転送を行い、ローカルの9092/tcpポートをリモート(KibanaのURL)の443/tcpに転送する。TeraTermの場合の設定は以下。
- Labに誤記があり、「https://127.0.0.1:9200/_plugin/kibana」とあるが、正しくは「https://127.0.0.1:9092/_plugin/kibana」でkibanaのダッシュボードにアクセスする。
- グラフ化は手順通り問題なく実施可能。
5. 所感
- 不明点はいろいろあるが、当初ゴールの「処理の雰囲気がなんとなく分かること」はいったん達成したためよしとしたい。
- ついでにKinesis Data Analytics も初めて設定して、よい体験になった。