3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

EventHubs→Stream Analytics→CosmosDBにデータ投入する

Posted at

##やりたいこと
Event Hubsでデータを受信し、Stream AnalyticsからCosmos DBにデータ投入するアーキテクチャを構築します。
構築後は、PostmanからEvent Hubsにデータ投入して動作確認をします。
構成.PNG
ただ投げ込むだけではつまらないので、「リアルタイムの不正検出」というシナリオにしました。
クライアントから簡単なjsonファイルを受け取り、そのデータ内容に従ってStream Analyticsで分岐処理し、それぞれCosmos DBのOKデータコンテナ、NGデータコンテナに振り分けます。
データの流れ.PNG

##手順
1.Event Hubsの作成
2.Cosmos DBの作成
3.Stream Analyticsの作成
4.Stream Analyticsのクエリ作成
5.SASトークンの取得
6.Postmanでのテスト
7.ハマったところ
8.感想

##1.Event Hubsの作成
参考:クイック スタート:Azure portal を使用したイベント ハブの作成
####1.イベントハブ名前空間の作成

  • 名前空間の価格レベルはBasicで作成します。
    名前空間の作成.png

####2.イベントハブの作成

  • 名前空間の中に、1つイベントハブを作成します。
    イベントハブ作成.png

##2.Cosmos DBの作成
参考:クイック スタート:Azure portal を使用して Azure Cosmos のアカウント、データベース、コンテナー、および項目を作成する
####1.Cosmos DB アカウントの作成

  • APIは必ず「コア(SQL)」で作成します。(そのほかのAPIはStream AnalyticsからのINPUTを保証していないため。)
    cosmosdbの作成.png

####2.コンテナの作成

  • コンテナは「OK用データ格納コンテナ」と「NG用データ格納コンテナ」の2つを作成します。
  • コンテナIDは「ok_items」と「ng_items」で、パーティションキーはどちらも「/category」です。
    image.png
    image.png

##3.Stream Analyticsの作成
参考:Azure Stream Analytics を使用してイベント ハブからのデータを処理する

  • ホスティング環境は「クラウド」です。
    StreamAnalytics作成.png

##4.Stream Analyticsのクエリ作成
####1.入力の設定

  • ジョブ トポロジ -> 入力 -> ストリーム入力の追加 -> イベントハブを選択

  • 入力のエイリアス:任意の値を入力。この値がクエリ内で変数として使用できます。
    入力.png

####2.出力の設定

  • ジョブ トポロジ -> 出力 -> 追加 -> Cosmos DBを選択
  • 出力先は、OKコンテナとNGコンテナの2つ。
  • 出力のエイリアス:任意の値を入力。この値がクエリ内で変数として使用できる。
    出力1.png

####3.クエリの作成

  • 下記SQLをクエリに入力する。
クエリ

SELECT
    *
INTO
    [cosmosdb-ok-request]
FROM
    [eventhubs-request]
WHERE
    [eventhubs-request].category = 'ok'
SELECT
    *
INTO
    [cosmosdb-ng-request]
FROM
    [eventhubs-request]
WHERE
    [eventhubs-request].category = 'ng'
  • ポータル上では下記のように見える。
  • クエリ内の変数は、入力・出力エイリアス名となっている。
    クエリ作成.png

##5.SASトークンの取得
参考:
Azure EventhubsにSASトークンを使う方法(Java)
Azure Event Hubs に REST API でログを送りつける話
Generate SAS token

####1.Event Hubs ベーシック用 のSASトークンをJavascriptで作成

  • イベントハブ名前空間 -> イベントハブ -> 共有アクセスポリシー

 ①共有アクセスポリシー名
 ②主キー
 ③接続文字列 主キー
 ①~③を取得し、④はすべてにチェックを付けます
共有アクセスポリシーの取得.png

####2.上記の情報をもって、SASトークン取得スクリプトを作成する。

create_sas.js
var crypto = require('crypto');
var saName = "①共有アクセスポリシー名";
var saKey = "②主キー";
var uri = "③sb://イベントハブ名前空間.servicebus.windows.net/イベントハブ名";

createSharedAccessToken(uri, saName, saKey);

function createSharedAccessToken(uri, saName, saKey) { 
    if (!uri || !saName || !saKey) { 
            throw "Missing required parameter"; 
        } 
    var encoded = encodeURIComponent(uri); 
    var now = new Date(); 
    var week = 60*60*24*7;
    var ttl = Math.round(now.getTime() / 1000) + week;
    var signature = encoded + '\n' + ttl; 
    var signatureUTF8 = encode_utf8(signature); 
    var hash = crypto.createHmac('sha256', saKey).update(signatureUTF8).digest('base64'); 
    console.log('SharedAccessSignature sr=' + encoded + '&sig=' +  
    encodeURIComponent(hash) + '&se=' + ttl + '&skn=' + saName);
}

function encode_utf8(s) {
    return unescape(encodeURIComponent(s));
}

####3.作成したスクリプトで、SASトークンを取得する

C:\Users>node create_sas.js
SharedAccessSignature sr=sb%3A%2F%2F ~中略~ _policy

##6.Postmanでのテスト

  • までで作成したサービスに対して、で作成したSASトークンを使ってPostmanで動作確認

####1.Postmanの準備

項目
URL https://名前空間.servicebus.windows.net/イベントハブ名/messages
Headersタブ Authorization:で取得したSASトークン 
Content-Type:application/json
Bodyタブ Jsonデータ
postman1.png
postman2.png
Bodyデータ
{
    "id": "1",
    "category": "ok",
    "name": "ok_request",
    "description": "test data 1"
}

####2.動作確認

  • Stream Analyticsを開始してから、Postman実行します。
    開始.png

  • 201が返ってきたら成功!
    成功.png

  • ポータルから、各サービスを見ていきます。

 Event Hubs
 メッセージの受信が確認できます。
 Incomingが1、Outgoingが2になっているのは、おそらくStream AnalyticsのクエリでEvent Hubsからのメッセージを
 2回呼んでいる(FROM句が2回入っている)からです。
EventHubs_OK.png

 Stream Analytics
 こちらも、イベントの受信が確認できます。
 クエリでEvent Hubsからのイベントを2回呼び、Cosmos DBに対して1つのデータを受け渡しています。
StreamAnalytics_OK.png

 Cosmos DB
 OKデータ格納コンテナに、Postmanから投入したデータがちゃんと入っています。
 「EventProcessedUtcTime」以降の項目は、Stream Analyticsからデータが渡されると自動で付与されるものです。
CosmoDB_OK.png

 NGデータを投入すると、ちゃんとNGデータ格納コンテナにデータが入ってきました。
CosmoDB_NG.png

##7.ハマったところ
####1.SASトークンの取得
Event Hubsに対するSASトークンの取得で詰まりました。
今回Event Hubsはベーシックプランで作成したのですが、スタンダードプランとベーシックプランでSASトークンの取得方法が変わるようです。この記事では、ベーシックプラン用のSASトークン取得スクリプトを載せています。
####2.Postmanで401が返ってくる。
Event Hubsの共有アクセスポリシーの設定が足りていませんでした。
共有アクセスポリシーは、管理・送信・リッスンのすべてにチェックを付けてください。

##8.感想
ポータル上ですべて構築完了するので、とても簡単です。
ハマったところは全部、サービス構築外の部分です。導入コストが低い!
また、今回はStream Analyticsのクエリは簡素なものにしましたが、次はもうちょっと複雑な条件でやってみたいです。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?