4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【初心者】Amazon Managed Streaming for Apache Kafka (MSK) を使ってみる #2 (Clickstream Labの実施)

Posted at

1. 目的

2. やったこと

  • AWSが提供する公式ハンズオン集(AWS workshop Studio - Amazon MSK Labs)から、「Clickstream Lab」を実施する。
  • (理解できているかは別として)ハンズオンは一応完了したので、自分(初心者)の気づきなどをハンズオン手順の補足として記載する。
  • 大まかな内容は以下の通り。
    • Kafkaクラスターの作成
    • Producerにて、クリックストリームデータ(WEBサイトのアクセスログを模擬)を送信
    • Kinesis Data Analytics で、Kafkaクラスターに送られたメッセージを処理し、Kafkaクラスターの別のTopicに再送信するとともに、Open Search Service にも保存
    • Consumerにて、Kafkaクラスターからメッセージを取得
    • Kibanaにて、Open Search Service に保存されたデータを可視化

3. 構成図

4. 手順

ハンズオンの手順通りに実施する。詰まった点や補足的に実施したコマンドなどを記載する。

Setup

  • 指定のCloudFormationテンプレートを用いて、VPC(必要なSubnetなど含む)、MSKクラスター、Kinesis Data Analytics のアプリケーション、OpenSearch Serviceのドメインを一気に作成する。20分程度で作成完了する。
  • 東京リージョンではエラーで作成不可のため、今回はus-west-2(オレゴン)で実施。(テンプレート内で、使用するAMIが以下の3リージョンしか設定されていないため)
MSKFinkPrivate.yml
  RegionAMI:
      us-east-1:
        HVM64: ami-00dc79254d0461090
      us-west-2:
        HVM64: ami-0a85857bfc5345c38
      eu-west-1:
        HVM64: ami-040ba9174949f6de4

Run Producer

  • EC2インスタンスでSchema Registry Service(AMIにインストール済)を起動し、ProducerがSchema Registry Service を使うように設定する。
  • 4つのTopicを手動で作成する。
    • ExampleTopic: Producerがメッセージを送信し、Kinesis Data Analyticsのアプリケーションがメッセージを取得する用
    • Departments_Agg, ClickEvents_UserId_Agg_Result, User_Sessions_Aggregates_With_Order_Checkout: Kinesis Data Analyticsのアプリがメッセージを送信し、Consumerがメッセージを取得する用
  • KafkaClickstreamClient-1.0-SNAPSHOT.jar を実行して、ExampleTopic に大量のメッセージを送信する。以下のような内容のデータが送信される。
/tmp/Clickstream.txt
{"ip": "66.249.1.133", "eventtimestamp": 1655183298630, "devicetype": "tablet", "event_type": "home_page", "product_type": "N/A", "userid": 7496, "globalseq": 2, "prevglobalseq": 0}
{"ip": "66.249.1.254", "eventtimestamp": 1655183298633, "devicetype": "mobile", "event_type": "home_page", "product_type": "N/A", "userid": 6625, "globalseq": 3, "prevglobalseq": 0}
{"ip": "66.249.1.49", "eventtimestamp": 1655183298634, "devicetype": "mobile", "event_type": "home_page", "product_type": "N/A", "userid": 4000, "globalseq": 4, "prevglobalseq": 0}

Configure Amazon KDA for Java Application

  • CloudFormationにより、Kinesis Data Analyticsのアプリケーションは作成済。Kafkaクラスターへの接続設定など一部のパラメータを修正する。Producer側同様、このアプリからSchema Registry Serviceへアクセスが発生する。
  • アプリケーションを実行すると、KafkaクラスターのExampleTopicからメッセージを取得し、メッセージを処理した上で、Kafkaクラスターの3つのTopicへのメッセージ再送信、およびOpenSearch Serviceへの書き込みが行われる。
  • このKinesis Data Analytics上で動作するアプリケーションはS3に保存されているものを参照して使用しているが、ソースは公開されていない様子。

Consume From Amazon MSK

  • Producerを実行したEC2インスタンスで、今度はConsumerを実行し、3つのTopicに対してデータの取得を行う。3つのTopicで別々のメッセージが取得できることを確認する。
[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic Departments_Agg --from-beginning
{"departmentName":"video games","departmentCount":127,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"departmentName":"AirPods","departmentCount":125,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"departmentName":"AirPods","departmentCount":147,"windowBeginTime":1655183310000,"windowEndTime":1655183320000}


[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic ClickEvents_UserId_Agg_Result --from-beginning
{"userSessionCount":256,"userSessionCountWithOrderCheckout":100,"percentSessionswithBuy":39.0,"windowBeginTime":1655183300000,"windowEndTime":1655183310000}
{"userSessionCount":269,"userSessionCountWithOrderCheckout":99,"percentSessionswithBuy":36.0,"windowBeginTime":1655183330000,"windowEndTime":1655183340000}
{"userSessionCount":267,"userSessionCountWithOrderCheckout":121,"percentSessionswithBuy":45.0,"windowBeginTime":1655183360000,"windowEndTime":1655183370000}

[ec2-user@ip-10-0-0-237 ~]$ /home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server [endpoint] --topic User_Sessions_Aggregates_With_Order_Checkout --from-beginning
{"userId":2552,"eventCount":39,"orderCheckoutEventCount":39,"deptList":["cameras","soundbars","AirPods","ear phones","cd players","cell phones","video games"],"eventKey":1,"windowBeginTime":1655183299648,"windowEndTime":1655183300767}
{"userId":19806,"eventCount":14,"orderCheckoutEventCount":14,"deptList":["soundbars","AirPods","cd players","video games","cell phones"],"eventKey":1,"windowBeginTime":1655183300177,"windowEndTime":1655183301181}
{"userId":30690,"eventCount":28,"orderCheckoutEventCount":28,"deptList":["cameras","AirPods","laptops","ear phones","cd players","cell phones","video games"],"eventKey":1,"windowBeginTime":1655183300543,"windowEndTime":1655183301549}

Create Kibana Dashboard

  • Kinesis Data Analytics のアプリケーションがOpenSearch Serviceに保存したデータを参照し可視化する。
  • EC2インスタンスでSSHポート転送を行い、ローカルの9092/tcpポートをリモート(KibanaのURL)の443/tcpに転送する。TeraTermの場合の設定は以下。

teraterm設定.png

  • Labに誤記があり、「https://127.0.0.1:9200/_plugin/kibana」とあるが、正しくは「https://127.0.0.1:9092/_plugin/kibana」でkibanaのダッシュボードにアクセスする。
  • グラフ化は手順通り問題なく実施可能。

5. 所感

  • 不明点はいろいろあるが、当初ゴールの「処理の雰囲気がなんとなく分かること」はいったん達成したためよしとしたい。
  • ついでにKinesis Data Analytics も初めて設定して、よい体験になった。
4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?