GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。
経緯
前回の投稿でjsonをBlobコンテナに用意したので、今回はそれを取り込んで、1ドキュメントずつServicebusにpublishする。また、別のfunctionでそれをsubscribeし、DocumentDBにstoreする。
json:testLines.json
{ "original_id" : "1", "b":false , "c":0 , "d":1 , "e":2 , "f":0.345 , "g":"あ" , "h":"い" }
{ "original_id" : "2", "b":false , "c":0 , "d":1 , "e":2 , "f":0.345 , "g":"あ" , "h":"い" }
{ "original_id" : "3", "b":false , "c":0 , "d":1 , "e":2 , "f":0.345 , "g":"あ" , "h":"い" }
servicebusの準備
![スクリーンショット 2016-11-24 17.15.06.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2Fa8f826e9-8aed-5ce8-a6b0-10289d3b5b3d.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=183f72faaf9b461345f450c227b22707)
AzureFunctions(publish)
今回は言語はC#でいく。理由は一番できることが多いから。その理由で行くと、
選択肢はC#, js, F#かな。
統合
![スクリーンショット 2016-11-24 17.44.10.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F99b0888c-c836-61fc-dd90-41353a71ec4d.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=19ba08daa2b4de2c90ac2987c9f8d2bd)
NuGet
ファイルを1行1行読むのはいいが、それを1件ずつパースする。
楽チンパースするために今回はJson.NETを使用する。NuGetでinstallする。
![スクリーンショット 2016-11-24 17.54.36.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2Fcb0e3585-e7b7-c6a8-4876-c7eaa9f0e288.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=6e7168b986bb776dd1a0483da2ed20ba)
![スクリーンショット 2016-11-24 17.58.38.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F35ec8708-015e-c011-93bb-3928e837f480.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=c0eea995272c01557e06ae1b6a6f53de)
project.json
{
"frameworks": {
"net46":{
"dependencies": {
"Newtonsoft.Json": "9.0.1"
}
}
}
}
project.lock.jsonが自動で作成される。
これでinstall完了。
開発
using System.IO;
using System.Text;
using Newtonsoft.Json;
public static void Run(string myBlob, string name, Stream inputBlob, ICollector<string> outputSbMsg, TraceWriter log)
{
log.Info($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {myBlob.Length} Bytes");
StreamReader reader = new StreamReader(inputBlob, Encoding.UTF8);
string line;
while ((line = reader.ReadLine()) != null)
{
Item item = JsonConvert.DeserializeObject<Item>(line);
string id = item.Id;
if (String.IsNullOrEmpty(id)) {
outputSbMsg.Add(line);
}
else {
log.Info("can't accept id.");
}
}
reader.Close();
}
public class Item
{
[JsonProperty("id")]
public string Id { get; set; }
}
AzureFunctions(subscribe)
こっちはかなりシンプル
統合
![スクリーンショット 2016-11-24 18.23.35.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2Fadb467ba-8973-bf3e-9bdd-38823acd9ab8.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=2b69ea128743272255affb4a249454ec)
開発
using System;
using System.Threading.Tasks;
public static void Run(string mySbMsg, out object outputDocument, TraceWriter log)
{
// log.Info($"C# ServiceBus topic trigger function processed message: {mySbMsg}");
outputDocument = mySbMsg;
}
次回
以上でBlobのコンテンツトリガでDocumentDBへの保存までが完了。
Servicebusのtopicを通してあるので、subscriberを新たに増やすことで同じデータがブロードキャストされる利点はある。
次回はMachineLearningと連携してみる。