Edited at

Azure Queue Storage を活用して、IoTな分散処理(的)システムを実現する

この記事はMicrosoft Azure Advent Calendar 2018 12/20 の記事です。

https://qiita.com/advent-calendar/2018/azure


この記事について

Azure Queue Storage を活用して、IoTな分散処理(的)システムを実現できるようなネタをご紹介します。とあるIoTなサービスの裏側で採用している仕組みで、Microsoft Tech Summit 2018 でお話しした内容から切り出して記事にしています。


システムの概要

このシステムでは、振動センサーのデータを収集しています。振動センサーのデータは、IoTHub(とBlob)に送信されます。そのデータを加工して特徴量を抽出したり、予測を行ったり、可視化のために永続化を行ったりしています。

IoTHubからはWebJobを使って、EventProcessorHostの実装を利用してQueue Storage にデータを配っています。Fanoutパターン的に、複数の「機能単位」があって、これらに同じメッセージを配っています。この「機能単位」が、データの加工であったり、分析や予測であったり、可視化のための永続化という機能ごとに存在しています。「機能単位」はQueue Storage、Job、他のStorage(TableやBlob)というサービスで構成されています。ちなみに、仮想マシン(IaaS)は利用していません。「VM使ったら負け」を合言葉にしてます。


なぜ、Azure Queue Storage を利用するのか


  • 利用するサービスを絞り込みたい

  • 表示タイムアウトを使ってリトライの仕組みをお気軽に実現できる。(Queue Storageでもここまでできちゃいますよ!!)


利用するサービスの絞り込み

たまたま、Azure Storageを使うシステムであったため、Storageサービスに含まれるQueueを使うことで、利用するサービスを絞り込んでいます(IoTHubを使うためには、Azure Storage必須ですね。また、Storageを使わないで実装というのはあまりないのかもしれませんが)。使うサービスを絞り込むと、シンプルになってうれしいですよね。価格面で有利であったり、障害に強くなる可能性もあります。Azure Storageはいろいろなサービスの基礎になっているとも聞きますので、ここが障害となれば他にも影響がでます。ということで一番プリミティブと思われるQueue Storageを使うと判断したということになります。

また、使うサービスが増えると、気にしないといけないことが増えてうれしくないと思ってます。価格の変更、サービスの変更や終了、デプロイ対象の種類が増える、Client SDKのアップデートなど。例えば、Queue Storage は StorageアカウントさえAzure Portalで準備してしまえば動作する、といったお手軽さもあります。Portalでいろいろ準備しなくてよいのは、デプロイの自動化をするときにもシンプルで助かります。


表示タイムアウトを使ったリトライの仕組み

単純なQueueでは、一度取り出すとQueue側にメッセージがなくなり、ただ1つのプロセスなりスレッドがそのメッセージを保持することになります。この場合にプロセスやスレッドが異常停止するとメッセージが失われます。

Queue Storage では「表示タイムアウト」をデキュー時に設定し、Queueに存在するが、非表示状態 にすることができます。この仕組みによって、メッセージをデキューしたプロセスやスレッドが異常終了しても、他のプロセスやスレッドによって一定時間後にリトライされることになります。Amazon SQSでは「可視化タイムアウト」と呼ばれるようです。

無限にリトライを繰り返さないように、表示タイムアウトの設定と同時にデキューカウント(図ではdeq= に続く数値として表現)が一定以上のメッセージを破棄(明示的に削除)する必要もあります

リトライ動作から説明してしまったのですが、通常の動作では正常にメッセージを処理したのちに、明示的な削除の処理も必要です。


サンプルコード

上記の動きに対応した xunit.net のテストコードです。ご参考まで。


public class QueueStorageLearningTests
{
private CloudQueue queueRef;
private int MAX_DEQUEUE_COUNT = 5;

private void Setup()
{
// Queueを作成する。ストレージアカウントのみAzure Portalで準備すればよい
// ストレージアカウントの接続文字列を以下に設定する
var CONNECTION_STRING = "DefaultEndpointsProtocol=https;******";
var storageAccount = CloudStorageAccount.Parse(CONNECTION_STRING);

var queueClient = storageAccount.CreateCloudQueueClient();
var queueName = "test";
queueRef = queueClient.GetQueueReference(queueName);
queueRef.CreateIfNotExists();
}

[Fact]
public void TestEnqueue()
{
Setup();

// 何らかのクラスをシリアライズするなどしてもよいが、XMLフリーなフォーマットにする
var message = new CloudQueueMessage("test message");
queueRef.AddMessage(message);

// Queue内部での生存期間を指定したり
//queueRef.AddMessage(message, timeToLive: TimeSpan.FromMinutes(5));

// Enqueueされて5分後にConsumerから見える状態にする、ということもできる
//queueRef.AddMessage(message, initialVisibilityDelay: TimeSpan.FromMinutes(5));

}

[Fact]
public void TestDequeue()
{
Setup();

// メッセージを取得して、非表示タイムアウトを120秒としている
// 実用上は queueRef.GetMessages としてループで処理する。サンプルゆえ1件取得
// PeekMessagesというメソッドもあるが、メッセージが非表示にされないためこの用途では使わない
var queueMessage = queueRef.GetMessage(TimeSpan.FromSeconds(120));

// デキューカウントを超過したもの(リトライしてもNGなメッセージ)は削除する
if (queueMessage.DequeueCount >= MAX_DEQUEUE_COUNT)
{
queueRef.FetchAttributes(); //Fetchしないとエラー発生するので必須
queueRef.DeleteMessage(queueMessage);
return; //1件処理としているので早期returnしています
}

try
{
//メッセージ内部にある情報で何か処理する
// 何らかのクラスをシリアライズするなど
ProcessHoge(queueMessage);

// 明示的な削除(正常終了時)
queueRef.FetchAttributes(); //Fetchしないとエラー発生するので必須
queueRef.DeleteMessage(queueMessage);

// 後から消す等では、メッセージのインスタンスそのものでなく、ID と PoPReceipt を保持しておいて消すこともできる
//var messageId = queueMessage.Id;
//var popReceipt = queueMessage.PopReceipt;
//queueRef.DeleteMessage(messageId, popReceipt);
}
catch
{
// 何等か例外処理
// キューからの削除はあえて行わない→非表示タイムアウト後にリトライされることを期待する
}
}

private void ProcessHoge(CloudQueueMessage queueMessage)
{
// 実際の処理
}
}


IoTHub(とBlob)に、振動センサーのデータを送っている理由

気づかれた方もいるかもしれませんが、「IoTHub(とBlob)に」データを送信しています。実は振動センサーのデータは実際にはIoTHubに送信しておらず、Blob Storageに送信しています。Queue Storage のメッセージサイズ制限を回避することと、Queue Storageのパフォーマンスを確保するためにメッセージサイズを1KB程度の小さなものにする必要があるためです。

振動データのサイズは1秒間の振動を計測して80KBほどになります。バイナリデータで80KBほどになるので、XMLやJSONなどにシリアライズすると、もっと大きなサイズになってしまいます。

このデータをそのままQueue Storageに投入することはできません。Queue Storage は64KBのサイズ制限があるためです。この場合はメッセージの分割をすることで対応はできますが、すべてのメッセージが揃うまで待ち合わせたり、どこかに保持する、、、といううれしくない状況を引き起こします。

また、サイズ制限いっぱいの60KBで、ラッシュテストを過去に行ったことがありますが、処理遅延が数秒レベルとなってしまいました。ドキュメントにも書かれていますが、小さいサイズ、例えば1KB程度に抑える必要があります。

これらから、振動データの計測条件といったメタデータのみIoTHubに送信し、振動データはBlob Storage に送信するという形になっています。この形にすることで、IoTHubのメッセージサイズによる課金も抑えられてうれしいことが重なります。

IoTHubに送信するメッセージの容量制限を超えてデータ容量を大きくできる、というのも大きなメリットになります。

なお、IoTHubにもBlob送信に関する機能がありますが、我々は使っていないです。


注意点

この仕組みを利用するには、いくつか注意点があります。


Queue Storage はメッセージを最大7日間までしか保持しません

長期間の保持が必要な場合には、Service Bus Queue を検討します。


非表示タイムアウトの設定は、処理に応じて適切に設定する


  • メッセージの処理時間に対して、非表示タイムアウトは十分に長い時間とする必要があります

  • デキューしたメッセージに対する処理(特徴量の抽出など)にかかる所要時間が、1分かかるものに対して非常時タイムアウトを10秒としてしまうと、別のConsumerが同じメッセージを処理する可能性が出てきます


処理順の保証が必要な「部分」には利用できません


  • どの範囲で保証するべきか、というニュアンスで「部分」と言ってます

  • たとえば、ここにあるService Bus Queue(SBS)との比較表 では、順序性についてSBSは「FIFOできる」と書かれており、Storage Queueには「いいえ」(順序性は担保されない)と書かれています


    • ただ、Queueから取り出す部分だけで順序保証しても、問題発生時のリトライを考えるとEnd-EndでFIFOを実現するのはそもそも困難です。それを実現しようとすれば処理のスループットも上げにくいです。・・・そういう観点で、ある部分のFIFOをあきらめれば、いろいろなシステムに適用できると思われます

    • 何らかのタイムスタンプ(IoT分野ならセンシングのタイムスタンプ)など、他の方法で順序性を担保したほうがイイと思ってます




べき等にできない処理にも適用できません


  • べき等な処理にしか適用できません(複数回の実行を行っても、データの矛盾やシステムに問題を引き起こさないような処理)


    • 例えばRDBへの永続化の場合には、一意制約違反への対処をして複数回のINSERTを引き起こさない、UPSERTの処理を実現するという考慮が必要です




デッドレターキューには未対応


  • デッドレターキューの仕組みがAzure Queue Storageにはありません。ログ等で対応するか、必要な場合はService Bus Queue を検討します

  • 念のため記載しますが、Queue Storageは最大7日間のメッセージ保持ですから、Queue Storage上で別のQueueを作ってデッドレターキューとする場合には、デッドレターキューも最大7日間保持ですね


ストレージはV1かV2か意識して検討する



  • データ操作の課金がV1とV2で10倍差があります。Queueの操作が意図せず増大した場合、10倍の開きがあると課金への影響が心配になります。よってV1としておくのが賢明と考えています。V1とV2ではデータ量による課金ではV2が少し安いです。V2はRA-GRSにデータ転送課金がかかるのも気になるところ


    • RA-GRSでクラス2の操作(エンキューやデキュー的操作を含む)は、1万操作あたり、汎用V1は0.0404円、汎用V2は0.45円