C#
StreamAnalytics
EventHubs
CosmosDB

EventHub -> Stream Analytics -> CosmosDB で E2E テストをしようとしてハマったこと

More than 1 year has passed since last update.

前回のStream Analytics で 凝った集計処理を実装するで書いた通り、Stream Analytics でテストデータから、Portal 上で想定通りデータがサマリされるところまで確認した。次に、EventHub -> Stream Analytics -> CosmosDB の構成なので、それがちゃんと動作するかのE2Eのスモークテストを簡単に書いておこうと思った。ところが、これが早速うまくいかなかった。出てきた問題点を次にはまらないように整理しておく。


課題

課題は前回の記事のSQL を作成して、ポータル上でテストも完成したので、まったく同じデータを使って、EventHubs のライブラリからポストすると CosmosDBに全くストアされない。なんでやねん。この問題を解決したいと思った。結構解決に時間がかかったが、様々な要因が絡み合っていたので、それを忘れないようにメモしておきたい。


EventHubs のクライアントライブラリ

EventHubs のクライアントライブラリを使ったコードは、Event Hubs のプログラミング ガイドというページがあるのだが、少なくとも .NetCore を使っている人はこの書き方はバージョンが古い上に、WindowsAzure.ServiceBus をインストールする必要がある。.NETCore 使いの人はこちら。


.Net Standard 用のドキュメントを見る


Azure Functions で使うなら、Extensions を使う

nuget パッケージも Microsoft.Azure.EventHubs のみでよい。かなりすっきりする。私はAzure Functions で使うので、‘Microsoft.Azure.WebJobs.Extensions.EventHubs (v3.0.0-beta7-11351)` のパッケージを使っている。

このバージョンでいくと、古いバージョンだとNameSpaceManager 等が必要だったのがもっとすっきり書ける。

   :

var eventHubsConnectionString = configration["EvebtHubsConnectionString"];
var eventHubsEntityPath = configration["EventHubsEntityPath"];

var connectionStringBuilder = new EventHubsConnectionStringBuilder(eventHubsConnectionString)
{
EntityPath = eventHubsEntityPath
};
eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));

ぐらいの感じでかけて相当すっきりとかけていい感じ。


Stream Analytics のクエリーから出力された結果の JSON が小文字になる

現在の制限事項らしい。泣く泣く出力先のクラスの属性を小文字にした。(属性をつけてもよかったけど)


CosmosDB にデータが出力されなかった原因


Stream Analytics を開始していない

これは、まず疑うべきところ。実際わすれていた。


timestamp / window を使ったクエリの場合、データはStream Analytics の開始時より後でないといけない

次のページが参考になった。結局クエリを実行したが、0件だからデータが吐かれていなかったということが原因だった。現在のクエリはこれでモロ該当する

WITH SelectPreviousEvent AS

(
SELECT
TeamId,
System.TimeStamp As Time,
Count(*) as Count
FROM
downtime TIMESTAMP BY Date
GROUP BY
TeamId,
TumblingWindow(second, 1)
)

SELECT
TeamId,
System.Timestamp As Time,
Count(*) As Count
INTO
[cosmosdb]
FROM SelectPreviousEvent
Group BY
TeamId,
TumblingWindow(minute, 1)


シンプルなクエリにして、設定が正しいか確認する

これはクエリを SELECT * INTO cosmosdb FROM downtime みたいな単純な全件検索にして気づいた。この場合問題なく全件がCosmosDB に書かれたということは、設定は間違っていない。

特に、Timestamp By を使用する場合は、イベントのタイムスタンプがジョブの開始時刻より後であることが必要です。また、Window 関数を使っているときに、 Window が終わっていなかったらデータが来ません。


データの時刻が現在時刻とはずれすぎているとクエリーが動かない

これはたぶん Timestamp By と思われるが、Stream Analytics 開始後であっても、タイムスタンプがデータが送られた時間と大幅にかけ離れていてもうまくクエリーできないみたいだ。

例えば、クエリー開始時間後の 5分、10分 15分 ぐらいのデータを一気に送信すると、5 分のデータは出力されたが、そのあとのは出力されなかった。このあたりの仕様はよくわからないし、ドキュメントでも見つけられないのでメーリングリストで聞くか、友人に聞いてみたい。(TODO)

つまり ほぼリアルタイムにデータを送信する必要がある


Collection を再作成すると、Output を作成しなおす必要がある

最初は E2E テストを書いているときに、最初にアウトプットをするコレクションをクリアしたかったので、一旦コレクションを消して、作成するというオペレーションをしていた。すると、アクティビティログ で次のようなエラーに会う。

        "Message": "First Occurred: 06/22/2018 08:22:07 | Resource Name: cosmosdb | Message: There was a problem writing to CosmosDB db:[leaderboard], and collection:[DowntimeRecord]. ",

"Type": "DiagnosticMessage",

もしくは

Send Events: CosmosDB Output write Data failure

おそらく、アウトプットの設定時に、コネクションを始めているのかもしれない。だから、Stream Analytics の開始時点のコレクションがなくなったら書き込みができなくなるので、そういったことをしないのがポイント。

Output.JPG


E2E テストのノウハウ


Document の全件削除は工夫が必要

コレクションを削除する代わりに、Document を全件削除するとよい。しかしこれが面倒だ。そいうったメソッドはないし、やろうとしたら、全件クエリーして、1件づつ削除なのだが、PartitionKey があるテーブルだと、それを設定しないといけない。Generic なメソッドを作ろうと思ったが、面倒になって、べたべたのコードを書いた。ここ以外で使うタイミングがないので、抽象化が割に合わない。

        private async Task<List<DowntimeRecord>> GetAllDowntimeRecordAsync(IDocumentService service, string teamId)

{
var client = service.GetClient();
var query = client.CreateDocumentQuery<DowntimeRecord>(
UriFactory.CreateDocumentCollectionUri("leaderboard", "DowntimeRecord"))
.Where(f => f.teamid == teamId)
.AsEnumerable();
return query.ToList<DowntimeRecord>();
}

private async Task DeleteAllDocuments(IDocumentService service, string teamId)
{
var client = service.GetClient();
var records = await GetAllDowntimeRecordAsync(service, teamId);
foreach(var record in records)
{
await client.DeleteDocumentAsync(UriFactory.CreateDocumentUri("leaderboard", typeof(DowntimeRecord).Name , record.id)
, new RequestOptions() { PartitionKey = new PartitionKey(teamId) });
}
}


現在時刻を使ったテストデータ

大変面倒だが、テストデータは、一気に投入できない。時間を必要とするパターンがある場合、該当の時間にデータを投入する必要がある。作戦としては、データパターンを作っておいて、そのデータパターンに現在時を入れて、その相対時間でテストデータを作るとよい。厳密にいうとタイミングによっては Fail して freaky なテストになるかもしれないが、発生するまではこれで。発生したら、もっとロジックで正しさを証明するように直すとよい。(例えば、Stream Analytics に期待するロジックをテストで書いておいて、インプットデータと、予想されるアウトプットを計算するロジックを書く。みなさんどうしているのだろう。もしもっといい方法があれば是非ご教授ください。

        private DowntimeReport[] GetPatternSample(int patternNumber, DateTime date)

{

switch(patternNumber)
{
case 0:
// Normal case failure
return new DowntimeReport[] {
GetFailureDowntimeReport("Team01", "Team01POI", date),
GetFailureDowntimeReport("Team01", "Team01POI", (date + TimeSpan.FromSeconds(1))),
GetFailureDowntimeReport("Team01", "Team01POI", (date + TimeSpan.FromSeconds(2)))
};
case 1:
return new DowntimeReport[]
{
// Multiple service failure at the same time
GetFailureDowntimeReport("Team01", "Team01POI", date),
GetFailureDowntimeReport("Team01", "Team01USER", date),
GetFailureDowntimeReport("Team01", "Team01USER", (date + TimeSpan.FromSeconds(1)))
};
case 2:
return new DowntimeReport[]
{
// Multiple teams failure at the same time
GetFailureDowntimeReport("Team01", "Team01TRIP", date),
GetFailureDowntimeReport("Team02", "Team02POI", date),
GetFailureDowntimeReport("Team01", "Team01TRIP", (date + TimeSpan.FromSeconds(1))),
GetFailureDowntimeReport("Team02", "Team02POI", (date + TimeSpan.FromSeconds(2)))
};
default:
return new DowntimeReport[]
{
// Multiple teams/services failure at the same time.
GetFailureDowntimeReport("Team01", "Team01TRIP", date),
GetFailureDowntimeReport("Team01", "Team01POI", date),
GetFailureDowntimeReport("Team02", "Team02POI", date),
GetFailureDowntimeReport("Team02", "Team02USER", date),
GetFailureDowntimeReport("Team01", "Team01TRIP", (date + TimeSpan.FromSeconds(1))),
GetFailureDowntimeReport("Team02", "Team02POI", (date + TimeSpan.FromSeconds(2)))

};
}
}


デバッグ

デバッグに関しては、クライアントは、VS のデバッグ機能でステップ実行するのが王道。また、Stream Analytics の場合、Metrics を見ると、Input はあるのに、Output がない(ということは、クエリーの結果0件)とかがわかる。

Metric.JPG

他には アクティビティログはcosmosdbの接続問題を診断するのに役立った。、診断ログが設定できるので設定しておいたが、ストレージにイベントが吐かれているのがわかった。


その他

小さな Tips だけど、上記にあるように、DateTime の演算をしたいときに、TimeSpan を渡せば演算できる


まとめ

これで無事E2Eテストが実行されて動作が確認できた。次は Change Feed + CosmosDB Trigger を使って、最終的な集計を実施してそれも、E2Eテストに組み込んでみたい。

あと、これいい記事だった。Pluralsight は2時間半なので、ちゃんと見てみよう。