k8sのジョブの利用方法で、Queue with Pod Per Work Item (作業項目毎のポッドのキュー) の意味が良く分からないので、リンク先のCoarse Parallel Processing Using a Work Queue を検証して、その意図する処を確かめたメモです。
概要
目的とする処理方式を図にすると、次の様になります。
主題であるバッチ型処理は、右下の赤い四角形で、RabbitMQ からデータを取得して、バッチ・ジョブを実行します。
この図で Depolyment or Replication Controller と表記した箱で、左上がRabbitMQへのデータの投入側になります。そして、真ん中の箱が、RabbitMQのコンテナが動作するポッドです。その上に描いた紫の小さな箱が、RabbitMQを他のデプロイメントやジョブからアクセスできる様にするための "サービス" です。 この "サービス" を立てることで、クラスタIPアドレスが付与され、k8s内部のDNSへサービス名が登録され、データ生成側のポッド、バッチ処理のポッドから、サービス名でアクセスできる様になります。
この様な構成をとることで、大量にメモリを消費するレンダリング、膨大な宛先へ短時間でメール送信するための処理、NoSQLデータベースをサーチしてデータを集める、などなど、一回の処理で、1分以上を要し、大量のメモリや、並列処理を必要とする用途に、適したバッチ処理の環境を作ることができます。
ここで1分間以上の処理と書きましたが、Depolyment や Replication Controller は、終了通知のシグナルを受けてから1分以内で、コンテナ上のプロセスを正常終了させる必要があるためです。 さもなければ、強制停止となります。
検証
実際に上記のジョブを動作させて振る舞いを確認して行きます。
メッセージキューサービスの開始
RabbitMQのサービスを実行するための YAMLファイルは、GitHubのサンプルYAMLを利用します。 https://github.com/kubernetes/kubernetes/tree/release-1.3/examples/celery-rabbitmq
次の様に、GitHubに置かれたファイルを kubectl から直接指定して、RabbitMQを起動します。
$ kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
$ kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationcontroller "rabbitmq-controller" created
これで、RabbitMQの準備は完了になります。 もしも、RabbitMQが動作する仮想サーバー(ノード)が異常停止すれば、残りのノードで、RabbitMQのポッドが起動します。 今回はやっていませんが、キューの永続化ディスクを永続ストレージに設定していれば、キューのデータの喪失も防ぐことができます。
クラスタのポッドにログインして、サービスにアクセスする
ここまでに立ち上げた RabbitMQ のテストを行います。 k8sクラスタ内のClusterIPにアクセスするために、シェルやcurlコマンドを実行するためのポッドを起動してログインします。
$ kubectl run -i --tty temp --image ubuntu:14.04
If you don't see a command prompt, try pressing enter.
root@temp-95cf56858-zfvnp:/# apt-get update
...
root@temp-95cf56858-zfvnp:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
...
既にポッドが存在していて、再度ポッドにログインする場合は、kubectl get pods
で目的のポッドのIDをメモっておき、次のコマンドでログインします。
$ kubectl exec -it temp-95cf56858-zfvnp bash
メッセージキューサービスのテスト
まずは サービスのrabbitmq-service
の作成によって kube-dnsに登録されているか確認します。 nslookup コマンドで k8sクラスタ内ネットワークのアドレスが確認できれば成功です。
root@temp-95cf56858-zfvnp:/# nslookup rabbitmq-service
Server: 172.21.0.10
Address: 172.21.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 172.21.144.164
繰り返し利用するキューのブローカのURLを環境変数にセットしておき、テスト用のキュー名 foo
を作成します。
root@temp-95cf56858-zfvnp:/# export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo
キューに書き込んで、読み取れたらテスト成功です。
root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
キューの作成とデータのプット
テストに成功したので、ジョブが実際にアクセスするキュー job1
を作成します。 そして、job1に8個の文字列を書き込んでおきます。
root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
root@temp-95cf56858-zfvnp:/# for f in apple banana cherry date fig grape lemon melon
> do
> /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
> done
繰り返し検証する場合には、次のワンラインをコピペすることで、何度もキューに書き込むことができます。
for f in apple banana cherry date fig grape lemon melon; do /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f; done
以上でキューの準備が完了しました。 最初の絵で言えば、左上のデータ生成側常駐側ポッド、真ん中の常駐型サービスの二つが出来た事になります。
次に、右下の赤いボックスのジョブを作って行きます。
JOBのコンテナの作成
参考にした Coarse Parallel Processing Using a Work Queue の worker.py には誤りがあり動作しないので、sys.stdin以下を少し修正した次のコードを利用します。
#!/usr/bin/env python
# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.read())
time.sleep(10)
コンテナをビルドするための Dockerfile ですが、最後の行の $BROKER_URL, $QUEUE は、kubectl でジョブを実行するためのYAMLファイルに環境変数として定義します。 こうすることで、コンテナを再ビルドすることなく、ブローカーのアドレスとキュー名の変更に対応できます。
# Specify BROKER_URL and QUEUE when running
FROM ubuntu:14.04
RUN apt-get update && \
apt-get install -y curl ca-certificates amqp-tools python \
--no-install-recommends \
&& rm -rf /var/lib/apt/lists/*
COPY ./worker.py /worker.py
CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py
上記の2つのファイルがあるディレクトリで、次のコマンドを実行して、コンテナを作成します。
$ docker build -t job-wq .
Sending build context to Docker daemon 4.096kB
Step 1/4 : FROM ubuntu:14.04
---> dc4491992653
Step 2/4 : RUN apt-get update && apt-get install -y curl ca-certificates amqp-tools python --no-install-recommends && rm -rf /var/lib/apt/lists/*
---> Running in 896d08c85507
Ign http://archive.ubuntu.com trusty InRelease
Get:1 http://security.ubuntu.com trusty-security InRelease [65.9 kB]
...
Removing intermediate container 92823107d4af
Successfully built 1f7d3f40a2b6
Successfully tagged job-wq:latest
出来上がったコンテナは、ローカル環境の Docker リポジトリにありますから、以下のコマンドで確認できます。
$ docker images job-wq
REPOSITORY TAG IMAGE ID CREATED SIZE
job-wq latest 1f7d3f40a2b6 About a minute ago 256MB
コンテナをレジストリへ登録
出来上がったコンテナを k8s がアクセスできるコンテナのレジストリへアップロードします。 名前の後のタグを変更していますが、latestだとレジストリでバージョンが解らないので、v2とか区別できる様にして タグを付与して、プッシュ すなわち、アップロードします。
$ docker tag job-wq:latest registry.au-syd.bluemix.net/takara/job-wq:v2
$ docker push registry.au-syd.bluemix.net/takara/job-wq:v2
The push refers to a repository [registry.au-syd.bluemix.net/takara/job-wq]
2fb352a15d88: Layer already exists
9f68a7b44cb7: Pushed
31df331e1f23: Layer already exists
630730f8c75d: Layer already exists
827cd1db9e95: Layer already exists
e6e107f1da2f: Layer already exists
c41b9462ea4b: Layer already exists
v2: digest: sha256:f692821d5b47309c9e07388e2f47c17c71299dd05d6dfb302a8e7d23b624fc9e size: 1778
リポジトリへ登録するには、認証をパスしていることが必要ですが、IBM Cloud であれば bx cr login
となります。
レジストリのコマンドを利用して、登録を確認します。
$ bx cr images
Listing images...
REPOSITORY NAMESPACE TAG DIGEST CREATED SIZE VULNERABILITY STATUS
registry.au-syd.bluemix.net/takara/job-wq takara v2 f692821d5b47 3 minutes ago 85 MB OK
これでレジストリ登録の準備は完了です。
バッチ処理の実行
バッチジョブのYAMLを作成します。 8個のデータを登録したので completions: 8
をセットして、並列度に parallelism: 2
を設定します。 これで、8個のデータを2つのバッチジョブが並列に、キューからデータを取り出して処理します。
apiVersion: batch/v1
kind: Job
metadata:
name: job-wq
spec:
completions: 8
parallelism: 2
template:
metadata:
name: job-wq
spec:
containers:
- name: que-consumer
image: registry.au-syd.bluemix.net/takara/job-wq:v2
env:
- name: BROKER_URL
value: amqp://guest:guest@rabbitmq-service:5672
- name: QUEUE
value: job1
restartPolicy: OnFailure
ジョブの実行開始です。
$ kubectl create -f job.yml
job "job-wq" created
処理が進行する様子
ジョブが終了していく様を kubectl get job
で追ってみます。 徐々に SUCCESSFUL が増えているのが解ります。
$ kubectl get job job-wq
NAME DESIRED SUCCESSFUL AGE
job-wq 8 2 25s
$ kubectl get job job-wq
NAME DESIRED SUCCESSFUL AGE
job-wq 8 3 35s
$ kubectl get job job-wq
NAME DESIRED SUCCESSFUL AGE
job-wq 8 6 58s
$ kubectl get job job-wq
NAME DESIRED SUCCESSFUL AGE
job-wq 8 7 1m
$ kubectl get job job-wq
NAME DESIRED SUCCESSFUL AGE
job-wq 8 8 1m
このジョブを詳細に見ると、キューに入った8個のデータに対して、8個のポッドが実行されて処理が実行されている事が確認できます。 つまり、Queue with Pod Per Work Item (作業項目毎のポッドのキュー) という事ですね。
$ kubectl describe job job-wq
Name: job-wq
Namespace: default
Selector: controller-uid=8545d996-0874-11e8-bca9-d6467fb293a9
Labels: controller-uid=8545d996-0874-11e8-bca9-d6467fb293a9
job-name=job-wq
Annotations: <none>
Parallelism: 2
Completions: 8
Start Time: Fri, 02 Feb 2018 23:55:17 +0000
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=8545d996-0874-11e8-bca9-d6467fb293a9
job-name=job-wq
Containers:
que-consumer:
Image: registry.au-syd.bluemix.net/takara/job-wq:v2
Port: <none>
Environment:
BROKER_URL: amqp://guest:guest@rabbitmq-service:5672
QUEUE: job1
Mounts: <none>
Volumes: <none>
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreate 1m job-controller Created pod: job-wq-st727
Normal SuccessfulCreate 1m job-controller Created pod: job-wq-2lzp5
Normal SuccessfulCreate 1m job-controller Created pod: job-wq-wjfzb
Normal SuccessfulCreate 1m job-controller Created pod: job-wq-t7ljq
Normal SuccessfulCreate 48s job-controller Created pod: job-wq-pjn59
Normal SuccessfulCreate 42s job-controller Created pod: job-wq-9h6bh
Normal SuccessfulCreate 35s job-controller Created pod: job-wq-zj7lr
Normal SuccessfulCreate 29s job-controller Created pod: job-wq-4znc9
参考元の記事では、ここで終わっているのですが、ちゃんとログの内容まで見て置かないと、気が済まないので、次へ進みます。
結果ログの表示
ジョブのポッドのリストを表示するのは、次のコマンドになります。
kubectl get pods --show-all --selector=job-name=job-wq --output=jsonpath={.items..metadata.name}
job-wq-2lzp5 job-wq-4znc9 job-wq-9h6bh job-wq-pjn59 job-wq-st727 job-wq-t7ljq job-wq-wjfzb job-wq-zj7lr
この結果を for ループで受けて、ポッド毎にログを表示すると、以下になります。 キューのデータを1ポッドづつで処理しているのが解りますね。
$ for f in $(kubectl get pods --show-all --selector=job-name=job-wq --output=jsonpath={.items..metadata.name}); do echo $f; kubectl logs $f; done
job-wq-2lzp5
Processing date
job-wq-4znc9
Processing banana
job-wq-9h6bh
Processing melon
job-wq-pjn59
Processing lemon
job-wq-st727
Processing cherry
job-wq-t7ljq
Processing fig
job-wq-wjfzb
Processing grape
job-wq-zj7lr
Processing apple
まとめ
Kubernetesのドキュメントは、良く書いてあると思うのですが、抽象的で具体的にどういう意味なのか、解らない事が多いので、理解のために、読み解きながら、実際にコマンドを実行して確認しているのですが、今回も、これだけ試す事で、正確に把握できた様に思います。
作業キューは、RabbitMQで無くても良いのでしょうけど、コンテナを引用する事で、目的の作業に最短で辿りつけ、再利用が容易という点でコンテナ技術は素晴らしいと思います。
おまけ
Replication Controller と Deployment
古いサンプルコードのほとんどは、Replication Controller を利用しており、最近のサンプリは、Deploymentを利用している様です。 そこで、今回は、Deploymentに書き換えても、変わらず実行できるか確認しました。
下記の二つのどちらでも変わらず実行できますが、最近ではDeploymentを利用する事が推奨されています。
Replication Controller で実行するケース
apiVersion: v1
kind: ReplicationController
metadata:
labels:
component: rabbitmq
name: rabbitmq-controller
spec:
replicas: 1
template:
metadata:
labels:
app: taskQueue
component: rabbitmq
spec:
containers:
- image: rabbitmq
name: rabbitmq
ports:
- containerPort: 5672
resources:
limits:
cpu: 100m
関係のあるものだけを抜粋した内容ですが、 rc と po で始まる行が、上記のYAMLの適用結果です。
$ kubectl get all
NAME READY STATUS RESTARTS AGE
po/rabbitmq-controller-tmqxp 1/1 Running 0 5m
NAME DESIRED CURRENT READY AGE
rc/rabbitmq-controller 1 1 1 5m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
svc/rabbitmq-service ClusterIP 172.21.110.213 <none> 5672/TCP 5m
Deployment で実行するケース
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
labels:
component: rabbitmq
name: rabbitmq-deployment
spec:
replicas: 1
template:
metadata:
labels:
app: taskQueue
component: rabbitmq
spec:
containers:
- image: rabbitmq
name: rabbitmq
ports:
- containerPort: 5672
resources:
limits:
cpu: 100m
抜粋ですが、deploy - rs - po の階層構造で実行されているのが解ります。
$ kubectl get all
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
deploy/rabbitmq-deployment 1 1 1 1 1h
NAME DESIRED CURRENT READY AGE
rs/rabbitmq-deployment-755d55bd69 1 1 1 1h
NAME READY STATUS RESTARTS AGE
po/rabbitmq-deployment-755d55bd69-68jdc 1/1 Running 0 1h
svc/rabbitmq-service ClusterIP 172.21.170.254 <none> 5672/TCP 1h