kubernetes

Kubernetes 作業キューを使った並列ジョブ

k8sのジョブの利用方法で、Queue with Pod Per Work Item (作業項目毎のポッドのキュー) の意味が良く分からないので、リンク先のCoarse Parallel Processing Using a Work Queue を検証して、その意図する処を確かめたメモです。

概要

目的とする処理方式を図にすると、次の様になります。
主題であるバッチ型処理は、右下の赤い四角形で、RabbitMQ からデータを取得して、バッチ・ジョブを実行します。

この図で Depolyment or Replication Controller と表記した箱で、左上がRabbitMQへのデータの投入側になります。そして、真ん中の箱が、RabbitMQのコンテナが動作するポッドです。その上に描いた紫の小さな箱が、RabbitMQを他のデプロイメントやジョブからアクセスできる様にするための "サービス" です。 この "サービス" を立てることで、クラスタIPアドレスが付与され、k8s内部のDNSへサービス名が登録され、データ生成側のポッド、バッチ処理のポッドから、サービス名でアクセスできる様になります。

スクリーンショット 2018-02-03 13.26.09.png

この様な構成をとることで、大量にメモリを消費するレンダリング、膨大な宛先へ短時間でメール送信するための処理、NoSQLデータベースをサーチしてデータを集める、などなど、一回の処理で、1分以上を要し、大量のメモリや、並列処理を必要とする用途に、適したバッチ処理の環境を作ることができます。

ここで1分間以上の処理と書きましたが、Depolyment や Replication Controller は、終了通知のシグナルを受けてから1分以内で、コンテナ上のプロセスを正常終了させる必要があるためです。 さもなければ、強制停止となります。

検証

実際に上記のジョブを動作させて振る舞いを確認して行きます。

メッセージキューサービスの開始

RabbitMQのサービスを実行するための YAMLファイルは、GitHubのサンプルYAMLを利用します。 https://github.com/kubernetes/kubernetes/tree/release-1.3/examples/celery-rabbitmq

次の様に、GitHubに置かれたファイルを kubectl から直接指定して、RabbitMQを起動します。

$ kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created

$ kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationcontroller "rabbitmq-controller" created

これで、RabbitMQの準備は完了になります。 もしも、RabbitMQが動作する仮想サーバー(ノード)が異常停止すれば、残りのノードで、RabbitMQのポッドが起動します。 今回はやっていませんが、キューの永続化ディスクを永続ストレージに設定していれば、キューのデータの喪失も防ぐことができます。

クラスタのポッドにログインして、サービスにアクセスする

ここまでに立ち上げた RabbitMQ のテストを行います。 k8sクラスタ内のClusterIPにアクセスするために、シェルやcurlコマンドを実行するためのポッドを起動してログインします。

$ kubectl run -i --tty temp --image ubuntu:14.04
If you don't see a command prompt, try pressing enter.
root@temp-95cf56858-zfvnp:/# apt-get update
...

root@temp-95cf56858-zfvnp:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
...

既にポッドが存在していて、再度ポッドにログインする場合は、kubectl get pods で目的のポッドのIDをメモっておき、次のコマンドでログインします。

$ kubectl exec -it temp-95cf56858-zfvnp bash

メッセージキューサービスのテスト

まずは サービスのrabbitmq-serviceの作成によって kube-dnsに登録されているか確認します。 nslookup コマンドで k8sクラスタ内ネットワークのアドレスが確認できれば成功です。

root@temp-95cf56858-zfvnp:/# nslookup rabbitmq-service
Server:                       172.21.0.10
Address:                  172.21.0.10#53

Name:                     rabbitmq-service.default.svc.cluster.local
Address: 172.21.144.164

繰り返し利用するキューのブローカのURLを環境変数にセットしておき、テスト用のキュー名 foo を作成します。

root@temp-95cf56858-zfvnp:/# export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

キューに書き込んで、読み取れたらテスト成功です。

root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello

キューの作成とデータのプット

テストに成功したので、ジョブが実際にアクセスするキュー job1 を作成します。 そして、job1に8個の文字列を書き込んでおきます。

root@temp-95cf56858-zfvnp:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d
job1

root@temp-95cf56858-zfvnp:/# for f in apple banana cherry date fig grape lemon melon
> do
>   /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
> done

繰り返し検証する場合には、次のワンラインをコピペすることで、何度もキューに書き込むことができます。

for f in apple banana cherry date fig grape lemon melon; do /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f; done

以上でキューの準備が完了しました。 最初の絵で言えば、左上のデータ生成側常駐側ポッド、真ん中の常駐型サービスの二つが出来た事になります。
次に、右下の赤いボックスのジョブを作って行きます。

JOBのコンテナの作成

参考にした Coarse Parallel Processing Using a Work Queue の worker.py には誤りがあり動作しないので、sys.stdin以下を少し修正した次のコードを利用します。

worker.py
#!/usr/bin/env python

# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.read())
time.sleep(10)

コンテナをビルドするための Dockerfile ですが、最後の行の $BROKER_URL, $QUEUE は、kubectl でジョブを実行するためのYAMLファイルに環境変数として定義します。 こうすることで、コンテナを再ビルドすることなく、ブローカーのアドレスとキュー名の変更に対応できます。

# Specify BROKER_URL and QUEUE when running
FROM ubuntu:14.04

RUN apt-get update && \
    apt-get install -y curl ca-certificates amqp-tools python \
       --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*
COPY ./worker.py /worker.py

CMD  /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py

上記の2つのファイルがあるディレクトリで、次のコマンドを実行して、コンテナを作成します。

$ docker build -t job-wq .
Sending build context to Docker daemon  4.096kB
Step 1/4 : FROM ubuntu:14.04
 ---> dc4491992653
Step 2/4 : RUN apt-get update &&     apt-get install -y curl ca-certificates amqp-tools python        --no-install-recommends     && rm -rf /var/lib/apt/lists/*
 ---> Running in 896d08c85507
Ign http://archive.ubuntu.com trusty InRelease
Get:1 http://security.ubuntu.com trusty-security InRelease [65.9 kB]
...

Removing intermediate container 92823107d4af
Successfully built 1f7d3f40a2b6
Successfully tagged job-wq:latest

出来上がったコンテナは、ローカル環境の Docker リポジトリにありますから、以下のコマンドで確認できます。

$ docker images job-wq
REPOSITORY          TAG                 IMAGE ID            CREATED              SIZE
job-wq              latest              1f7d3f40a2b6        About a minute ago   256MB

コンテナをレジストリへ登録

出来上がったコンテナを k8s がアクセスできるコンテナのレジストリへアップロードします。 名前の後のタグを変更していますが、latestだとレジストリでバージョンが解らないので、v2とか区別できる様にして タグを付与して、プッシュ すなわち、アップロードします。

$ docker tag job-wq:latest registry.au-syd.bluemix.net/takara/job-wq:v2
$ docker push registry.au-syd.bluemix.net/takara/job-wq:v2
The push refers to a repository [registry.au-syd.bluemix.net/takara/job-wq]
2fb352a15d88: Layer already exists 
9f68a7b44cb7: Pushed 
31df331e1f23: Layer already exists 
630730f8c75d: Layer already exists 
827cd1db9e95: Layer already exists 
e6e107f1da2f: Layer already exists 
c41b9462ea4b: Layer already exists 
v2: digest: sha256:f692821d5b47309c9e07388e2f47c17c71299dd05d6dfb302a8e7d23b624fc9e size: 1778

リポジトリへ登録するには、認証をパスしていることが必要ですが、IBM Cloud であれば bx cr login となります。
レジストリのコマンドを利用して、登録を確認します。

$ bx cr images
Listing images...

REPOSITORY                                        NAMESPACE   TAG   DIGEST         CREATED         SIZE     VULNERABILITY STATUS   
registry.au-syd.bluemix.net/takara/job-wq         takara      v2    f692821d5b47   3 minutes ago   85 MB    OK   

これでレジストリ登録の準備は完了です。

バッチ処理の実行

バッチジョブのYAMLを作成します。 8個のデータを登録したので completions: 8 をセットして、並列度に parallelism: 2を設定します。 これで、8個のデータを2つのバッチジョブが並列に、キューからデータを取り出して処理します。

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq
    spec:
      containers:
      - name: que-consumer
        image: registry.au-syd.bluemix.net/takara/job-wq:v2
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

ジョブの実行開始です。

$ kubectl create -f job.yml
job "job-wq" created

処理が進行する様子

ジョブが終了していく様を kubectl get job で追ってみます。 徐々に SUCCESSFUL が増えているのが解ります。

$ kubectl get job job-wq
NAME      DESIRED   SUCCESSFUL   AGE
job-wq    8         2            25s
$ kubectl get job job-wq
NAME      DESIRED   SUCCESSFUL   AGE
job-wq    8         3            35s
$ kubectl get job job-wq
NAME      DESIRED   SUCCESSFUL   AGE
job-wq    8         6            58s
$ kubectl get job job-wq
NAME      DESIRED   SUCCESSFUL   AGE
job-wq    8         7            1m
$ kubectl get job job-wq
NAME      DESIRED   SUCCESSFUL   AGE
job-wq    8         8            1m

このジョブを詳細に見ると、キューに入った8個のデータに対して、8個のポッドが実行されて処理が実行されている事が確認できます。 つまり、Queue with Pod Per Work Item (作業項目毎のポッドのキュー) という事ですね。

$ kubectl describe job job-wq
Name:           job-wq
Namespace:      default
Selector:       controller-uid=8545d996-0874-11e8-bca9-d6467fb293a9
Labels:         controller-uid=8545d996-0874-11e8-bca9-d6467fb293a9
                job-name=job-wq
Annotations:    <none>
Parallelism:    2
Completions:    8
Start Time:     Fri, 02 Feb 2018 23:55:17 +0000
Pods Statuses:  0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:  controller-uid=8545d996-0874-11e8-bca9-d6467fb293a9
           job-name=job-wq
  Containers:
   que-consumer:
    Image:  registry.au-syd.bluemix.net/takara/job-wq:v2
    Port:   <none>
    Environment:
      BROKER_URL:  amqp://guest:guest@rabbitmq-service:5672
      QUEUE:       job1
    Mounts:        <none>
  Volumes:         <none>
Events:
  Type    Reason            Age   From            Message
  ----    ------            ----  ----            -------
  Normal  SuccessfulCreate  1m    job-controller  Created pod: job-wq-st727
  Normal  SuccessfulCreate  1m    job-controller  Created pod: job-wq-2lzp5
  Normal  SuccessfulCreate  1m    job-controller  Created pod: job-wq-wjfzb
  Normal  SuccessfulCreate  1m    job-controller  Created pod: job-wq-t7ljq
  Normal  SuccessfulCreate  48s   job-controller  Created pod: job-wq-pjn59
  Normal  SuccessfulCreate  42s   job-controller  Created pod: job-wq-9h6bh
  Normal  SuccessfulCreate  35s   job-controller  Created pod: job-wq-zj7lr
  Normal  SuccessfulCreate  29s   job-controller  Created pod: job-wq-4znc9

参考元の記事では、ここで終わっているのですが、ちゃんとログの内容まで見て置かないと、気が済まないので、次へ進みます。

結果ログの表示

ジョブのポッドのリストを表示するのは、次のコマンドになります。

kubectl get pods  --show-all --selector=job-name=job-wq --output=jsonpath={.items..metadata.name}
job-wq-2lzp5 job-wq-4znc9 job-wq-9h6bh job-wq-pjn59 job-wq-st727 job-wq-t7ljq job-wq-wjfzb job-wq-zj7lr

この結果を for ループで受けて、ポッド毎にログを表示すると、以下になります。 キューのデータを1ポッドづつで処理しているのが解りますね。

$ for f in $(kubectl get pods  --show-all --selector=job-name=job-wq --output=jsonpath={.items..metadata.name}); do echo $f; kubectl logs $f; done
job-wq-2lzp5
Processing date
job-wq-4znc9
Processing banana
job-wq-9h6bh
Processing melon
job-wq-pjn59
Processing lemon
job-wq-st727
Processing cherry
job-wq-t7ljq
Processing fig
job-wq-wjfzb
Processing grape
job-wq-zj7lr
Processing apple

まとめ

Kubernetesのドキュメントは、良く書いてあると思うのですが、抽象的で具体的にどういう意味なのか、解らない事が多いので、理解のために、読み解きながら、実際にコマンドを実行して確認しているのですが、今回も、これだけ試す事で、正確に把握できた様に思います。
作業キューは、RabbitMQで無くても良いのでしょうけど、コンテナを引用する事で、目的の作業に最短で辿りつけ、再利用が容易という点でコンテナ技術は素晴らしいと思います。

おまけ

Replication Controller と Deployment

古いサンプルコードのほとんどは、Replication Controller を利用しており、最近のサンプリは、Deploymentを利用している様です。 そこで、今回は、Deploymentに書き換えても、変わらず実行できるか確認しました。

下記の二つのどちらでも変わらず実行できますが、最近ではDeploymentを利用する事が推奨されています。

Replication Controller で実行するケース

rabbitmq-controller.yaml
apiVersion: v1
kind: ReplicationController
metadata:
  labels:
    component: rabbitmq
  name: rabbitmq-controller
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: taskQueue
        component: rabbitmq
    spec:
      containers:
      - image: rabbitmq
        name: rabbitmq
        ports:
        - containerPort: 5672
        resources:
          limits:
            cpu: 100m

関係のあるものだけを抜粋した内容ですが、 rc と po で始まる行が、上記のYAMLの適用結果です。

$ kubectl get all

NAME                                READY     STATUS             RESTARTS   AGE
po/rabbitmq-controller-tmqxp        1/1       Running            0          5m

NAME                     DESIRED   CURRENT   READY     AGE
rc/rabbitmq-controller   1         1         1         5m

NAME                     TYPE           CLUSTER-IP       EXTERNAL-IP     PORT(S)          AGE
svc/rabbitmq-service     ClusterIP      172.21.110.213   <none>          5672/TCP         5m

Deployment で実行するケース

rabbitmq-deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    component: rabbitmq
  name: rabbitmq-deployment
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: taskQueue
        component: rabbitmq
    spec:
      containers:
      - image: rabbitmq
        name: rabbitmq
        ports:
        - containerPort: 5672
        resources:
          limits:
            cpu: 100m

抜粋ですが、deploy - rs - po の階層構造で実行されているのが解ります。

$ kubectl get all
NAME                         DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deploy/rabbitmq-deployment   1         1         1            1           1h

NAME                                DESIRED   CURRENT   READY     AGE
rs/rabbitmq-deployment-755d55bd69   1         1         1         1h

NAME                                      READY     STATUS    RESTARTS   AGE
po/rabbitmq-deployment-755d55bd69-68jdc   1/1       Running   0          1h

svc/rabbitmq-service     ClusterIP      172.21.170.254   <none>          5672/TCP         1h