LoginSignup
4
5

More than 5 years have passed since last update.

Azure Functions の Custom Trigger を書いて理解してみた

Last updated at Posted at 2018-02-26

Azure Functions の Trigger のメカニズムを理解するために、Custom Trigger を書いてみた。お題としては Kubernetes の Pod を監視して、一定時間うまく動いていなさげだったら通知するというトリガーになっている。このお題を元にWebJobs と、Azure Functions の双方で動くKubernetes Trigger Extension を書いてみた。

KubernetesTrigger.png

サンプルコード

既に、@yu_ka1984 さんが出たてなのにすでに実施してブログを書いている。いつもながら凄いセンスです。

あと、正式なページが出来ているみたいです。

ここから、サンプルプログラムもたどれます。

これらさえあれば、もう十分なはず。今からは自分の理解のためにブログを書きます。

私の今回のサンプルコードはこちら。

アーキテクチャ

私には結構ややこしく感じたのでコードを理解して図にしてみました。

Custom Trigger Class Diagram.jpg

それぞれ解説します。

Attribute クラス

Azure Functions のコードがこんな感じになるとすると、[KubernetesTrigger] を表すクラス。単なるアトリビュート。ここに、トリガーに渡す値を書いておく。私はデフォルト値にしているので、パラメータは無いように見えるが複数持っています。

    public static class KubernetesTriggerSample
    {
        [FunctionName("KubernetesTriggerSample")]
        public static void Run([KubernetesTrigger] KubernetesTriggerValue value, TraceWriter log)
        {
            log.Info("**** Something Wrong with your Pods. *** ");
            // Console.WriteLine(value.Result);
        }
    }

コードのポイントは、[Binding] を持っていることです。 [AttributeUsage] はクラスをAttribute にするためのものです。[Binding] をつけることで、Azure Functions に通知することになります。

KubernetesTriggerAttribute

    [AttributeUsage(AttributeTargets.Parameter)]
    [Binding]
    public class KubernetesTriggerAttribute : Attribute
    {

        public string Token { get; set; }

        public int PendingTimeLimit { get; set; }

        public KubernetesTriggerAttribute()
        {
            Token = System.Environment.GetEnvironmentVariable("kubernetesToken");
            PendingTimeLimit = 5;
        }
    }

ちなみにマニュアルによると、[AppSetting(Default = "kubernetesToken")] みたいなコードが書ける雰囲気で実際にやってみましたが、Trigger ではうまく動かなかったです。(何かミスしているのかもしれませんが、、、)

ExtensionConfigProvider

この Extension を WebJobs や Azure Functions に登録(レジスト)するために存在します。WebJobs では、Program.cs 等で自ら登録しないといけません。

Program.cs

        static void Main(string[] args)
        {
            JobHostConfiguration config = new JobHostConfiguration();
            FilesConfiguration filesConfig = new FilesConfiguration();
            if(config.IsDevelopment)
            {
                config.UseDevelopmentSettings();
                filesConfig.RootPath = @"C:\temp\files";
            }

            config.UseFiles(filesConfig);
            config.KubernetesSample();

しかしながら、Azure Functions では、IExtensionConfigProvider を実装したクラスが、public のスコープになっていれば、自動的にレジストしてくれるようになっています。(node の場合は、環境変数にextensionのディレクトリを指定する必要あり。オフィシャルのサンプル参照) 単にライブラリをリファレンスすればOK。

ここで、次のBindingProvider を渡しています。

        public class KubernetesExtensionConfig : IExtensionConfigProvider
        {
            private TraceWriter _tracer;
            public void Initialize(ExtensionConfigContext context)
            {
                if (context == null)
                {
                    throw new ArgumentNullException("context");
                }
                if (context.Trace == null)
                    throw new ArgumentNullException("context.Trace");

                _tracer = context.Trace;
                // Register our extension bindings providers
                context.Config.RegisterBindingExtensions(
                    new KubernetesTriggerAttributeBindingProvider());
            }
        }

ITriggerBindingProvider

ここでは、Azure Functions に、トリガーをバインドするためのプロバイダ。ここがメインみたいなものです。TryCreateAsync メソッドで、ITriggerBinding を返しています。TriggerBinding では、大きく二つのことをしています。

  1. パラメータの型変換 (BindAsync)
  2. Listner の登録

Azure Functions のトリガーや、バインディングは、複数の型をサポートしていると思いますが、それは、このBindingの中で変換するロジックを提供することで実現しています(BindAsync)。私のTrigger は特に必要ないのでいろいろしていません。あとは、Listener の登録です。トリガーの場合、トリガーを発火させるためのロジックがListener に書かれますが、そのLister を登録します。

KubernetesTriggerBinding

   private class KubernetesTriggerBinding : ITriggerBinding
        {
            private readonly ParameterInfo _parameter;
            private readonly IReadOnlyDictionary<string, Type> _bindingContract;

            public KubernetesTriggerBinding(ParameterInfo parameter)
            {
                _parameter = parameter;
                _bindingContract = CreateBindingDataContract();
            }

            public IReadOnlyDictionary<string, Type> BindingDataContract
            {
                get { return _bindingContract; }
            }

            public Type TriggerValueType
            {
                get { return typeof(KubernetesTriggerValue); }
            }


            public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
            {
                // TODO: Perfrom any required conversions on the value 
                KubernetesTriggerValue triggerValue = value as KubernetesTriggerValue;
                IValueBinder valueBinder = new KubernetesValueBinder(_parameter, triggerValue);
                return Task.FromResult<ITriggerData>(new TriggerData(valueBinder, GetBindingData(triggerValue)));
            }

            public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
            {
                return Task.FromResult<IListener>(new Listener(context.Executor, _parameter.GetCustomAttribute<KubernetesTriggerAttribute>(false)));
            }

            public ParameterDescriptor ToParameterDescriptor()
            {
                return new KubernetesTriggerParameterDescriptor
                {
                    Name = _parameter.Name,
                    DisplayHints = new ParameterDisplayHints
                    {
                        // TODO: Customize your Dashboard display strings
                        Prompt = "Kubernetes",
                        Description = "Kubernetes Trigger fired",
                        DefaultValue = "Kubernets"
                    }
                };
            }

            private IReadOnlyDictionary<string, Type> CreateBindingDataContract()
            {
                Dictionary<string, Type> contract = new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase);
                contract.Add("KubernetesTrigger", typeof(KubernetesTriggerValue));
                return contract;
            }

            private IReadOnlyDictionary<string, object> GetBindingData(KubernetesTriggerValue value)
            {
                Dictionary<string, object> bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
                bindingData.Add("KubernetesTrigger", value);
                // TOD: Add any additional binding data
                return bindingData;
            }
        }

        private class KubernetesTriggerParameterDescriptor : TriggerParameterDescriptor
        {
            public override string GetTriggerReason(IDictionary<string, string> arguments)
            {
                // TODO: Customize your Dashboard display string
                return string.Format("Kubernetes trigger fired at {0}", DateTime.Now.ToString("o"));
            }
        }

Listener

最後にトリガーを実行するリスナーです。KubernetesTriggerValue は単なる POCO で、Kubernetes トリガーの引数として渡されます。

    public static class KubernetesTriggerSample
    {
        [FunctionName("KubernetesTriggerSample")]
        public static void Run([KubernetesTrigger] KubernetesTriggerValue value, TraceWriter log)
        {
            log.Info("**** Something Wrong with your Pods. *** ");
            // Console.WriteLine(value.Result);
        }
    }

ですので、トリガーがかかったらそこに値を詰めて返します。Timer がFire したら、OnTimer メソッドのなかでその処理をするようにしています。StartAsync で、トリガー処理を開始します。ですので、Azure Functions の Trigger の挙動を確認したい場合は、Listener を探してそのコードを読むと挙動がわかりそうです。ちなみに、プロダクションチームによると、Azure Functions の Scale Controller の制御は、Custom Trigger はできないので、負荷が高いとスケールさせるとかは、無理みたいですね。(QueueTriggerとか、EventHub Trigger はできます。どんなコードになっているのか追ってみたい)

Listner

      private class Listener : IListener
        {
            private ITriggeredFunctionExecutor _executor;
            private KubernetesTriggerAttribute _attribute;
            private System.Timers.Timer _timer;

            public Listener(ITriggeredFunctionExecutor executor, KubernetesTriggerAttribute attribute)
            {
                _executor = executor;
                _attribute = attribute;
                _timer = new System.Timers.Timer(5 * 1000)
                {
                    AutoReset = true
                };
                _timer.Elapsed += OnTimer;
            }

            public void Cancel()
            {
                // TODO: cancel the task
            }

            public void Dispose()
            {
                // Do some clean up
                _timer.Dispose();
            }

            public Task StartAsync(CancellationToken cancellationToken)
            {
                // TODO: Start monitoring your event source. 
                _timer.Start();


                return Task.FromResult(true);
            }

            public Task StopAsync(CancellationToken cancellationToken)
            {
                // TODO: Stop monitoring your event source
                _timer.Stop();
                return Task.FromResult(true);
            }

            private static HttpClient client;

            static Listener()
            {
                var httpClientHandler = new HttpClientHandler();
                httpClientHandler.ServerCertificateCustomValidationCallback = (message, cert, chain, sslPlicyErrors) => true;
                client = new HttpClient(httpClientHandler);
                client.BaseAddress = new Uri(System.Environment.GetEnvironmentVariable("serverUrl"));

            }

            private async void OnTimer(object sender, System.Timers.ElapsedEventArgs e)
            {
                // Call Kubernetes REST API

                client.DefaultRequestHeaders.Clear();
                client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", this._attribute.Token);
                var response = await client.GetAsync("/api/v1/namespaces/default/pods");
                var result = await response.Content.ReadAsStringAsync();
                var resultOject = JsonConvert.DeserializeObject<Rootobject>(result);

                bool hasWrongPod = false;
                foreach (var item in resultOject.items)
                {
                    var ts = DateTime.UtcNow - item.status.startTime;
                    if ("Pending" == item.status.phase || ts.TotalMinutes > 5)
                    {
                        Console.WriteLine("**** Wrong Pod Detected ****");
                        hasWrongPod = true;
                    }
                    Console.WriteLine($"Pod: {item.metadata.name}");
                    Console.WriteLine($"Status: {item.status.phase}");
                    Console.WriteLine($"Started {ts.TotalMinutes} min before");
                }
                // Get to know if it is wrong Pod
                if (hasWrongPod) {
                    // Trigger the function.
                    var triggerValue = new KubernetesTriggerValue();
                    triggerValue.Result = result;
                    TriggeredFunctionData input = new TriggeredFunctionData
                    {

                        TriggerValue = triggerValue
                    };
                    await _executor.TryExecuteAsync(input, CancellationToken.None);
                }

            }
        }

こんな感じでコードを書くと楽に Custom Trigger を書くことが出来ました。めでたしめでたし。ちなみに、Custom Trigger は、Functions V2 からの模様です。5秒ごとに、Kubernetes のクラスタにREST API で問い合わせをして、Pending が5分以上つづいているPod があったらトリガします。

afsample.png

ちなみに、Custom Trigger は、Scale Controller のコントロールは無理で、どうやら、Consumption Plan も厳しそうですね。下記参照。基本App Service プランでスケールは自分でしないようにする感じですね。それでも楽しいかも。

参考

4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5