2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

confluentのksqlで遊ぼう!

Last updated at Posted at 2018-05-19

はじめに

Confluentも4.x系にUpdateが入り、Kafkaも遂に1.x!
ということで社内にあるアクセスログ周りのデータフローを構成するKafka環境を一新しました。

構造は以前と大きく変わらないので割愛しますが、リリース当初よりは大分良くなってきているのでksqlを紹介しようと思います。

KSQLってなに?

Kafkaについては上記の記事にざっくりかいてあるので割愛。
公式はコチラ

Streaming SQL for Apache Kafka

リアルタイムに流れてくるStreamingデータに対してKafkaを用いてSQLによってQueringするためのアプリケーションです。

構成要素

KSQLを生成するためには、2つの構成要素を理解する必要があります。

原文はこちら。

Stream

DataSourceからStreamデータを生成します。
具体的には

  • KafkaのTopic
  • KSQLで生成したStream
  • KSQLで生成したTable

原文の例を挙げれば、

  • 「アリスが100ドルをボブに送った」
  • 「チャーリーが50ドルをボブに送った」
    このようなStreamデータが続々と届いて増えていきます。

Table

いわゆるViewの機能を提供します。
Streamデータを選択・加工・集計して、RDBなどでいうテーブルを構築します。

こちらもDataSourceとなるのは、Streamと一緒です。

  • KafkaのTopic
  • KSQLで生成したStream
  • KSQLで生成したTable

例は以下のような感じ。

  • ボブの口座残高は150ドル
  • アリスの口座残高は150ドル
  • チャーリーの講座残高は50ドル

ただしレコードが続々と増えていくStreamと違って、Streamデータを元にしてテーブルを構築していくものになります。

で、どう違うん?

ある程度語弊が生まれることを呑み込んでいえば、Streamが続々と届く処理データ、Tableはそれを元に集計したデータになります。

例のテーブル

  • ボブの口座残高は150ドル
  • アリスの口座残高は150ドル
  • チャーリーの講座残高は50ドル
    に対して、

Streamによって

  • 「アリスが100ドルをボブに送った」
  • 「チャーリーが50ドルをボブに送った」

このデータがTableに届くと

  • ボブの口座残高は150ドル → 300ドル
  • アリスの口座残高は150ドル → 50ドル
  • チャーリーの講座残高は50ドル → 0ドル
    のようにTableは変化していきます。

蛇足

勿論、Keyとするものの選び方によってはStreamデータがひたすら溜まっていくTableなんてものも作り得ます。

上記の例で、2つの役割を組み合わせて言えば、
・StreamはQueryによる更新のためのデータ
・TableはQueryを受けて更新され、また参照されるためのView
であります。

勿論StreamはStreamingデータのちょっとした加工にも使えますし、どのように利用するかはアイディア次第です。
今まで上げてきたものは一つの例ですので、どう使えば便利かそれぞれで考えてみて下さい。

あそぼう!

簡単にやりたいかたはQuickStartをそのまま実行されることをオススメします。
今回は弊社で溜め込んでいるLTSV化したアクセスログデータをちょっと弄ってみたのでご紹介。

Topicについて

アクセスログをLTVS形式でNginxに吐き出させたものを、fluentdでKafkaへ送っています。そのあと、ちょっと加工して年月日時分秒をカラムに分離したり、不要なデータを加工して外すKafkaStreamsを噛ませたあとのデータを利用します。

Streamを作ろう

分かり難いと思うのでさくっとStream作っちゃいましょ。
以下、ksqlへ投入するクエリです。

ksql> CREATE STREAM parsed_json_stream
(
 VirtualHost STRING,
 Server STRING,
 ClientIP STRING,
 Status INTEGER,
 ResponseTime INTEGER,
 Year INTEGER,
 Month INTEGER,
 Day INTEGER,
 Hour INTEGER,
 Min INTEGER
) WITH (
 KAFKA_TOPIC='parsed_json',
 VALUE_FORMAT='JSON'
);

アクセスログに見慣れている方ならざっと見ただけで理解可能かと思います。
元のTopicの方にはコレ以外のアクセスログデータがもっと色々載っていますが、集計させたいデータ群と年月日時分までを対象項目としてデータ量を制限してます。

Success! 的なOutputが得られたらチェックしてみます。

ksql> show streams;

 Stream Name                   | Kafka Topic | Format
--------------------------------------------------
 PARSED_JSON_STREAM | parsed_json | JSON
--------------------------------------------------

この時点でKafka上にTopicが出来たりはしません。
StreamはあくまでTopicに流れてくるデータをConsumeしているに過ぎません。

Tableを作ろう

先程作ったStreamを元にして、Tableを作成します。

CREATE TABLE RESP_STATUS_DAILY
WITH (VALUE_FORMAT='JSON')
AS SELECT 
 VirtualHost, Server, Status, Year, Month, Day, COUNT(*) AS Count
FROM PARSED_JSON_STREAM
GROUP BY VirtualHost, Server, Status, Year, Month, Day
HAVING COUNT(*) > 1;

クエリを見ていただけばわかるとおり、サーバ名・vhost・ステータスコードをKeyにして日次集計している形ですね。

こちらはClientIPを集計するテーブル。
ちょくちょくDDoSみたいな攻撃があるので、アクセス元をリアルタイムに集計しておくためのテーブルですね。

CREATE TABLE CLIENT_COUNT
WITH (VALUE_FORMAT='JSON')
AS SELECT 
 VirtualHost, Server, ClientIP, Year, Month, Day, Hour, Min,  COUNT(*) AS Count
FROM PARSED_JSON_STREAM
GROUP BY VirtualHost, Server, Status, Year, Month, Day, Hour, Min
HAVING COUNT(*) > 1;    

さて、この時点でKafkaの中をご確認いただくと、Topicが幾つか生成されていると思います。テーブル名と同じクエリ、及びなんか妙に長いやつ。
前者はともかく、後者は名前をよく見ると分かるようにStreamからTableを作る際のAggregationを行うためのTopicです。
中身まではまだ確認していませんが、データ量は元のTopicとほぼ同じ量が流れているので事前加工の段階かなと思います。
ぼーっとしてTable量産すると一瞬でDiskを食いつぶしかねないのでDiskScaleには要注意ですね。

もうちょっとしっかり設計してTable作らないといけないかなーという印象です。これくらいなら無理にksqlでやるよりも別のアプリケーションからConsumeしてRDBにでも叩き込んだほうが現実的かもしれません。

テーブルにクエリを投げよう

出来上がったテーブルにクエリを投げてみましょう。

ksql> select VirtualHost, Server, Status, Year, Month, Day, Count from DAILY_RESP_STATUS WHERE Status >= 400 LIMIT 10;
xxx-xxx.domain.net | server03 | 403 | 2018 | 5 | 18 | 4
xxx-xxx.domain.net | server01 | 403 | 2018 | 5 | 18 | 8
xxx-xxx.domain.net | server01 | 401 | 2018 | 5 | 18 | 3
xxx-xxx.domain.net | server02 | 403 | 2018 | 5 | 18 | 6

結果自体はまぁ正直特筆することもありませんね。

なお、テーブルに対してのクエリにAggregationをかけることは出来ません。(2018/05/19時点)

https://github.com/confluentinc/ksql/issues/1228
 Currently KSQL does not support aggregations on tables (this functionality is coming very soon, though only for the un-windowed case).

おわりに

リアルタイムデータ処理の一貫として、ksqlの説明とやってみたこと(ちょっと遊んだだけですが)をご紹介しました。
KafkaStreamsをもっと手軽につかおう!という感じのアプリケーションですね。リリースして間もないのでまだまだ制約も多い印象ですが上手く使えばScala/Javaを書かないと使えなかったKafkaStreams周りの工数削減にはとても役に立つのではないでしょうか。

ここでは触れてませんがREST APIもあるので、上手くTABLE作れたらアプリケーションからがりがり呼んであげたいところです。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?