1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Stream Analytics で 凝った集計処理を実装する

Posted at

Stream Analytics を初めてちゃんと触ったので、未来の自分へのメモとして書いておく。

実現したいこと

今回のゴールとしては、各チーム毎にエンドポイントが3つづつあり、エラーが発生したら毎秒ダウンタイムレポートをする。そして、チーム毎にダウンタイムを計算するというお題だ。

この課題のややこしいところは次のポイント

  • エンドポイントは、それぞれ別のタイミングでダウンタイムレポートを送ってくる。
  • 1つのチームにつき、3つのエンドポイントのいづれかが死亡したら、ダウンタイムとする
  • ダウンタイムは、1秒ごとに送られるダウンタイムレポートを集計して、チーム毎にトータルのダウンタイムを出す。

最初の解法

この課題を解くために、私は次の手をつかった。

  • チーム毎に各エンドポイントのステータスを持たせる
  • ステータスが変わったら、ステータスの変更を記録する
  • ダウンタイムが始まって終わったら、ダウンタイムをデータベースに記録する

これをがっつりテストクラスを書いて実装したので、パフォーマンス的にも問題ない。

Functions.JPG

ただ、うちのチームのメンバーはこの実装があまり好きじゃないらしく、アーキテクチャの変更を求めてきた。

この方法だと、一番の懸念である、「ダウンタイムレポート集計」を高いパフォーマンスでできる。チームメンバーは、どうやら、レポート自体とダウンタイム計算を分離したい様子。Stream Analytics を使えという話だったので、次のような構成にしてみた。

SA.JPG

本当は先頭を Azure Functions ではなく、EventHub にしたほうが構成としてはきれいなのだが、プロジェクトの都合上、k8s との接点を EventHub に変えると結構変更が入ることになるので、Endpoint を保ったまま Functions から EventHubs に投げるようにした。

Stream Analytics がサポートしている Input ソース

現在 Stream Analytics は、EventHub, IoTHub, Blob Storage をサポートしている。Blob Storage にしたら掃除が面倒だし、今回はお金は気にしなくていいプロジェクトだったので、EventHub 経由にすることにした。

Stream Analytics での解法

先ほどの問題を Stream Analytics でどう解くのか?というのは最初イメージできなかった。結局 Window で集計をしたところで、データ量が多いのは変わりないので、それを Sum するときにすごくパフォーマンスが悪くなるのでは?と考えていたがよい解法がある様子だ。

chart01.JPG

そもそも、Downtime をロジックなしでどうやってバラバラに来ているエンドポイントリクエストを同じ時間のものであるとみなすのかわからないし、毎秒送られてくるので、3日間でも相当な量がデータベースに格納されてしまう。これは面倒だ。

サンプルデータ

このようなサンプルの入力データを作ってみた。いくつかの重なりなどのパターンを網羅するように作っている。

[{"TeamId":"Team01","ServiceId":"Team01POI","Date":"2018-01-10T10:10:10","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01POI","Date":"2018-01-10T10:10:11","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01POI","Date":"2018-01-10T10:10:12","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01POI","Date":"2018-01-10T10:15:10","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01USER","Date":"2018-01-10T10:15:10","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01USER","Date":"2018-01-10T10:15:11","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01TRIP","Date":"2018-01-10T10:20:10","StatusCode":404,"Status":false},{"TeamId":"Team02","ServiceId":"Team02POI","Date":"2018-01-10T10:20:10","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01TRIP","Date":"2018-01-10T10:20:11","StatusCode":404,"Status":false},{"TeamId":"Team02","ServiceId":"Team02POI","Date":"2018-01-10T10:20:12","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01TRIP","Date":"2018-01-10T10:25:10","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01POI","Date":"2018-01-10T10:25:10","StatusCode":404,"Status":false},{"TeamId":"Team02","ServiceId":"Team02POI","Date":"2018-01-10T10:25:10","StatusCode":404,"Status":false},{"TeamId":"Team02","ServiceId":"Team02USER","Date":"2018-01-10T10:25:10","StatusCode":404,"Status":false},{"TeamId":"Team01","ServiceId":"Team01TRIP","Date":"2018-01-10T10:25:11","StatusCode":404,"Status":false},{"TeamId":"Team02","ServiceId":"Team02POI","Date":"2018-01-10T10:25:12","StatusCode":404,"Status":false}]

ダウンタイムを判定する

ダウンタイムの判定は簡単だった。tumbling windowという関数を使うと時系列にデータを扱って、一定期間の Windowの中身を集計できる。つまり、例えば1秒間隔の間で、チーム毎に3つのエンドポイントの一つが落ちていたら、Downtime とする関数を書けばよい。

SELECT
    TeamId,
    System.TimeStamp As Time,
    Count(*) as Count
INTO
    [YourOutputAlias]
FROM
    downtime TIMESTAMP BY Date
GROUP BY
  TeamId,
  TumblingWindow(second, 1)

ポイントは、Date で、Tumbling Window を1秒間に設定して、その間にある TeamId をグループ化して、カウント関数を使っている。ここで、カウント関数を使わないと、Group by があるのに、集計関数を使っていないと怒られてしまう。Stream Analytics で先ほどの、インプットデータをアップロードしてテストを実行する。予想通り、同じ Tumbling window かつ、TeamId のものは、1件のデータになっている(Count は複数になるが)つまり、これが、各秒間のTeamId 毎のダウンタイムレポートだ。

query01.JPG

大量のデータの Sum の負荷を軽減する

さて、このままだと、このまま CosmosDB の方で Sum 関数を実行するとかなりコストの高い処理になりそうだ。そこで、上記の結果をさらに Tumbling window にかけてみる。例えば1分間の Tumbling window にかけてみて、そのカウントをとる。データ件数は、毎秒だったのが、毎分のオーダーになるので、1/60 のデータ量になる。そこで、Count した項目を合計すれば、1カウントが、1秒のダウンタイムなので、その合計がダウンタイムの合計になるはず。次のような複合クエリを書いてみよう。

WITH SelectPreviousEvent AS
(
SELECT
    TeamId,
    System.TimeStamp As Time,
    Count(*) as Count
FROM
    downtime TIMESTAMP BY Date
GROUP BY
  TeamId,
  TumblingWindow(second, 1)
)

SELECT 
  TeamId,
  System.Timestamp As Time,
  Count(*) As Count
INTO
    [YourOutputAlias]
FROM SelectPreviousEvent
Group BY
  TeamId,
  TumblingWindow(minute, 1)

ポイントは服問い合わせをつかって、最初のクエリの出力を次のクエリのFROM に渡している。実行したら想定通り、毎分毎のダウンタイムが記録されている。うむ。いい感じ。

query02.JPG

あとは、これをCosmosDB につないで、出力を記録すれば、普通に Sum 関数を使ってサマリしてもそこまでひどい目にあわないはず。

Change Feed を使って集計する

Stream Analytics を使ってWindow 関数をうまく使って、問題を解決してみた。Sum 関数を実行するトリガーとしては、CosmosDB の change feed で実現する予定。

changefeed.JPG

Change feed が発火するのは、1分間にチーム数だけ。つまり30チームだったら、1分間に30回トリガーされる程度なのて大したことないだろう。よしこれでいこう。

Stream Analytics のワークフロー

Stream Analytics のワークフローは、最初テストデータがないと検証できないので、E2Eテストから書き始めたが、結局テストデータパターンを生成するプログラムを書いて、そのあと、Stream Analytics のページでテストをした。そして、あとでスモークテストとして、E2E を書いておくといったことをしたらよさげ。

まとめ

Stream Analytics を使って、ステートを持たない方法で問題解決をすることができた。Window の感覚をマスターするといろんな問題が楽に解けるようになりそう。この前やった Reactive Extension をもう少し勉強するとこういうプログラミングに強くなれるかな。

リソース

下記のページが特に有用だった。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?