Kafka Trigger
で Deployment Slot を使うとどうなるか?というのを聞かれたので実際に自分で試してみた。
Kafka Trigger サンプル
次のようなサンプルを作ってみる。バージョンは最新の<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.1.0" />
を使っている。アプリはとても簡単なのもので、次のようになっている。
| Property | Production | Staging |
| -------- + ---------- + -------- |
| TOPIC | message | message_staging |
| SLOT_KIND | Production | Staging |
上記のように、Production/Staging で設定を変えてアプリのバージョンアップをしながら次の操作を行う。
- Message を message トピックに送信
- スワップ
- Message 2 を message トピックに送信
下記のアプリケーションは、途中で3分停止するようになっていて、その後処理を再開する。その3分の間にスワップをするとどのような挙動になるのかを調べてみたい。
public class SimpleKafkaTrigger
{
[FunctionName(nameof(SampleConsumerAsync))]
public async Task SampleConsumerAsync(
[KafkaTrigger(
"%ConfluentBrokerList%",
"%TOPIC%",
ConsumerGroup = "$Default",
Username = "%ConfluentCloudUsername%",
Password = "%ConfluentCloudPassword%",
Protocol = BrokerProtocol.SaslSsl,
SslCaLocation = "confluent_cloud_cacert.pem",
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
ILogger logger)
{
logger.LogInformation($"Ver 1.0.1 Slot [{Environment.GetEnvironmentVariable("SLOT_KIND")}] Consumed: {kafkaEvent.Value.ToString()}");
logger.LogInformation("Going To Sleep");
await Task.Delay(TimeSpan.FromMinutes(3));
logger.LogInformation($"Ver 1.0.1 Slot [{Environment.GetEnvironmentVariable("SLOT_KIND")}] Finished: {kafkaEvent.Value.ToString()}");
}
}
今回は Consumption Plan - Windows 試している。ホストのバージョンは3.0.14492.0
だった。デプロイメントスロットのスワップを設定する。AppSettings をまず Production 側で設定した後に、Staging 側を作成すると設定がコピーされて楽だった。そして、Deployment slot setting
を、SLOT_KIND
と TOPIC
のみチェックを入れる。これによって、それらのセッティングがスロットスワップでも入れ替わらなくなる。
準備は完了。試しに Confluence Cloud にメッセージを送信して、正しく読み込まれるかを確認する。Production には Ver 1.0.0 を Staging には Ver 1.0.1 をデプロイしている。
Message (before)
Confluent Cloud から Message を送信する
Swap
Message で起動されたFunctionが3分間停止している間に、Deployment Slots の Swap
ボタンをクリックしてスワップを行う。
上記に SLOT_KIND
と TOPIC
が含まれていないことを確認する。
Message (After)
Messageを送信する
Log
Application Insgihtsのログを時系列にソートしてどのようなことが起こっているかを分析する。
最初のメッセージが ver 1.0.0
として受け取られて、そのままスリープ状態になっている。その後、Host
が停止して、Functions自体のリスナーも停止している。ただし、このメッセージは終了していない。
基本的な挙動では、Swapの前に、Staging側のスロットのプリウォームが行われるので、ちゃんとFunctionsを起動させておくため、ここで、Production側 (V1.0.1, tsushiswap3) 側ホストのスタートが行われている。
その後に、該当のFunction が実行されているのがわかる。3つ動いているが、1つは私のオペミスなので、忘れてほしい。
面白いことに、Swapのあとのものだけではなく、Swap前のものが、新しいバージョンで起動している。Functions側でRe-Run が起こったのではなく、Kafka Trigger 側で、Functionsの実行が終了する前に途中でホストが止まったので、メッセージが再度 TOPIC に戻って再度 Consumpe されたと考えるのが自然だと思う。
Kafka Trigger 時の Swap Slot の挙動のまとめ
メッセージの実行中のFunctionsがあるときに、スワップが実行されるとホストが停止して、Production側に新しいバージョンで、再実行される。それは、Swap前のメッセージのFunction実行が終了してないからで、そのケースでは、TOPICにメッセージが戻されるため、Production側で、新しいアプリケーションとして、再度実行される。
つまり、この挙動から考えられることは、SwapしたいKafkaTriggerがある場合は、KafkaTriggerをIdempotent、つまり、何回実行しても結果は同じというコードにしておく必要がある。しかし、それを実行しておけば、Slotを使うと、バージョンの切り替えはメッセージのロスなく、簡単に行うことができるだろう。
P.S. 自分の実験では上記の通りだが、本当にこの解釈であってるかはコードを読んで本人に聞いてみたい。あと、Premium でも同じ挙動だと思うけど、どうなるかは気になるところ。Premium は testing in production の機能があるので、少しづつ切り替えることが可能なので。
NOTE: 本人に聞いてみたが、私の観察したとおりの振る舞いの様子(Premiumも) 10/5/2020