5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Kubernetes3Advent Calendar 2020

Day 18

Kubernetes上でPulsar Functions使ってみた

Last updated at Posted at 2020-12-18

Pulsar

こんにちは。ヤフー株式会社システム統括本部の木村です。
私の所属するteamではopen-sourceのpub-sub型messaging platformであるApache Pulsarを社内向けに開発, 運用しています。

弊teamでは過去に幾つかPulsarに関する記事を書いてきました。

Pulsarがどのようなsoftwareなのかについては上の記事を御参照ください。
本稿では、Pulsarの機能の一つであるPulsar Functionsを紹介し、これをKubernetes上で動作させるtutorialを提供します。

Pulsar Functions

Pulsar Functionsを利用するとuserはPulsar cluster内にfunctionと呼ばれるprocessを作成できます。これは

  • 入力topicにmessageが届くと
  • userが.jar.pyとして実装した処理をmessageに施し
  • 出力topicに送る

と言う動作をします。

pulsar-function

Pulsar Functionsを使用する事で、別個にsystemを用意する事無くPulsar単体でmessageを処理する事が可能になります。これはserverlessなFaaSを提供します(AWS Lambdaなどの影響を受けたようです)。

Pulsar FunctionsをKubernetesで使う

実際にPulsar Functionsを使ってみましょう。
本節では、Pulsar clusterのdeployからfunctionの作成, 利用までを行います。

本節の手順はMacとCentOS上で確認しましたが、Ubuntuなどの他Linux distributionでも動作します。
概ね公式document通りですが、一部注意が必要な箇所が有るために追記, 修正してあります。

事前準備

次のsoftwareがinstallされている必要が有ります。

  • Docker
  • Kubectl
  • Minkube
  • Helm

Macでのinstallation例

$ brew install docker kubectl minikube helm
$ brew cask install docker

CentOSでのinstallation例

# docker
# https://docs.docker.com/engine/install/centos/
$ sudo yum install -y yum-utils
$ sudo yum-config-manager --add-repo  https://download.docker.com/linux/centos/docker-ce.repo
$ sudo yum install -y docker-ce
$ sudo systemctl start docker
$ sudo usermod -aG docker $USER && newgrp docker

# kubectl
# https://kubernetes.io/docs/tasks/tools/install-kubectl/
$ curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
$ chmod +x ./kubectl
$ sudo mv ./kubectl /usr/local/bin/kubectl
$ kubectl version --client

# minikube
# https://minikube.sigs.k8s.io/docs/start/
$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
$ chmod +x minikube

# helm
# https://helm.sh/docs/intro/install/
$ curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3
$ chmod 700 get_helm.sh
$ ./get_helm.sh

MinikubeでKubernetes clusterを準備する

本解説ではMinikube 1 を利用します。
既に有るclusterを使う事も可能ですが、本解説通りの手順では動作しない場合が有ります。

# Dockerを起動しておきます

# Minikubeを起動します(defaultでDockerがdriverになります)
$ minikube start --cpus=4

🏄  Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default

# Minikubeをcontextとして設定します
$ kubectl config use-context minikube

Pulsarをdeployする

上で起動したKubernetes clusterにPulsarをdeployします。
deploymentにはHelm 2 のchartが公式に用意されているので、それを利用します。

$ git clone https://github.com/apache/pulsar-helm-chart -b pulsar-2.6.2-2 --depth 1
$ cd pulsar-helm-chart

# Pulsarのdeploymentに使うsecretを生成します
# namespaceもこれで作成されます
# usage: -n <k8s namespace> -k <pulsar release name> -c
$ scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsar-mini -c

# Minikube用に用意された設定を利用してdeployします
$ helm install pulsar-mini ./charts/pulsar \
-f examples/values-minikube.yaml \
--set initialize=true \
-n pulsar --create-namespace

# functionを作るのに必要な権限をbrokerに与えます
$ kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: "pulsar-mini-functions-worker"
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: "pulsar-mini-functions-worker"
subjects:
- kind: ServiceAccount
  name: "pulsar-mini-broker-acct"
  namespace: pulsar
EOF

# deploymentが完了したか確認します
# 全てのSTATUSがRunningかCompletedになるまで待ちます
$ kubectl get pods -n pulsar

NAME                                         READY   STATUS      RESTARTS   AGE
pulsar-mini-bookie-0                         1/1     Running     0          30m
pulsar-mini-bookie-init-q87qw                0/1     Completed   0          30m
pulsar-mini-broker-0                         1/1     Running     0          30m
pulsar-mini-grafana-555cf54cf-kr74l          1/1     Running     0          30m
pulsar-mini-prometheus-5556dbb8b8-hjvbq      1/1     Running     0          30m
pulsar-mini-proxy-0                          1/1     Running     0          30m
pulsar-mini-pulsar-init-l4whg                0/1     Completed   0          30m
pulsar-mini-pulsar-manager-6c6889dff-5qml9   1/1     Running     0          30m
pulsar-mini-toolset-0                        1/1     Running     0          30m
pulsar-mini-zookeeper-0                      1/1     Running     0          30m

bookieやbrokerなどのcomponentがpodとしてdeployされました。

ここで利用したvalues-minikube.yamlはMinikubeへのdeployment用に最小限の設定値が既述されたfileです。違う設定でdeployしたい場合、詳細はHelm chartのREADME公式documentを御確認ください。

produce-consume確認

Pulsar clusterが正常に動作しているか確認するためにproduceconsumeを実行します。


# 接続先を確認します
$ minikube service pulsar-mini-proxy -n pulsar

|-----------|-------------------|-------------|---------------------------|
| NAMESPACE |       NAME        | TARGET PORT |            URL            |
|-----------|-------------------|-------------|---------------------------|
| pulsar    | pulsar-mini-proxy | http/80     | http://192.168.49.2:30538 |
|           |                   | pulsar/6650 | http://192.168.49.2:31318 |
|-----------|-------------------|-------------|---------------------------|
🏃  Starting tunnel for service pulsar-mini-proxy.
|-----------|-------------------|-------------|------------------------|
| NAMESPACE |       NAME        | TARGET PORT |          URL           |
|-----------|-------------------|-------------|------------------------|
| pulsar    | pulsar-mini-proxy |             | http://127.0.0.1:56975 |
|           |                   |             | http://127.0.0.1:56976 |
|-----------|-------------------|-------------|------------------------|

# 最下のURL(ここでは127.0.0.1:56976)を記録します

別のterminalを開き、subscribeします。

# pulsarをdownload, 解凍
$ cd ..
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz
$ tar xf apache-pulsar-2.6.2-bin.tar.gz
$ cd apache-pulsar-2.6.2

# consumerを作成してsubscribeします
# --urlの値は先ほど記録した物です
$ bin/pulsar-client --url pulsar://127.0.0.1:56976 consume persistent://public/default/test -s subscription

更に別のterminalを開きproduceします。

$ bin/pulsar-client --url pulsar://127.0.0.1:56976 produce persistent://public/default/test -m "hello"

subscribeしたterminalに戻りreceiveしているか確認します。

----- got message -----
key:[null], properties:[], content:hello

functionを作成

いよいよPulsar Functionsを利用します。

# pulsar-admin利用のためにtoolsetに入ります
$ kubectl exec -it -n pulsar pulsar-mini-toolset-0 -- /bin/bash

# 初めはfunctionが有りません
$ bin/pulsar-admin functions list
# 何も表示されない

# functionを作成します
# jar: functionsを実装したJARのpath
# classname: functionを実装したclass名
# fqfn: fully qualified function name (<tenant>/<namespace>/<function name>)
# inputs: 入力topic(comma区切り)
# output: 出力topic
# log-topic: log topic
$ bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--fqfn public/default/exclamation \
--inputs persistent://public/default/in \
--output persistent://public/default/out \
--log-topic persistent://public/default/log

"Created successfully"

# exclamationの名でfunctionが作成されています
$ bin/pulsar-admin functions list

"exclamation"

# functionの状態を確認します
# "runnning" : true であれば問題有りません
$ bin/pulsar-admin functions status --name exclamation

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-pulsar-mini-fw-pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local-8080"
    }
  } ]
}

$ exit

# functionがpodとして作成されています
$ kubectl get pods -n pulsar

pf-public-default-exclamation-0              0/1     Runnning    0          30s # this!
pulsar-mini-bookie-0                         1/1     Running     0          1h
pulsar-mini-bookie-init-q87qw                0/1     Completed   0          1h
pulsar-mini-broker-0                         1/1     Running     0          1h
pulsar-mini-grafana-555cf54cf-kr74l          1/1     Running     0          1h
pulsar-mini-prometheus-5556dbb8b8-hjvbq      1/1     Running     0          1h
pulsar-mini-proxy-0                          1/1     Running     0          1h
pulsar-mini-pulsar-init-l4whg                0/1     Completed   0          1h
pulsar-mini-pulsar-manager-6c6889dff-5qml9   1/1     Running     0          1h
pulsar-mini-toolset-0                        1/1     Running     0          1h
pulsar-mini-zookeeper-0                      1/1     Running     0          1h

ここで用いたJARは例として公式に準備された物です。
sourceを御覧になると判りますが、入力messageの末尾に!を追加して出力する単純なmethodです。
当然、自前でfunctionを実装する事も可能です。Javaの場合はjava.util.Functionorg.apache.pulsar.functions.api.Functionを実装したJARが必要になります(詳細)。Pythonにも対応しています。

functionを利用

作成したfunctionが機能していることを確認します。

# 出力topicにsubscribeします
$ bin/pulsar-client --url pulsar://127.0.0.1:56976 consume persistent://public/default/out -s subscription

別のterminalを開き入力topicにproduceします

$ bin/pulsar-client --url pulsar://127.0.0.1:56976 produce persistent://public/default/in -m "hello"

subscribeしたterminalに戻り、末尾に!の付いたmessageをreceiveしていることを確認します。

----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAgQACAA, __pfn_input_topic__=persistent://public/default/in], content:hello!

Pulsar Functionsを利用してmessageにmethodを適用することができました。

後処理

最後に、本稿で導入したresourceなどを削除します。

$ helm delete pulsar-mini -n pulsar
$ kubectl delete --all all -n pulsar
$ minikube delete --all

複数開いたterminalは全て閉じます。

終わりに

本稿では、Pulsarの機能であり、FaaSを実現するPulsar Functionsの紹介と、それをMinikubeで動作させる手順について書きました。
ここで書き切れなかったPulsar Functionsの詳細については公式documentを御参照ください。

本稿がPulsarとPulsar Functionsの理解の一助となれば幸いです。

  1. localでKubernetes clusterを容易に作成できるsoftware

  2. Kubernetes上のapplicationの定義, 導入, 更新等をchartと呼ばれるfileを用いて簡単に行えるsoftware

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?