概要
今回は、第一回で構築したConfluentのクラスタを利用しConfluent Control CenterにログインしてksqlDBの機能を確認してみたいと思います。
Confluent Control Centerにログイン
1.以下のURLでWebブラウザからコンフルエントコントロールセンターのGUIにアクセスします。
http://localhost:9021
ここでは、ksqlDBは認証されKafkaクラスターに接続してksqlDBのコマンドファイルで定義されているクエリを既に実行していることが確認できます。
2.左メニューから「ksqlDB」をクリックします。
3.ksqlDB Applicationの箇所から「wikipedia」を選択します。
4.2の手順でwikipediaをクリックして、「Flow」を表示することでこのデモで作成されたストリームとテーブルおよびそれらの関連性を確認することができます。GUIだとわかりやすいですね。
5.次にksqlDBCLIを実行してksqlDBCLIプロンプトを表示します。
docker-compose exec ksqldb-cli bash -c 'ksql -u ksqlDBUser -p ksqlDBUser http://ksqldb-server:8088'
6.既存のksqlDBのストリーム(Streams)を表示します。
7.ここで「WIKIPEDIA」をクリックして既存のksqlDBストリームのスキーマの詳細を表示します。
8.既存のksqlDBテーブルを表示します。このテーブルはタンブリンウィンドウ内の発生をカウントします。
9.継続して実行されている既存のksqlDBのクエリ(Persistent queries)を見てみましょう。
10.次にさまざまなksqlDBストリームおよびテーブルからのメッセージを表示します。選択したストリームをクリックして、「Query stream」をクリックしてクエリエディタを開きます。エディタには「select * from WIKIPEDIA EMIT CHANGES;」
といったクエリがあらかじめ入力されており、このクエリの指示に従って新しく届いたデータの結果がここに表示されます。
11.ksqlDB Editorをクリックして「SHOW PROPERTIES;」と入力し実行します。この実行結果からksqlDBサーバのプロパティが確認できます。この内容をdocker-compose.ymlで照合も可能です。
12.ksqlDBプロセスログは、処理中のレコードのエラーをキャプチャできるので開発者はksqlDBクエリのデバッグに役立ちます。この例のプロセスログはlog4jproperties fileで構成されているmTLSの認証を使用してkafkaトピックにエントリを書き込みます。動作確認のために以下の不完全なクエリを20秒間ksqlDBエディタで実行します。
SELECT 1/0 FROM wikipedia EMIT CHANGES;
このクエリからレコードは返ってこないはずです。ksqlDBはレコードごとにプロセスログにエラーを書き込みます。ここでは、対応するksqlDBストリーム"KSQL_PROCESSING_LOG"をksqlDBエディタで表示をしてみます。(この時auto.offset.reset=Earliestを設定します。)
SELECT * FROM KSQL_PROCESSING_LOG EMIT CHANGES;
以下のような表示がされプロセスログにエラーが出力されていることが確認できました。
上記を試してみてksqlDBはSQLライクな構文でkafkaのデータにアクセスできるのとGUIから利用できるので運用担当者にとっても扱いやすと思いました。
次回は、コンシューマーについてみていきたいと思っています。
今回も最後までお読み頂きありがとうございました!