3
2

CRDのない超簡単Kubernetes Operator(Python編)

Last updated at Posted at 2023-12-29

はじめに

本稿は、KubernetesのOperator初学者のために、Operatorのコントローラの部分だけを簡単に書いてみて、Operatorが何をしているかを明らかにします。

ここではPythonとkubernetesモジュール(kubernetes Python SDK)を用いて、Operatorとして動作する簡単なプログラムを書きます。これは「CRDのない超簡単Kubernetes Operator」の第2弾であり、第1弾の続編です。まだ読んでいない方は第1弾のシェルスクリプト編から読むことをお勧めします。

シェルスクリプト編では、この一連の記事の目的なども説明しています。

Python編では、Operatorに関する理解を促すだけではなく、Pythonを使用してクラスターAPIを操作する方法についても簡単に説明します。

目標

本稿と、関連する一連の記事では以下を目標とします。取り消し線の項目は、本稿ではなく別記事で説明します。

  • Kubernetes Operatorをざっくり理解する
  • Python SDKを使用してOperatorのロジックを作成する
  • Python Operator SDK (kopf)を使用してOperatorを作成する

また、タイトルにあるようにここではOperatorのコントローラが実現するロジックやコーディングを中心に説明し、CRDについては触れません。また作成したロジックをOperatorとして動作させるための周辺作業については必要最小限の説明とさせてください。

必要なもの

前提とする知識

Kubernetesの一般的なオブジェクト(Deployment, Pod)と内容とその関係について理解している必要があります。

また、プログラムを読んでわかる程度のPythonの知識を必要とします。

用意する環境

シェルスクリプト編と同様に、見てわかることを目標にしているため、必要な環境は特にありません。

もちろん、実際に動かした方が理解が進むはずなので、何らかのKubernetesクラスターを用意していただくのが良いです。本稿の例を作成する際にOpenShift Localを使用しました。

PythonでKubernetes APIにアクセスするために、Kubernetes Python clientを使用します。これはGitHubでも提供されていますが、kubernetesパッケージとしてPyPIで提供されているので、pipコマンドでインストールできます。

$ pip install -y kubernetes

APIのドキュメントは、同じくGitHubで提供されています。

今回のお題

この記事の基本方針はCRDを用意しないことであるため、既存のKubernetesリソースであるDeploymentを利用する単純なDeploymentコントローラを作成します。Kubernetesには組み込みでDeploymentコントローラが動作しており、Deploymentの記述に従い、Podが作成されます。ここではその動作と並行で別のPodを起動するOperatorを作成します。

  1. 指定したnamespaceに存在するDeploymentリソースに対して、同じ名前のPodを作成する。
  2. コンテナは何でも構わない(今回は、適当に無限ループするシェルスクリプトとする)。
  3. Deploymentリソースが削除されたら、Podも削除される。

標準的なDeploymentコントローラのサブセットであり、これをsimple-deployment Operatorと名付けます。

そのため、1つのDeploymentに対して、標準的なDeploymentが起動するPodと、simple-deployment operatorが起動するPodの2種類のPodが起動することになります。

simple-deployment operator

control loop

上記の要件 1.を満たすために、次のコントロールループを実行します。まず疑似コードで示します。

無限ループ {
  namespace内に存在するDeploymentのリストを取得

  for d in Deploymentのリスト {
    if dと同じ名前のPodが存在しない {
      dと同じ名前のPodを起動する
    }
  }
}

Pythonでは次のコードになります。

def control_loop() -> None:
    while True:
        deployment_list = get_deployment_list('simple-deployment')

        for item in deployment_list:
            deployment_name = item.metadata.name
            namespace       = item.metadata.namespace

            # Reconciliation
            pod_name = deployment_name
            if (find_pod(pod_name, namespace) == None):
                start_pod(make_pod(pod_name, deployment_name), namespace)
        sleep(10)

get_deployment_listは、指定したnamespace内の全Deploymentリソースのリストを返す関数です。Deploymentリソースは、その構造に従い、dot記法でアクセスできます。このコードでは、取得したDeploymentリソースの.metadata.nameや、.metadata.namespaceにアクセスしています。

このDeploymentリソースは、PythonのV1Deploymentクラスとして実装されています。詳しくは、KubernetesのPython SDKマニュアルを参照してください。これらのリソースクラスについては、起動するPodのリソースを組み立てるコードの説明で詳しく取り上げます。

find_podは、指定したpodが存在すれば、そのリソースを返します。make_podはPodリソースを作成し、それをstart_podで起動します。

コントローラの機能はこれで全てです。前述したsimple-deployment operatorの機能である、「コンテナの指定」や「Deploymentを削除したらPodも削除される」などはPodの作成の際にPodリソースの中で定義します。

サポート関数

imports

ここからはcontrol loopで使用するサポート関数をそれぞれ説明します。まず最初に必要なインポートを説明します。

from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Deployment

このclientは、Kubernetesインターフェースです。V1Pod, V1ObjectMeta, V1PodSpec, V1Deploymentは、Kubernetesリソースをクラスとして表現したもので、APIを呼び出す際に使用します。

get_deployment_list

def get_deployment_list(namespace:str) -> []:
    api = client.AppsV1Api()
    try:
        response = api.list_namespaced_deployment(namespace)
    except ApiException as e:
        return []
    return response.items

get_deployment_listでは、namespace内の全Deploymentを取得します。ドキュメントからそれらしいものを探し、動作を確認してから使用します。

ここでは、list_namespaced_depoymentが適切です。このエンドポイントは、一覧によるとAppsV1Apiクラスに属しているため、client.AppsV1Api()でクラスを作成し、そのメソッドとして呼び出します。

APIはDeploymentのリストを返すので、そのitems属性に入っている配列をget_deployment_list関数から返します。指定したnamespaceにDeploymentが存在しない場合には、Exceptionが発生するので、その際には空のリストを返しています。

find_pod

find_podは、このOperatorの動作である「Deploymentと同じ名前のPodが存在しなければ作成する」のために、指定した名前のPodがnamespace内にあるかないかを確認します。

def find_pod(name:str, namespace:str) -> V1Pod:
    api = client.CoreV1Api()
    try:
        response = api.read_namespaced_pod(name, namespace)
    except ApiException as e:
        return None
    return response

大きな構造は、get_deployment_listと同じです。呼び出すread_namespaced_podが、CoreV1Apiクラスのメソッドなので、ここではCoreV1Apiクラスをインスタンス化しています。

start_pod

start_podはパラメータとしてPodリソースを受け取り、それを起動します。Podリソースは、別の関数で作成するとして、ここでは単純にcreate_namespaced_podを呼び出すだけです。

def start_pod(pod:V1Pod, namespace:str) -> None:
    """
    instanceate the pod
    """
    api = client.CoreV1Api()

    try:
        response = api.create_namespaced_pod(namespace, pod)
    except ApiException as e:
        print(f"Exception when calling CoreV1Api->create_namespaced_pod: {e}")

面倒なところ、つまりPodのリソースの組み立ては、別の関数にお任せです。それは後で説明する make_podmake_prototype_pod で行っています。

Kubernetesリソースの組み立て

make_prototype_pod

make_prototype_podは中身のない(空の)Podリソースを作成します。Podリソースは、V1Podクラスとして実装されているので、そのインスタンスを作ることから始めます。

pod = V1Pod()

みなさんよくご存知の通り、Podリソースはyaml的に表現すると、おおよそ次のような形をしています。

apiVersion: v1
kind: Pod
metadata:
    メタデータについて色々
spec:
    specについて色々
status:
    ステータスについて色々

APIドキュメントに書かれている通り、V1Podクラスは、apiVersion, kind, metadata, spec, statusをキーとして持つ辞書です。apiVersion, kindは前述のyaml的表現でもわかるように単なるstrですが、metadata, spec, statusは、その下にそれぞれのデータ構造を持ち、PythonSDKではクラスとして表現されています。したがって、前述のようにV1Pod()をインスタンス化しただけでは十分ではなく、それぞれのデータに適切なデータ構造を設定しなければなりません。

pod = V1Pod()
pod.metadata = V1ObjectMeta()
pod.spec     = V1PodSpec()
pod.status   = V1PodStatus()

ここで必要とするPodリソースは、Podを作成するためのものなので、make_prototype_podでは、statusを設定していません。またmetadataやspecは後から設定もできますが、V1Pod()のキーワード引数としても渡しても設定できるため、ここではV1ObjectMetaV1PodSpecのインスタンスを作成した後にV1Podのインスタンスを作成しています。関数のコード全体を示します。

def make_prototype_pod() -> V1Pod:
    """
    Create a generic V1Pod data
    """
    metadata = V1ObjectMeta()
    spec = V1PodSpec(containers=[])

    # pod construction
    pod = V1Pod(metadata=metadata, spec=spec)

    return pod

make_pod

make_podでは、make_prototype_podで作成した、空のPodリソースに値を詰めます。

先にコード全体を見てみましょう。

def make_pod(name:str, deployment_name:str) -> V1Pod:
    """
    Create a pod controled by the deployment
    """
    pod = make_prototype_pod()
    pod.metadata.name = name
    pod.spec.containers = [{
                        "command": [
                            "bash", "-c", f"while true ; do echo {name} ; sleep 1 ; done"
                        ],
                        "image": "ubi8",
                        "name": "echo",
                    }]

    pod.metadata.labels = { "app": name }

    # set the deployment as its owner for garbage collection of the deployment
    pod.metadata.owner_references = [
                    {
                        "apiVersion": "apps/v1",
                        "blockOwnerDeletion": True,
                        "controller": True,
                        "kind": "Deployment",
                        "name": deployment_name,
                        "uid": find_owner_reference(deployment_name)
                    }
                ]
    # print(pod)
    return pod

pod.metadata.owner_referencesを設定している箇所は後述します。それ以外は、

  • pod.metadata.nameにPodの名前
  • pod.metadata.labelsにラベル
  • pod.spec.containersにコンテナ仕様

を設定しているだけです。これにより、よくある次のようなPodリソースができます。

apiVersion: v1
kind: Pod
metadata:
  name: Podの名前
spec:
  containers:
  - command:
    - "bash"
    - "-c"
    - "while true ; do echo Podの名前 ; sleep 1 ; done"
    image: ubi8
    name: echo

Podの名前は、make_podのパラメータとして受け取ったものです。コンテナのコマンドでPodの名前をechoさせるために、Pythonのf文字列を使用している点に注意してください。

pod.spec.containersは、実際のところV1Containerクラスのインスタンス(のリスト)ですが、V1Containerインスタンスの代わりに必要な値をキーとして持つPython辞書も使用できます。ここではPython辞書を使用しています。pod.metadata.labelsも同様です。

ガベージコレクション

以上で、simple-deployment Operatorのロジックは完成しました。指定したnamespaceにDeploymentが存在すると、それと同じPodを起動します。

それでは、そのDeploymentが削除されるとどうなるでしょうか。期待としては同じ名前を持つPodも消えて欲しいところです。これをコントローラのロジックとして実現する方法もありますが、ここではKubernetesのガベージコレクションの仕組みを利用します。

Kubernetesは使用されていないリソースを自動的に削除するガベージコレクション機能を持ちます。ガベージコレクタはownerのいないリソースを削除対象としますが、そのために各リソースのownerReferenceを参照します。make_podの中で設定していたものです。

pod.metadata.ownerReferenceは、V1OwnerReferenceのインスタンス(のリスト)ですが、ここでもPython辞書を指定しています。詳細を見てみましょう。

    pod.metadata.owner_references = [
                    {
                        "apiVersion": "apps/v1",
                        "blockOwnerDeletion": True,
                        "controller": True,
                        "kind": "Deployment",
                        "name": deployment_name,
                        "uid": find_owner_reference(deployment_name)
                    }
                ]

これにより、uidを持つDeploymentが削除された際に、このPodリソースも削除されます。

詳しくは、Kubernetesドキュメントを参照してください。

参照先Deploymentのuidを得るために、find_owner_reference関数を使用しています。これは単に、find_deploymentのラッパーであり、目的とするDeploymentのuidを返します。

def find_owner_reference(name:str) -> str:
    """
    get operator's deployment UID to set owner_reference of a pod 
    """
    deployment = find_deployment(name, NAMESPACE)

    if (deployment):
        return deployment.metadata.uid
    else:
        return None

コード

最後に、コード全体を示します。

from time import sleep
from datetime import datetime as dt
import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)

from kubernetes import client, config, utils
from kubernetes.client.rest import ApiException
    
from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Deployment

NAMESPACE = "simple-deployment"
CONTROL_LOOP_INTERVAL = 10


def make_prototype_pod() -> V1Pod:
    """
    Create a generic V1Pod data
    """
    metadata = V1ObjectMeta()
    spec = V1PodSpec(containers=[])

    # pod construction
    pod = V1Pod(metadata=metadata, spec=spec)

    return pod


def make_pod(name:str, deployment_name:str) -> V1Pod:
    """
    Create a pod controled by the deployment
    """
    pod = make_prototype_pod()
    pod.metadata.name = name
    pod.spec.containers = [{
                        "command": [
                            "bash", "-c", f"while true ; do echo {name} ; sleep 1 ; done"
                        ],
                        "image": "ubi8",
                        "name": "echo",
                    }]

    pod.metadata.labels = { "app": name }

    # set the deployment as its owner for garbage collection of the deployment
    pod.metadata.owner_references = [
                    {
                        "apiVersion": "apps/v1",
                        "blockOwnerDeletion": True,
                        "controller": True,
                        "kind": "Deployment",
                        "name": deployment_name,
                        "uid": find_owner_reference(deployment_name)
                    }
                ]
    # print(pod)
    return pod


def find_owner_reference(name:str) -> str:
    """
    get operator's deployment UID to set owner_reference of a pod 
    """
    deployment = find_deployment(name, NAMESPACE)

    if (deployment):
        return deployment.metadata.uid
    else:
        return None


def start_pod(pod:V1Pod, namespace:str) -> None:
    """
    instanceate the pod
    """
    api = client.CoreV1Api()

    try:
        response = api.create_namespaced_pod(namespace, pod)
    except ApiException as e:
        print(f"Exception when calling CoreV1Api->create_namespaced_pod: {e}")


def find_pod(name:str, namespace:str) -> V1Pod:
    """
    get specified pod
    """
    api = client.CoreV1Api()

    try:
        response = api.read_namespaced_pod(name, namespace)
    except ApiException as e:
        print(f"{dt.now()} : Exception when calling CoreV1Api->read_namespaced_pod: {e}")
        return None

    return response


def find_deployment(name:str, namespace:str) -> V1Deployment:
    """
    get specified deployment
    """
    api = client.AppsV1Api()

    try:
        response = api.read_namespaced_deployment(name, namespace, pretty=False)
    except ApiException as e:
        print(f"{dt.now()} : Exception when calling AppsV1Api->read_namespaced_deployment: {e}")
        return None

    return response


def get_deployment_list(namespace:str) -> []:
    """
    get list of all deployments in the namespace
    """
    api = client.AppsV1Api()

    try:
        response = api.list_namespaced_deployment(namespace)
    except ApiException as e:
        print(f"{dt.now()} : Exception when calling AppsV1Api->list_namespaced_deployment: {e}")
        return []

    return response.items


def control_loop() -> None:
    """
    operator controll loop
    """
    while True:
        print(f"{dt.now()} : control loop start")

        deployment_list = get_deployment_list(NAMESPACE)

        for item in deployment_list:
            deployment_name = item.metadata.name
            namespace       = item.metadata.namespace
            print(f"{dt.now()} : deployment name = {deployment_name}")

            # Reconciliation
            pod_name = deployment_name

            if (find_pod(pod_name, namespace) == None):
                print(f"{dt.now()} : pod {pod_name} is not found, create a new one")
                start_pod(make_pod(pod_name, deployment_name), namespace)

        sleep(CONTROL_LOOP_INTERVAL)


if __name__ == "__main__":

    config.load_kube_config("./kubeconfig")

    control_loop()

    exit(1)

まとめ

Kubernetes PythonSDKを使用して、Deploymentリソースを参照するOperator simple-deploymentを作成しました。

このOperator(コントローラ)は、動作が単純であるため、参照するリソース(Deployment)の有無しか見ていません。しかし、一般的なOperatorが行うように、リソースの修正をどのように検知すれば良いでしょうか。また、ガベージコレクションを行うためのownerReferenceの設定など、面倒なところがあるのも悩ましいところです。

こうした、Operator(コントローラ)を作成する際に必要となる様々な機能は、OperatorSDKが担当する分野です。Pythonでコントローラを作成するのであれば、kopfがそれらを行います。もう少し込み入ったコントローラが必要となる場合には、力技でロジックを書くこともできますが、Operator SDKの力を借りることで、作業はずっと楽になります。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2