##やりたいこと
Event Hubsでデータを受信し、Stream AnalyticsからCosmos DBにデータ投入するアーキテクチャを構築します。
構築後は、PostmanからEvent Hubsにデータ投入して動作確認をします。
ただ投げ込むだけではつまらないので、「リアルタイムの不正検出」というシナリオにしました。
クライアントから簡単なjsonファイルを受け取り、そのデータ内容に従ってStream Analyticsで分岐処理し、それぞれCosmos DBのOKデータコンテナ、NGデータコンテナに振り分けます。
##手順
1.Event Hubsの作成
2.Cosmos DBの作成
3.Stream Analyticsの作成
4.Stream Analyticsのクエリ作成
5.SASトークンの取得
6.Postmanでのテスト
7.ハマったところ
8.感想
##1.Event Hubsの作成
参考:クイック スタート:Azure portal を使用したイベント ハブの作成
####1.イベントハブ名前空間の作成
####2.イベントハブの作成
##2.Cosmos DBの作成
参考:クイック スタート:Azure portal を使用して Azure Cosmos のアカウント、データベース、コンテナー、および項目を作成する
####1.Cosmos DB アカウントの作成
####2.コンテナの作成
- コンテナは「OK用データ格納コンテナ」と「NG用データ格納コンテナ」の2つを作成します。
- コンテナIDは「ok_items」と「ng_items」で、パーティションキーはどちらも「/category」です。
##3.Stream Analyticsの作成
参考:Azure Stream Analytics を使用してイベント ハブからのデータを処理する
##4.Stream Analyticsのクエリ作成
####1.入力の設定
####2.出力の設定
####3.クエリの作成
- 下記SQLをクエリに入力する。
SELECT
*
INTO
[cosmosdb-ok-request]
FROM
[eventhubs-request]
WHERE
[eventhubs-request].category = 'ok'
SELECT
*
INTO
[cosmosdb-ng-request]
FROM
[eventhubs-request]
WHERE
[eventhubs-request].category = 'ng'
##5.SASトークンの取得
参考:
Azure EventhubsにSASトークンを使う方法(Java)
Azure Event Hubs に REST API でログを送りつける話
Generate SAS token
####1.Event Hubs ベーシック用 のSASトークンをJavascriptで作成
- イベントハブ名前空間 -> イベントハブ -> 共有アクセスポリシー
①共有アクセスポリシー名
②主キー
③接続文字列 主キー
①~③を取得し、④はすべてにチェックを付けます
####2.上記の情報をもって、SASトークン取得スクリプトを作成する。
var crypto = require('crypto');
var saName = "①共有アクセスポリシー名";
var saKey = "②主キー";
var uri = "③sb://イベントハブ名前空間.servicebus.windows.net/イベントハブ名";
createSharedAccessToken(uri, saName, saKey);
function createSharedAccessToken(uri, saName, saKey) {
if (!uri || !saName || !saKey) {
throw "Missing required parameter";
}
var encoded = encodeURIComponent(uri);
var now = new Date();
var week = 60*60*24*7;
var ttl = Math.round(now.getTime() / 1000) + week;
var signature = encoded + '\n' + ttl;
var signatureUTF8 = encode_utf8(signature);
var hash = crypto.createHmac('sha256', saKey).update(signatureUTF8).digest('base64');
console.log('SharedAccessSignature sr=' + encoded + '&sig=' +
encodeURIComponent(hash) + '&se=' + ttl + '&skn=' + saName);
}
function encode_utf8(s) {
return unescape(encodeURIComponent(s));
}
####3.作成したスクリプトで、SASトークンを取得する
C:\Users>node create_sas.js
SharedAccessSignature sr=sb%3A%2F%2F ~中略~ _policy
##6.Postmanでのテスト
- 4までで作成したサービスに対して、5で作成したSASトークンを使ってPostmanで動作確認
####1.Postmanの準備
項目 | 値 | |
---|---|---|
① | URL | https://名前空間.servicebus.windows.net/イベントハブ名/messages |
② | Headersタブ | Authorization:5で取得したSASトークン Content-Type:application/json |
③ | Bodyタブ | Jsonデータ |
{
"id": "1",
"category": "ok",
"name": "ok_request",
"description": "test data 1"
}
####2.動作確認
Event Hubs
メッセージの受信が確認できます。
Incomingが1、Outgoingが2になっているのは、おそらくStream AnalyticsのクエリでEvent Hubsからのメッセージを
2回呼んでいる(FROM句が2回入っている)からです。
Stream Analytics
こちらも、イベントの受信が確認できます。
クエリでEvent Hubsからのイベントを2回呼び、Cosmos DBに対して1つのデータを受け渡しています。
Cosmos DB
OKデータ格納コンテナに、Postmanから投入したデータがちゃんと入っています。
「EventProcessedUtcTime」以降の項目は、Stream Analyticsからデータが渡されると自動で付与されるものです。
NGデータを投入すると、ちゃんとNGデータ格納コンテナにデータが入ってきました。
##7.ハマったところ
####1.SASトークンの取得
Event Hubsに対するSASトークンの取得で詰まりました。
今回Event Hubsはベーシックプランで作成したのですが、スタンダードプランとベーシックプランでSASトークンの取得方法が変わるようです。この記事では、ベーシックプラン用のSASトークン取得スクリプトを載せています。
####2.Postmanで401が返ってくる。
Event Hubsの共有アクセスポリシーの設定が足りていませんでした。
共有アクセスポリシーは、管理・送信・リッスンのすべてにチェックを付けてください。
##8.感想
ポータル上ですべて構築完了するので、とても簡単です。
ハマったところは全部、サービス構築外の部分です。導入コストが低い!
また、今回はStream Analyticsのクエリは簡素なものにしましたが、次はもうちょっと複雑な条件でやってみたいです。