はじめに
本稿は、KubernetesのOperator初学者のために、Operatorのコントローラの部分だけを簡単に書いてみて、Operatorが何をしているかを明らかにします。
ここではPythonとkubernetesモジュール(kubernetes Python SDK)を用いて、Operatorとして動作する簡単なプログラムを書きます。これは「CRDのない超簡単Kubernetes Operator」の第2弾であり、第1弾の続編です。まだ読んでいない方は第1弾のシェルスクリプト編から読むことをお勧めします。
シェルスクリプト編では、この一連の記事の目的なども説明しています。
Python編では、Operatorに関する理解を促すだけではなく、Pythonを使用してクラスターAPIを操作する方法についても簡単に説明します。
目標
本稿と、関連する一連の記事では以下を目標とします。取り消し線の項目は、本稿ではなく別記事で説明します。
- Kubernetes Operatorをざっくり理解する
- Python SDKを使用してOperatorのロジックを作成する
Python Operator SDK (kopf)を使用してOperatorを作成する
また、タイトルにあるようにここではOperatorのコントローラが実現するロジックやコーディングを中心に説明し、CRDについては触れません。また作成したロジックをOperatorとして動作させるための周辺作業については必要最小限の説明とさせてください。
必要なもの
前提とする知識
Kubernetesの一般的なオブジェクト(Deployment, Pod)と内容とその関係について理解している必要があります。
また、プログラムを読んでわかる程度のPythonの知識を必要とします。
用意する環境
シェルスクリプト編と同様に、見てわかることを目標にしているため、必要な環境は特にありません。
もちろん、実際に動かした方が理解が進むはずなので、何らかのKubernetesクラスターを用意していただくのが良いです。本稿の例を作成する際にOpenShift Localを使用しました。
PythonでKubernetes APIにアクセスするために、Kubernetes Python clientを使用します。これはGitHubでも提供されていますが、kubernetesパッケージとしてPyPIで提供されているので、pipコマンドでインストールできます。
$ pip install -y kubernetes
APIのドキュメントは、同じくGitHubで提供されています。
今回のお題
この記事の基本方針はCRDを用意しないことであるため、既存のKubernetesリソースであるDeploymentを利用する単純なDeploymentコントローラを作成します。Kubernetesには組み込みでDeploymentコントローラが動作しており、Deploymentの記述に従い、Podが作成されます。ここではその動作と並行で別のPodを起動するOperatorを作成します。
- 指定したnamespaceに存在するDeploymentリソースに対して、同じ名前のPodを作成する。
- コンテナは何でも構わない(今回は、適当に無限ループするシェルスクリプトとする)。
- Deploymentリソースが削除されたら、Podも削除される。
標準的なDeploymentコントローラのサブセットであり、これをsimple-deployment Operatorと名付けます。
そのため、1つのDeploymentに対して、標準的なDeploymentが起動するPodと、simple-deployment operatorが起動するPodの2種類のPodが起動することになります。
simple-deployment operator
control loop
上記の要件 1.を満たすために、次のコントロールループを実行します。まず疑似コードで示します。
無限ループ {
namespace内に存在するDeploymentのリストを取得
for d in Deploymentのリスト {
if dと同じ名前のPodが存在しない {
dと同じ名前のPodを起動する
}
}
}
Pythonでは次のコードになります。
def control_loop() -> None:
while True:
deployment_list = get_deployment_list('simple-deployment')
for item in deployment_list:
deployment_name = item.metadata.name
namespace = item.metadata.namespace
# Reconciliation
pod_name = deployment_name
if (find_pod(pod_name, namespace) == None):
start_pod(make_pod(pod_name, deployment_name), namespace)
sleep(10)
get_deployment_list
は、指定したnamespace内の全Deploymentリソースのリストを返す関数です。Deploymentリソースは、その構造に従い、dot記法でアクセスできます。このコードでは、取得したDeploymentリソースの.metadata.name
や、.metadata.namespace
にアクセスしています。
このDeploymentリソースは、PythonのV1Deployment
クラスとして実装されています。詳しくは、KubernetesのPython SDKマニュアルを参照してください。これらのリソースクラスについては、起動するPodのリソースを組み立てるコードの説明で詳しく取り上げます。
find_pod
は、指定したpodが存在すれば、そのリソースを返します。make_pod
はPodリソースを作成し、それをstart_pod
で起動します。
コントローラの機能はこれで全てです。前述したsimple-deployment operatorの機能である、「コンテナの指定」や「Deploymentを削除したらPodも削除される」などはPodの作成の際にPodリソースの中で定義します。
サポート関数
imports
ここからはcontrol loopで使用するサポート関数をそれぞれ説明します。まず最初に必要なインポートを説明します。
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Deployment
このclient
は、Kubernetesインターフェースです。V1Pod
, V1ObjectMeta
, V1PodSpec
, V1Deployment
は、Kubernetesリソースをクラスとして表現したもので、APIを呼び出す際に使用します。
get_deployment_list
def get_deployment_list(namespace:str) -> []:
api = client.AppsV1Api()
try:
response = api.list_namespaced_deployment(namespace)
except ApiException as e:
return []
return response.items
get_deployment_list
では、namespace内の全Deploymentを取得します。ドキュメントからそれらしいものを探し、動作を確認してから使用します。
ここでは、list_namespaced_depoyment
が適切です。このエンドポイントは、一覧によるとAppsV1Api
クラスに属しているため、client.AppsV1Api()
でクラスを作成し、そのメソッドとして呼び出します。
APIはDeploymentのリストを返すので、そのitems属性に入っている配列をget_deployment_list関数から返します。指定したnamespaceにDeploymentが存在しない場合には、Exceptionが発生するので、その際には空のリストを返しています。
find_pod
find_pod
は、このOperatorの動作である「Deploymentと同じ名前のPodが存在しなければ作成する」のために、指定した名前のPodがnamespace内にあるかないかを確認します。
def find_pod(name:str, namespace:str) -> V1Pod:
api = client.CoreV1Api()
try:
response = api.read_namespaced_pod(name, namespace)
except ApiException as e:
return None
return response
大きな構造は、get_deployment_list
と同じです。呼び出すread_namespaced_pod
が、CoreV1Api
クラスのメソッドなので、ここではCoreV1Api
クラスをインスタンス化しています。
start_pod
start_podはパラメータとしてPodリソースを受け取り、それを起動します。Podリソースは、別の関数で作成するとして、ここでは単純にcreate_namespaced_pod
を呼び出すだけです。
def start_pod(pod:V1Pod, namespace:str) -> None:
"""
instanceate the pod
"""
api = client.CoreV1Api()
try:
response = api.create_namespaced_pod(namespace, pod)
except ApiException as e:
print(f"Exception when calling CoreV1Api->create_namespaced_pod: {e}")
面倒なところ、つまりPodのリソースの組み立ては、別の関数にお任せです。それは後で説明する make_pod
と make_prototype_pod
で行っています。
Kubernetesリソースの組み立て
make_prototype_pod
make_prototype_pod
は中身のない(空の)Podリソースを作成します。Podリソースは、V1Pod
クラスとして実装されているので、そのインスタンスを作ることから始めます。
pod = V1Pod()
みなさんよくご存知の通り、Podリソースはyaml的に表現すると、おおよそ次のような形をしています。
apiVersion: v1
kind: Pod
metadata:
メタデータについて色々
spec:
specについて色々
status:
ステータスについて色々
APIドキュメントに書かれている通り、V1Pod
クラスは、apiVersion
, kind
, metadata
, spec
, status
をキーとして持つ辞書です。apiVersion
, kind
は前述のyaml的表現でもわかるように単なるstrですが、metadata
, spec
, status
は、その下にそれぞれのデータ構造を持ち、PythonSDKではクラスとして表現されています。したがって、前述のようにV1Pod()をインスタンス化しただけでは十分ではなく、それぞれのデータに適切なデータ構造を設定しなければなりません。
pod = V1Pod()
pod.metadata = V1ObjectMeta()
pod.spec = V1PodSpec()
pod.status = V1PodStatus()
ここで必要とするPodリソースは、Podを作成するためのものなので、make_prototype_pod
では、statusを設定していません。またmetadataやspecは後から設定もできますが、V1Pod()
のキーワード引数としても渡しても設定できるため、ここではV1ObjectMeta
とV1PodSpec
のインスタンスを作成した後にV1Pod
のインスタンスを作成しています。関数のコード全体を示します。
def make_prototype_pod() -> V1Pod:
"""
Create a generic V1Pod data
"""
metadata = V1ObjectMeta()
spec = V1PodSpec(containers=[])
# pod construction
pod = V1Pod(metadata=metadata, spec=spec)
return pod
make_pod
make_pod
では、make_prototype_pod
で作成した、空のPodリソースに値を詰めます。
先にコード全体を見てみましょう。
def make_pod(name:str, deployment_name:str) -> V1Pod:
"""
Create a pod controled by the deployment
"""
pod = make_prototype_pod()
pod.metadata.name = name
pod.spec.containers = [{
"command": [
"bash", "-c", f"while true ; do echo {name} ; sleep 1 ; done"
],
"image": "ubi8",
"name": "echo",
}]
pod.metadata.labels = { "app": name }
# set the deployment as its owner for garbage collection of the deployment
pod.metadata.owner_references = [
{
"apiVersion": "apps/v1",
"blockOwnerDeletion": True,
"controller": True,
"kind": "Deployment",
"name": deployment_name,
"uid": find_owner_reference(deployment_name)
}
]
# print(pod)
return pod
pod.metadata.owner_references
を設定している箇所は後述します。それ以外は、
- pod.metadata.nameにPodの名前
- pod.metadata.labelsにラベル
- pod.spec.containersにコンテナ仕様
を設定しているだけです。これにより、よくある次のようなPodリソースができます。
apiVersion: v1
kind: Pod
metadata:
name: Podの名前
spec:
containers:
- command:
- "bash"
- "-c"
- "while true ; do echo Podの名前 ; sleep 1 ; done"
image: ubi8
name: echo
Podの名前は、make_pod
のパラメータとして受け取ったものです。コンテナのコマンドでPodの名前をecho
させるために、Pythonのf文字列を使用している点に注意してください。
pod.spec.containersは、実際のところV1Containerクラスのインスタンス(のリスト)ですが、V1Container
インスタンスの代わりに必要な値をキーとして持つPython辞書も使用できます。ここではPython辞書を使用しています。pod.metadata.labelsも同様です。
ガベージコレクション
以上で、simple-deployment Operatorのロジックは完成しました。指定したnamespaceにDeploymentが存在すると、それと同じPodを起動します。
それでは、そのDeploymentが削除されるとどうなるでしょうか。期待としては同じ名前を持つPodも消えて欲しいところです。これをコントローラのロジックとして実現する方法もありますが、ここではKubernetesのガベージコレクションの仕組みを利用します。
Kubernetesは使用されていないリソースを自動的に削除するガベージコレクション機能を持ちます。ガベージコレクタはownerのいないリソースを削除対象としますが、そのために各リソースのownerReferenceを参照します。make_pod
の中で設定していたものです。
pod.metadata.ownerReferenceは、V1OwnerReferenceのインスタンス(のリスト)ですが、ここでもPython辞書を指定しています。詳細を見てみましょう。
pod.metadata.owner_references = [
{
"apiVersion": "apps/v1",
"blockOwnerDeletion": True,
"controller": True,
"kind": "Deployment",
"name": deployment_name,
"uid": find_owner_reference(deployment_name)
}
]
これにより、uidを持つDeploymentが削除された際に、このPodリソースも削除されます。
詳しくは、Kubernetesドキュメントを参照してください。
参照先Deploymentのuidを得るために、find_owner_reference
関数を使用しています。これは単に、find_deployment
のラッパーであり、目的とするDeploymentのuidを返します。
def find_owner_reference(name:str) -> str:
"""
get operator's deployment UID to set owner_reference of a pod
"""
deployment = find_deployment(name, NAMESPACE)
if (deployment):
return deployment.metadata.uid
else:
return None
コード
最後に、コード全体を示します。
from time import sleep
from datetime import datetime as dt
import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
from kubernetes import client, config, utils
from kubernetes.client.rest import ApiException
from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Deployment
NAMESPACE = "simple-deployment"
CONTROL_LOOP_INTERVAL = 10
def make_prototype_pod() -> V1Pod:
"""
Create a generic V1Pod data
"""
metadata = V1ObjectMeta()
spec = V1PodSpec(containers=[])
# pod construction
pod = V1Pod(metadata=metadata, spec=spec)
return pod
def make_pod(name:str, deployment_name:str) -> V1Pod:
"""
Create a pod controled by the deployment
"""
pod = make_prototype_pod()
pod.metadata.name = name
pod.spec.containers = [{
"command": [
"bash", "-c", f"while true ; do echo {name} ; sleep 1 ; done"
],
"image": "ubi8",
"name": "echo",
}]
pod.metadata.labels = { "app": name }
# set the deployment as its owner for garbage collection of the deployment
pod.metadata.owner_references = [
{
"apiVersion": "apps/v1",
"blockOwnerDeletion": True,
"controller": True,
"kind": "Deployment",
"name": deployment_name,
"uid": find_owner_reference(deployment_name)
}
]
# print(pod)
return pod
def find_owner_reference(name:str) -> str:
"""
get operator's deployment UID to set owner_reference of a pod
"""
deployment = find_deployment(name, NAMESPACE)
if (deployment):
return deployment.metadata.uid
else:
return None
def start_pod(pod:V1Pod, namespace:str) -> None:
"""
instanceate the pod
"""
api = client.CoreV1Api()
try:
response = api.create_namespaced_pod(namespace, pod)
except ApiException as e:
print(f"Exception when calling CoreV1Api->create_namespaced_pod: {e}")
def find_pod(name:str, namespace:str) -> V1Pod:
"""
get specified pod
"""
api = client.CoreV1Api()
try:
response = api.read_namespaced_pod(name, namespace)
except ApiException as e:
print(f"{dt.now()} : Exception when calling CoreV1Api->read_namespaced_pod: {e}")
return None
return response
def find_deployment(name:str, namespace:str) -> V1Deployment:
"""
get specified deployment
"""
api = client.AppsV1Api()
try:
response = api.read_namespaced_deployment(name, namespace, pretty=False)
except ApiException as e:
print(f"{dt.now()} : Exception when calling AppsV1Api->read_namespaced_deployment: {e}")
return None
return response
def get_deployment_list(namespace:str) -> []:
"""
get list of all deployments in the namespace
"""
api = client.AppsV1Api()
try:
response = api.list_namespaced_deployment(namespace)
except ApiException as e:
print(f"{dt.now()} : Exception when calling AppsV1Api->list_namespaced_deployment: {e}")
return []
return response.items
def control_loop() -> None:
"""
operator controll loop
"""
while True:
print(f"{dt.now()} : control loop start")
deployment_list = get_deployment_list(NAMESPACE)
for item in deployment_list:
deployment_name = item.metadata.name
namespace = item.metadata.namespace
print(f"{dt.now()} : deployment name = {deployment_name}")
# Reconciliation
pod_name = deployment_name
if (find_pod(pod_name, namespace) == None):
print(f"{dt.now()} : pod {pod_name} is not found, create a new one")
start_pod(make_pod(pod_name, deployment_name), namespace)
sleep(CONTROL_LOOP_INTERVAL)
if __name__ == "__main__":
config.load_kube_config("./kubeconfig")
control_loop()
exit(1)
まとめ
Kubernetes PythonSDKを使用して、Deploymentリソースを参照するOperator simple-deployment
を作成しました。
このOperator(コントローラ)は、動作が単純であるため、参照するリソース(Deployment)の有無しか見ていません。しかし、一般的なOperatorが行うように、リソースの修正をどのように検知すれば良いでしょうか。また、ガベージコレクションを行うためのownerReferenceの設定など、面倒なところがあるのも悩ましいところです。
こうした、Operator(コントローラ)を作成する際に必要となる様々な機能は、OperatorSDKが担当する分野です。Pythonでコントローラを作成するのであれば、kopfがそれらを行います。もう少し込み入ったコントローラが必要となる場合には、力技でロジックを書くこともできますが、Operator SDKの力を借りることで、作業はずっと楽になります。